diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java index 968baa869..f6424ef49 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java @@ -82,7 +82,9 @@ class AmqpManagement implements Management { private final TopologyListener topologyListener; private final Supplier nameSupplier; private final AtomicReference state = new AtomicReference<>(CREATED); - private final AtomicBoolean initializing = new AtomicBoolean(false); + // private final AtomicBoolean initializing = new AtomicBoolean(false); + private volatile boolean initializing = false; + private final Lock initializationLock = new ReentrantLock(); private final Duration receiveLoopIdleTimeout; private final Lock instanceLock = new ReentrantLock(); @@ -170,7 +172,7 @@ public UnbindSpecification unbind() { @Override public void close() { - if (this.initializing.get()) { + if (this.initializing) { throw new AmqpException.AmqpResourceInvalidStateException( "Management is initializing, retry closing later."); } @@ -203,45 +205,53 @@ public void close() { void init() { if (this.state() != OPEN) { - if (this.initializing.compareAndSet(false, true)) { - LOGGER.debug("Initializing management ({}).", this); - this.state(UNAVAILABLE); + if (!this.initializing) { try { - if (this.receiveLoop != null) { - this.receiveLoop.cancel(true); - this.receiveLoop = null; + initializationLock.lock(); + if (!this.initializing) { + this.initializing = true; + LOGGER.debug("Initializing management ({}).", this); + this.state(UNAVAILABLE); + try { + if (this.receiveLoop != null) { + this.receiveLoop.cancel(true); + this.receiveLoop = null; + } + LOGGER.debug("Creating management session ({}).", this); + this.session = this.connection.nativeConnection().openSession(); + String linkPairName = "management-link-pair"; + Map properties = Collections.singletonMap("paired", Boolean.TRUE); + LOGGER.debug("Creating management sender ({}).", this); + this.sender = + session.openSender( + MANAGEMENT_NODE_ADDRESS, + new SenderOptions() + .deliveryMode(DeliveryMode.AT_MOST_ONCE) + .linkName(linkPairName) + .properties(properties)); + + LOGGER.debug("Creating management receiver ({}).", this); + this.receiver = + session.openReceiver( + MANAGEMENT_NODE_ADDRESS, + new ReceiverOptions() + .deliveryMode(DeliveryMode.AT_MOST_ONCE) + .linkName(linkPairName) + .properties(properties) + .creditWindow(100)); + + this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS); + LOGGER.debug("Management sender created ({}).", this); + this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS); + LOGGER.debug("Management receiver created ({}).", this); + this.state(OPEN); + this.initializing = false; + } catch (Exception e) { + throw new AmqpException(e); + } } - LOGGER.debug("Creating management session ({}).", this); - this.session = this.connection.nativeConnection().openSession(); - String linkPairName = "management-link-pair"; - Map properties = Collections.singletonMap("paired", Boolean.TRUE); - LOGGER.debug("Creating management sender ({}).", this); - this.sender = - session.openSender( - MANAGEMENT_NODE_ADDRESS, - new SenderOptions() - .deliveryMode(DeliveryMode.AT_MOST_ONCE) - .linkName(linkPairName) - .properties(properties)); - - LOGGER.debug("Creating management receiver ({}).", this); - this.receiver = - session.openReceiver( - MANAGEMENT_NODE_ADDRESS, - new ReceiverOptions() - .deliveryMode(DeliveryMode.AT_MOST_ONCE) - .linkName(linkPairName) - .properties(properties) - .creditWindow(100)); - - this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS); - LOGGER.debug("Management sender created ({}).", this); - this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS); - LOGGER.debug("Management receiver created ({}).", this); - this.state(OPEN); - this.initializing.set(false); - } catch (Exception e) { - throw new AmqpException(e); + } finally { + initializationLock.unlock(); } } }