diff --git a/src/main/java/com/rabbitmq/client/amqp/Management.java b/src/main/java/com/rabbitmq/client/amqp/Management.java index b71b4e91e..2f93e4fc6 100644 --- a/src/main/java/com/rabbitmq/client/amqp/Management.java +++ b/src/main/java/com/rabbitmq/client/amqp/Management.java @@ -861,12 +861,20 @@ interface QueueInfo { String leader(); /** - * The nodes the queue has replicas (members) on. + * Deprecated, use {@link #members()} instead. * - * @return the nodes of the queue replicas (members) + * @return the nodes of the queue members */ + @Deprecated(forRemoval = true) List replicas(); + /** + * The nodes the queue has members on. + * + * @return the nodes of the queue members + */ + List members(); + /** * The number of messages in the queue. * diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java index f6424ef49..199f936a1 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java @@ -611,7 +611,7 @@ private static class DefaultQueueInfo implements QueueInfo { private final QueueType type; private final Map arguments; private final String leader; - private final List replicas; + private final List members; private final long messageCount; private final int consumerCount; @@ -624,11 +624,11 @@ private DefaultQueueInfo(Map response) { this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH)); this.arguments = Map.copyOf((Map) response.get("arguments")); this.leader = (String) response.get("leader"); - String[] replicas = (String[]) response.get("replicas"); - if (replicas == null || replicas.length == 0) { - this.replicas = Collections.emptyList(); + String[] members = (String[]) response.get("replicas"); + if (members == null || members.length == 0) { + this.members = Collections.emptyList(); } else { - this.replicas = List.of(replicas); + this.members = List.of(members); } this.messageCount = ((Number) response.get("message_count")).longValue(); this.consumerCount = ((Number) response.get("consumer_count")).intValue(); @@ -670,8 +670,14 @@ public String leader() { } @Override + @SuppressWarnings("removal") public List replicas() { - return this.replicas; + return this.members(); + } + + @Override + public List members() { + return this.members; } @Override @@ -704,7 +710,7 @@ public String toString() { + leader + '\'' + ", replicas=" - + replicas + + members + ", messageCount=" + messageCount + ", consumerCount=" diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java b/src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java index 32c239f61..01c9f477e 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java @@ -83,7 +83,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity( info.name(), info.type(), info.leader(), - info.replicas(), + info.members(), connectionName); if (nodesWithAffinity == null) { nodesWithAffinity = strategy.nodesWithAffinity(context, info); @@ -255,9 +255,9 @@ static class LeaderForPublishingMembersForConsumingStrategy public List nodesWithAffinity( ConnectionSettings.AffinityContext context, Management.QueueInfo info) { List nodesWithAffinity = - (info.replicas() == null || info.replicas().isEmpty()) + (info.members() == null || info.members().isEmpty()) ? Collections.emptyList() - : List.copyOf(info.replicas()); + : List.copyOf(info.members()); if (context.operation() == ConnectionSettings.Affinity.Operation.PUBLISH) { if (info.leader() != null && !info.leader().isBlank()) { nodesWithAffinity = List.of(info.leader()); @@ -277,7 +277,7 @@ public List nodesWithAffinity( ConnectionSettings.AffinityContext context, Management.QueueInfo info) { ConnectionSettings.Affinity.Operation operation = context.operation(); String leader = info.leader(); - List replicas = info.replicas() == null ? Collections.emptyList() : info.replicas(); + List replicas = info.members() == null ? Collections.emptyList() : info.members(); List nodesWithAffinity; LOGGER.debug( "Trying to find affinity {} with leader = {}, replicas = {}", context, leader, replicas); diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java index 940b59931..8391dc725 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java @@ -340,14 +340,14 @@ private static class TestQueueInfo implements Management.QueueInfo { private final String name, leader; private final Management.QueueType type; - private final List replicas; + private final List members; private TestQueueInfo( - String name, Management.QueueType type, String leader, List replicas) { + String name, Management.QueueType type, String leader, List members) { this.name = name; this.type = type; this.leader = leader; - this.replicas = replicas; + this.members = members; } @Override @@ -386,8 +386,14 @@ public String leader() { } @Override + @SuppressWarnings("removal") public List replicas() { - return this.replicas; + return this.members(); + } + + @Override + public List members() { + return this.members; } @Override @@ -408,12 +414,12 @@ public boolean equals(Object o) { return Objects.equals(name, that.name) && Objects.equals(leader, that.leader) && type == that.type - && Objects.equals(replicas, that.replicas); + && Objects.equals(members, that.members); } @Override public int hashCode() { - return Objects.hash(name, leader, type, replicas); + return Objects.hash(name, leader, type, members); } @Override @@ -428,7 +434,7 @@ public String toString() { + leader + '\'' + ", replicas=" - + replicas + + members + '}'; } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java index 1ec86c248..1ba0caa08 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java @@ -392,7 +392,7 @@ ConnectionAssert isOnLeader(Management.QueueInfo info) { ConnectionAssert isOnFollower(Management.QueueInfo info) { Assert.notNull(info, "Queue info cannot be null"); List followers = - info.replicas().stream() + info.members().stream() .filter(n -> !n.equals(info.leader())) .collect(Collectors.toList()); if (!followers.contains(actual.connectionNodename())) { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java index 88d26b8a5..f14c1922c 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java @@ -81,7 +81,7 @@ void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) { Management.QueueInfo info = connection.management().queueInfo(q); assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader()); assertThat(consumeConnection.connectionNodename()) - .isIn(info.replicas()) + .isIn(info.members()) .isNotEqualTo(info.leader()); assertThat(Cli.listConnections()).hasSize(3); } finally { @@ -315,7 +315,7 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() { consumeSync.reset(); List initialFollowers = - queueInfo.replicas().stream().filter(n -> !n.equals(initialLeader)).collect(toList()); + queueInfo.members().stream().filter(n -> !n.equals(initialLeader)).collect(toList()); assertThat(initialFollowers).isNotEmpty(); Cli.pauseNode(initialLeader); @@ -498,10 +498,10 @@ String deleteStreamLeader() { String deleteLeader(Consumer deleteMemberOperation) { Management.QueueInfo info = queueInfo(); String initialLeader = info.leader(); - int initialReplicaCount = info.replicas().size(); + int initialReplicaCount = info.members().size(); deleteMemberOperation.accept(initialLeader); TestUtils.waitAtMost(() -> !initialLeader.equals(queueInfo().leader())); - assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1); + assertThat(queueInfo().members()).hasSize(initialReplicaCount - 1); return initialLeader; } @@ -527,9 +527,9 @@ void addStreamMember(String newMember) { void addMember(Runnable addMemberOperation) { Management.QueueInfo info = queueInfo(); - int initialReplicaCount = info.replicas().size(); + int initialReplicaCount = info.members().size(); addMemberOperation.run(); - TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount + 1); + TestUtils.waitAtMost(() -> queueInfo().members().size() == initialReplicaCount + 1); } Management.QueueInfo queueInfo() { diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java index b0bec96d3..59b3b8c12 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java @@ -199,11 +199,11 @@ void clusterRestart() { queueConfigurations.forEach( c -> { if (c.type == Management.QueueType.QUORUM || c.type == Management.QueueType.STREAM) { - assertThat(management.queueInfo(c.name).replicas()) + assertThat(management.queueInfo(c.name).members()) .hasSameSizeAs(nodes) .containsExactlyInAnyOrderElementsOf(nodes); } else { - assertThat(management.queueInfo(c.name).replicas()) + assertThat(management.queueInfo(c.name).members()) .hasSize(1) .containsAnyElementsOf(nodes); } @@ -240,7 +240,7 @@ void clusterRestart() { "Queue '%s': leader '%s', followers '%s'%n", q, queueInfo.leader(), - queueInfo.replicas().stream() + queueInfo.members().stream() .filter(n -> !n.equals(queueInfo.leader())) .collect(toList())); }); @@ -426,7 +426,7 @@ boolean isOnMember() { return this.connection .management() .queueInfo(this.queue) - .replicas() + .members() .contains(this.connection.connectionNodename()); }