diff --git a/src/main/java/com/rabbitmq/client/amqp/Message.java b/src/main/java/com/rabbitmq/client/amqp/Message.java
index 75ca29b49..3792a4f86 100644
--- a/src/main/java/com/rabbitmq/client/amqp/Message.java
+++ b/src/main/java/com/rabbitmq/client/amqp/Message.java
@@ -591,6 +591,19 @@ public interface Message {
*/
byte[] body();
+ /**
+ * Mark the message as durable or not.
+ *
+ *
Messages are durable by default, use false
to make them explicitly non-durable.
+ *
+ *
Durability depends also on the queue messages end up in (e.g. quorum queues and streams
+ * always store messages durably).
+ *
+ * @param durable true for a durable message, false for a non-durable message
+ * @return the message
+ */
+ Message durable(boolean durable);
+
/**
* Whether the message is durable.
*
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
index 988f7cab9..17c52f78b 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java
@@ -35,6 +35,8 @@ final class AmqpMessage implements Message {
private final org.apache.qpid.protonj2.client.Message delegate;
+ private boolean durableIsSet = false;
+
AmqpMessage() {
this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY));
}
@@ -455,6 +457,14 @@ public byte[] body() {
}
// header section
+
+ @Override
+ public Message durable(boolean durable) {
+ this.durableIsSet = true;
+ callOnDelegate(m -> m.durable(durable));
+ return this;
+ }
+
@Override
public boolean durable() {
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::durable);
@@ -535,6 +545,13 @@ public MessageAddressBuilder replyToAddress() {
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK);
}
+ AmqpMessage enforceDurability() throws ClientException {
+ if (!this.durableIsSet) {
+ this.delegate.durable(true);
+ }
+ return this;
+ }
+
private static class DefaultMessageAddressBuilder
extends DefaultAddressBuilder implements MessageAddressBuilder {
diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
index e8c72b202..91c717819 100644
--- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
+++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
@@ -86,9 +86,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
this.publishCall =
msg -> {
try {
- org.apache.qpid.protonj2.client.Message> nativeMessage =
- ((AmqpMessage) msg).nativeMessage();
- return this.sender.send(nativeMessage.durable(true));
+ return this.doSend((AmqpMessage) msg);
} catch (ClientIllegalStateException e) {
// the link is closed
LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
@@ -154,6 +152,11 @@ private Status mapDeliveryState(DeliveryState in) {
}
}
+ private Tracker doSend(AmqpMessage msg) throws ClientException {
+ msg.enforceDurability();
+ return this.sender.send(msg.nativeMessage());
+ }
+
private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) {
if (status == Status.ACCEPTED) {
return MetricsCollector.PublishDisposition.ACCEPTED;
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
index b054204a6..7e18266a4 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java
@@ -25,13 +25,32 @@ public class AmqpMessageTest {
@Test
void toShouldBePathEncoded() {
- assertThat(new AmqpMessage().toAddress().exchange("foo bar").message().to())
+ assertThat(msg().toAddress().exchange("foo bar").message().to())
.isEqualTo("/exchanges/foo%20bar");
}
@Test
void replyToShouldBePathEncoded() {
- assertThat(new AmqpMessage().replyToAddress().exchange("foo bar").message().replyTo())
+ assertThat(msg().replyToAddress().exchange("foo bar").message().replyTo())
.isEqualTo("/exchanges/foo%20bar");
}
+
+ @Test
+ void shouldBeNonDurableOnlyIfExplicitlySet() throws Exception {
+ AmqpMessage msg = msg();
+ // durable by default
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
+ // non-durable explicitly set
+ msg = msg();
+ msg.durable(false);
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isFalse();
+ // durable explicitly set
+ msg = msg();
+ msg.durable(true);
+ assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
+ }
+
+ private static AmqpMessage msg() {
+ return new AmqpMessage();
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
index 6e50c324f..55a18e669 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
@@ -23,6 +23,7 @@
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
+import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
import static com.rabbitmq.client.amqp.impl.Utils.threadFactory;
import static java.nio.charset.StandardCharsets.*;
@@ -36,6 +37,7 @@
import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Connection;
+import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.Message;
@@ -55,6 +57,7 @@
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
@AmqpTestInfrastructure
@@ -906,4 +909,76 @@ void messageAnnotationsSupportListMapArray() {
ints = (int[]) m.annotation("x-arrayInt");
org.assertj.core.api.Assertions.assertThat(ints).containsExactly(4, 5, 6);
}
+
+ @ParameterizedTest
+ @CsvSource({
+ "CLASSIC,true",
+ "CLASSIC,false",
+ "QUORUM,true",
+ "QUORUM,false",
+ "STREAM,true",
+ "STREAM,false"
+ })
+ @BrokerVersionAtLeast(RABBITMQ_4_2_0)
+ void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) {
+ try {
+ connection.management().queue(this.name).type(type).declare();
+ Publisher p = connection.publisherBuilder().queue(this.name).build();
+ p.publish(p.message().durable(durable), ctx -> {});
+
+ Sync consumeSync = sync();
+ AtomicReference messageRef = new AtomicReference<>();
+ ConsumerBuilder builder =
+ connection
+ .consumerBuilder()
+ .queue(this.name)
+ .messageHandler(
+ (context, message) -> {
+ messageRef.set(message);
+ context.accept();
+ consumeSync.down();
+ });
+ if (type == STREAM) {
+ builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
+ }
+ builder.build();
+ assertThat(consumeSync).completes();
+ Message message = messageRef.get();
+ assertThat(message).isDurable(durable);
+ } finally {
+ connection.management().queueDelete(this.name);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(Management.QueueType.class)
+ void durableByDefault(Management.QueueType type) {
+ try {
+ connection.management().queue(this.name).type(type).declare();
+ Publisher p = connection.publisherBuilder().queue(this.name).build();
+ p.publish(p.message(), ctx -> {});
+
+ Sync consumeSync = sync();
+ AtomicReference messageRef = new AtomicReference<>();
+ ConsumerBuilder builder =
+ connection
+ .consumerBuilder()
+ .queue(this.name)
+ .messageHandler(
+ (context, message) -> {
+ messageRef.set(message);
+ context.accept();
+ consumeSync.down();
+ });
+ if (type == STREAM) {
+ builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
+ }
+ builder.build();
+ assertThat(consumeSync).completes();
+ Message message = messageRef.get();
+ assertThat(message).isDurable(true);
+ } finally {
+ connection.management().queueDelete(this.name);
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
index bf9d1b94b..de7616def 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java
@@ -359,6 +359,13 @@ private MessageAssert hasField(String fieldLabel, Object value, Object expected)
.isEqualTo(expected);
return this;
}
+
+ void isDurable(boolean durable) {
+ isNotNull();
+ if (actual.durable() != durable) {
+ fail("Message durable flag should be %s but is %s", durable, actual.durable());
+ }
+ }
}
static class ConnectionAssert extends AbstractObjectAssert {
diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
index 017dda68d..9db064c3e 100644
--- a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
+++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
@@ -35,7 +35,8 @@ private TestConditions() {}
public enum BrokerVersion {
RABBITMQ_4_0_3("4.0.3"),
- RABBITMQ_4_1_0("4.1.0");
+ RABBITMQ_4_1_0("4.1.0"),
+ RABBITMQ_4_2_0("4.2.0");
final String value;