From 33e9fcd8e6f97e8e6fbc8a4365ac22adcd197632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 10 Mar 2025 16:54:59 +0100 Subject: [PATCH] Close consumer gracefully in RPC server References #165 --- .../client/amqp/RpcServerBuilder.java | 13 +++ .../client/amqp/impl/AmqpRpcServer.java | 27 ++++++ .../rabbitmq/client/amqp/impl/RpcSupport.java | 11 +++ .../rabbitmq/client/amqp/impl/RpcTest.java | 92 +++++++++++++++++++ 4 files changed, 143 insertions(+) diff --git a/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java b/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java index 65957c56f..20e7eba78 100644 --- a/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java +++ b/src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java @@ -17,6 +17,7 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp; +import java.time.Duration; import java.util.function.BiFunction; import java.util.function.Function; @@ -62,6 +63,18 @@ public interface RpcServerBuilder { */ RpcServerBuilder replyPostProcessor(BiFunction replyPostProcessor); + /** + * The time the server waits for all outstanding requests to be processed before closing. + * + *

Default is 60 seconds. + * + *

Set the duration to {@link Duration#ZERO} to close immediately. + * + * @param closeTimeout close timeout + * @return this builder instance + */ + RpcServerBuilder closeTimeout(Duration closeTimeout); + /** * Create the configured instance. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java index 1cda9b6f8..6f239cd59 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java @@ -51,9 +51,11 @@ class AmqpRpcServer implements RpcServer { private final Function correlationIdExtractor; private final BiFunction replyPostProcessor; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Duration closeTimeout; AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) { this.connection = builder.connection(); + this.closeTimeout = builder.closeTimeout(); Handler handler = builder.handler(); this.publisher = this.connection.publisherBuilder().build(); @@ -117,6 +119,15 @@ public void close() { if (this.closed.compareAndSet(false, true)) { this.connection.removeRpcServer(this); try { + this.maybeWaitForUnsettledMessages(); + } catch (Exception e) { + LOGGER.warn("Error while waiting for unsettled messages in RPC server: {}", e.getMessage()); + } + try { + long unsettledMessageCount = this.consumer.unsettledMessageCount(); + if (unsettledMessageCount > 0) { + LOGGER.info("Closing RPC server with {} unsettled message(s)", unsettledMessageCount); + } this.consumer.close(); } catch (Exception e) { LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage()); @@ -143,4 +154,20 @@ private void sendReply(Message reply) { LOGGER.info("Error while processing RPC request: {}", e.getMessage()); } } + + private void maybeWaitForUnsettledMessages() { + if (this.closeTimeout.toNanos() > 0) { + Duration waited = Duration.ZERO; + Duration waitStep = Duration.ofMillis(10); + while (this.consumer.unsettledMessageCount() > 0 && waited.compareTo(this.closeTimeout) < 0) { + try { + Thread.sleep(100); + waited = waited.plus(waitStep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java b/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java index b20280665..1e9adc24d 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java @@ -149,6 +149,7 @@ static class AmqpRpcServerBuilder implements RpcServerBuilder { private RpcServer.Handler handler; private Function correlationIdExtractor; private BiFunction replyPostProcessor; + private Duration closeTimeout = Duration.ofSeconds(60); AmqpRpcServerBuilder(AmqpConnection connection) { this.connection = connection; @@ -180,6 +181,12 @@ public RpcServerBuilder replyPostProcessor( return this; } + @Override + public RpcServerBuilder closeTimeout(Duration closeTimeout) { + this.closeTimeout = closeTimeout; + return this; + } + @Override public RpcServer build() { return this.connection.createRpcServer(this); @@ -204,5 +211,9 @@ Function correlationIdExtractor() { BiFunction replyPostProcessor() { return this.replyPostProcessor; } + + Duration closeTimeout() { + return this.closeTimeout; + } } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java index 720ab6e39..5bcd308a2 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java @@ -476,6 +476,98 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info) } } + @Test + void rpcServerShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInternalConsumer() + throws ExecutionException, InterruptedException, TimeoutException { + try (Connection clientConnection = environment.connectionBuilder().build(); + Connection serverConnection = environment.connectionBuilder().build()) { + + String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); + + RpcClient rpcClient = + clientConnection + .rpcClientBuilder() + .requestAddress() + .queue(requestQueue) + .rpcClient() + .build(); + + Sync receivedSync = sync(); + RpcServer rpcServer = + serverConnection + .rpcServerBuilder() + .requestQueue(requestQueue) + .handler( + (ctx, request) -> { + receivedSync.down(); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return HANDLER.handle(ctx, request); + }) + .build(); + + String request = UUID.randomUUID().toString(); + CompletableFuture responseFuture = + rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + assertThat(receivedSync).completes(); + rpcServer.close(); + Message response = responseFuture.get(10, TimeUnit.SECONDS); + assertThat(response.body()).asString(UTF_8).isEqualTo(process(request)); + } + } + + @Test + void outstandingRequestShouldTimeOutWhenRpcServerDoesNotCloseConsumerGracefully() + throws ExecutionException, InterruptedException, TimeoutException { + try (Connection clientConnection = environment.connectionBuilder().build(); + Connection serverConnection = environment.connectionBuilder().build()) { + + String requestQueue = serverConnection.management().queue().exclusive(true).declare().name(); + + Duration requestTimeout = Duration.ofSeconds(1); + RpcClient rpcClient = + clientConnection + .rpcClientBuilder() + .requestTimeout(requestTimeout) + .requestAddress() + .queue(requestQueue) + .rpcClient() + .build(); + + Sync receivedSync = sync(); + RpcServer rpcServer = + serverConnection + .rpcServerBuilder() + .closeTimeout(Duration.ZERO) // close the consumer immediately + .requestQueue(requestQueue) + .handler( + (ctx, request) -> { + receivedSync.down(); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return HANDLER.handle(ctx, request); + }) + .build(); + + String request = UUID.randomUUID().toString(); + CompletableFuture responseFuture = + rpcClient.publish(rpcClient.message(request.getBytes(UTF_8))); + assertThat(receivedSync).completes(); + rpcServer.close(); + assertThatThrownBy( + () -> responseFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(AmqpException.class); + waitAtMost(() -> serverConnection.management().queueInfo(requestQueue).messageCount() == 1); + } + } + private static AmqpConnectionBuilder connectionBuilder() { return (AmqpConnectionBuilder) environment.connectionBuilder(); }