diff --git a/src/main/java/com/rabbitmq/client/amqp/Message.java b/src/main/java/com/rabbitmq/client/amqp/Message.java index 75ca29b49..3792a4f86 100644 --- a/src/main/java/com/rabbitmq/client/amqp/Message.java +++ b/src/main/java/com/rabbitmq/client/amqp/Message.java @@ -591,6 +591,19 @@ public interface Message { */ byte[] body(); + /** + * Mark the message as durable or not. + * + *

Messages are durable by default, use false to make them explicitly non-durable. + * + *

Durability depends also on the queue messages end up in (e.g. quorum queues and streams + * always store messages durably). + * + * @param durable true for a durable message, false for a non-durable message + * @return the message + */ + Message durable(boolean durable); + /** * Whether the message is durable. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java index 988f7cab9..17c52f78b 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java @@ -35,6 +35,8 @@ final class AmqpMessage implements Message { private final org.apache.qpid.protonj2.client.Message delegate; + private boolean durableIsSet = false; + AmqpMessage() { this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY)); } @@ -455,6 +457,14 @@ public byte[] body() { } // header section + + @Override + public Message durable(boolean durable) { + this.durableIsSet = true; + callOnDelegate(m -> m.durable(durable)); + return this; + } + @Override public boolean durable() { return returnFromDelegate(org.apache.qpid.protonj2.client.Message::durable); @@ -535,6 +545,13 @@ public MessageAddressBuilder replyToAddress() { return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK); } + AmqpMessage enforceDurability() throws ClientException { + if (!this.durableIsSet) { + this.delegate.durable(true); + } + return this; + } + private static class DefaultMessageAddressBuilder extends DefaultAddressBuilder implements MessageAddressBuilder { diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java index e8c72b202..91c717819 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java @@ -86,9 +86,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher { this.publishCall = msg -> { try { - org.apache.qpid.protonj2.client.Message nativeMessage = - ((AmqpMessage) msg).nativeMessage(); - return this.sender.send(nativeMessage.durable(true)); + return this.doSend((AmqpMessage) msg); } catch (ClientIllegalStateException e) { // the link is closed LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage()); @@ -154,6 +152,11 @@ private Status mapDeliveryState(DeliveryState in) { } } + private Tracker doSend(AmqpMessage msg) throws ClientException { + msg.enforceDurability(); + return this.sender.send(msg.nativeMessage()); + } + private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) { if (status == Status.ACCEPTED) { return MetricsCollector.PublishDisposition.ACCEPTED; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java index b054204a6..7e18266a4 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java @@ -25,13 +25,32 @@ public class AmqpMessageTest { @Test void toShouldBePathEncoded() { - assertThat(new AmqpMessage().toAddress().exchange("foo bar").message().to()) + assertThat(msg().toAddress().exchange("foo bar").message().to()) .isEqualTo("/exchanges/foo%20bar"); } @Test void replyToShouldBePathEncoded() { - assertThat(new AmqpMessage().replyToAddress().exchange("foo bar").message().replyTo()) + assertThat(msg().replyToAddress().exchange("foo bar").message().replyTo()) .isEqualTo("/exchanges/foo%20bar"); } + + @Test + void shouldBeNonDurableOnlyIfExplicitlySet() throws Exception { + AmqpMessage msg = msg(); + // durable by default + assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue(); + // non-durable explicitly set + msg = msg(); + msg.durable(false); + assertThat(msg.enforceDurability().nativeMessage().durable()).isFalse(); + // durable explicitly set + msg = msg(); + msg.durable(true); + assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue(); + } + + private static AmqpMessage msg() { + return new AmqpMessage(); + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java index 6e50c324f..55a18e669 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java @@ -23,6 +23,7 @@ import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3; +import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0; import static com.rabbitmq.client.amqp.impl.TestUtils.*; import static com.rabbitmq.client.amqp.impl.Utils.threadFactory; import static java.nio.charset.StandardCharsets.*; @@ -36,6 +37,7 @@ import com.rabbitmq.client.amqp.AmqpException; import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.ConsumerBuilder; import com.rabbitmq.client.amqp.Environment; import com.rabbitmq.client.amqp.Management; import com.rabbitmq.client.amqp.Message; @@ -55,6 +57,7 @@ import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; @AmqpTestInfrastructure @@ -906,4 +909,76 @@ void messageAnnotationsSupportListMapArray() { ints = (int[]) m.annotation("x-arrayInt"); org.assertj.core.api.Assertions.assertThat(ints).containsExactly(4, 5, 6); } + + @ParameterizedTest + @CsvSource({ + "CLASSIC,true", + "CLASSIC,false", + "QUORUM,true", + "QUORUM,false", + "STREAM,true", + "STREAM,false" + }) + @BrokerVersionAtLeast(RABBITMQ_4_2_0) + void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) { + try { + connection.management().queue(this.name).type(type).declare(); + Publisher p = connection.publisherBuilder().queue(this.name).build(); + p.publish(p.message().durable(durable), ctx -> {}); + + Sync consumeSync = sync(); + AtomicReference messageRef = new AtomicReference<>(); + ConsumerBuilder builder = + connection + .consumerBuilder() + .queue(this.name) + .messageHandler( + (context, message) -> { + messageRef.set(message); + context.accept(); + consumeSync.down(); + }); + if (type == STREAM) { + builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST); + } + builder.build(); + assertThat(consumeSync).completes(); + Message message = messageRef.get(); + assertThat(message).isDurable(durable); + } finally { + connection.management().queueDelete(this.name); + } + } + + @ParameterizedTest + @EnumSource(Management.QueueType.class) + void durableByDefault(Management.QueueType type) { + try { + connection.management().queue(this.name).type(type).declare(); + Publisher p = connection.publisherBuilder().queue(this.name).build(); + p.publish(p.message(), ctx -> {}); + + Sync consumeSync = sync(); + AtomicReference messageRef = new AtomicReference<>(); + ConsumerBuilder builder = + connection + .consumerBuilder() + .queue(this.name) + .messageHandler( + (context, message) -> { + messageRef.set(message); + context.accept(); + consumeSync.down(); + }); + if (type == STREAM) { + builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST); + } + builder.build(); + assertThat(consumeSync).completes(); + Message message = messageRef.get(); + assertThat(message).isDurable(true); + } finally { + connection.management().queueDelete(this.name); + } + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java index bf9d1b94b..de7616def 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java @@ -359,6 +359,13 @@ private MessageAssert hasField(String fieldLabel, Object value, Object expected) .isEqualTo(expected); return this; } + + void isDurable(boolean durable) { + isNotNull(); + if (actual.durable() != durable) { + fail("Message durable flag should be %s but is %s", durable, actual.durable()); + } + } } static class ConnectionAssert extends AbstractObjectAssert { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java index 017dda68d..9db064c3e 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java @@ -35,7 +35,8 @@ private TestConditions() {} public enum BrokerVersion { RABBITMQ_4_0_3("4.0.3"), - RABBITMQ_4_1_0("4.1.0"); + RABBITMQ_4_1_0("4.1.0"), + RABBITMQ_4_2_0("4.2.0"); final String value;