Skip to content

Deprecate QueueInfo#replicas() in favor of QueueInfo#members() #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/main/java/com/rabbitmq/client/amqp/Management.java
Original file line number Diff line number Diff line change
Expand Up @@ -861,12 +861,20 @@ interface QueueInfo {
String leader();

/**
* The nodes the queue has replicas (members) on.
* Deprecated, use {@link #members()} instead.
*
* @return the nodes of the queue replicas (members)
* @return the nodes of the queue members
*/
@Deprecated(forRemoval = true)
List<String> replicas();

/**
* The nodes the queue has members on.
*
* @return the nodes of the queue members
*/
List<String> members();

/**
* The number of messages in the queue.
*
Expand Down
20 changes: 13 additions & 7 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ private static class DefaultQueueInfo implements QueueInfo {
private final QueueType type;
private final Map<String, Object> arguments;
private final String leader;
private final List<String> replicas;
private final List<String> members;
private final long messageCount;
private final int consumerCount;

Expand All @@ -624,11 +624,11 @@ private DefaultQueueInfo(Map<String, Object> response) {
this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
this.arguments = Map.copyOf((Map<String, Object>) response.get("arguments"));
this.leader = (String) response.get("leader");
String[] replicas = (String[]) response.get("replicas");
if (replicas == null || replicas.length == 0) {
this.replicas = Collections.emptyList();
String[] members = (String[]) response.get("replicas");
if (members == null || members.length == 0) {
this.members = Collections.emptyList();
} else {
this.replicas = List.of(replicas);
this.members = List.of(members);
}
this.messageCount = ((Number) response.get("message_count")).longValue();
this.consumerCount = ((Number) response.get("consumer_count")).intValue();
Expand Down Expand Up @@ -670,8 +670,14 @@ public String leader() {
}

@Override
@SuppressWarnings("removal")
public List<String> replicas() {
return this.replicas;
return this.members();
}

@Override
public List<String> members() {
return this.members;
}

@Override
Expand Down Expand Up @@ -704,7 +710,7 @@ public String toString() {
+ leader
+ '\''
+ ", replicas="
+ replicas
+ members
+ ", messageCount="
+ messageCount
+ ", consumerCount="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
info.name(),
info.type(),
info.leader(),
info.replicas(),
info.members(),
connectionName);
if (nodesWithAffinity == null) {
nodesWithAffinity = strategy.nodesWithAffinity(context, info);
Expand Down Expand Up @@ -255,9 +255,9 @@ static class LeaderForPublishingMembersForConsumingStrategy
public List<String> nodesWithAffinity(
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
List<String> nodesWithAffinity =
(info.replicas() == null || info.replicas().isEmpty())
(info.members() == null || info.members().isEmpty())
? Collections.emptyList()
: List.copyOf(info.replicas());
: List.copyOf(info.members());
if (context.operation() == ConnectionSettings.Affinity.Operation.PUBLISH) {
if (info.leader() != null && !info.leader().isBlank()) {
nodesWithAffinity = List.of(info.leader());
Expand All @@ -277,7 +277,7 @@ public List<String> nodesWithAffinity(
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
ConnectionSettings.Affinity.Operation operation = context.operation();
String leader = info.leader();
List<String> replicas = info.replicas() == null ? Collections.emptyList() : info.replicas();
List<String> replicas = info.members() == null ? Collections.emptyList() : info.members();
List<String> nodesWithAffinity;
LOGGER.debug(
"Trying to find affinity {} with leader = {}, replicas = {}", context, leader, replicas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,14 @@ private static class TestQueueInfo implements Management.QueueInfo {

private final String name, leader;
private final Management.QueueType type;
private final List<String> replicas;
private final List<String> members;

private TestQueueInfo(
String name, Management.QueueType type, String leader, List<String> replicas) {
String name, Management.QueueType type, String leader, List<String> members) {
this.name = name;
this.type = type;
this.leader = leader;
this.replicas = replicas;
this.members = members;
}

@Override
Expand Down Expand Up @@ -386,8 +386,14 @@ public String leader() {
}

@Override
@SuppressWarnings("removal")
public List<String> replicas() {
return this.replicas;
return this.members();
}

@Override
public List<String> members() {
return this.members;
}

@Override
Expand All @@ -408,12 +414,12 @@ public boolean equals(Object o) {
return Objects.equals(name, that.name)
&& Objects.equals(leader, that.leader)
&& type == that.type
&& Objects.equals(replicas, that.replicas);
&& Objects.equals(members, that.members);
}

@Override
public int hashCode() {
return Objects.hash(name, leader, type, replicas);
return Objects.hash(name, leader, type, members);
}

@Override
Expand All @@ -428,7 +434,7 @@ public String toString() {
+ leader
+ '\''
+ ", replicas="
+ replicas
+ members
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ ConnectionAssert isOnLeader(Management.QueueInfo info) {
ConnectionAssert isOnFollower(Management.QueueInfo info) {
Assert.notNull(info, "Queue info cannot be null");
List<String> followers =
info.replicas().stream()
info.members().stream()
.filter(n -> !n.equals(info.leader()))
.collect(Collectors.toList());
if (!followers.contains(actual.connectionNodename())) {
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) {
Management.QueueInfo info = connection.management().queueInfo(q);
assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader());
assertThat(consumeConnection.connectionNodename())
.isIn(info.replicas())
.isIn(info.members())
.isNotEqualTo(info.leader());
assertThat(Cli.listConnections()).hasSize(3);
} finally {
Expand Down Expand Up @@ -315,7 +315,7 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
consumeSync.reset();

List<String> initialFollowers =
queueInfo.replicas().stream().filter(n -> !n.equals(initialLeader)).collect(toList());
queueInfo.members().stream().filter(n -> !n.equals(initialLeader)).collect(toList());
assertThat(initialFollowers).isNotEmpty();

Cli.pauseNode(initialLeader);
Expand Down Expand Up @@ -498,10 +498,10 @@ String deleteStreamLeader() {
String deleteLeader(Consumer<String> deleteMemberOperation) {
Management.QueueInfo info = queueInfo();
String initialLeader = info.leader();
int initialReplicaCount = info.replicas().size();
int initialReplicaCount = info.members().size();
deleteMemberOperation.accept(initialLeader);
TestUtils.waitAtMost(() -> !initialLeader.equals(queueInfo().leader()));
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
assertThat(queueInfo().members()).hasSize(initialReplicaCount - 1);
return initialLeader;
}

Expand All @@ -527,9 +527,9 @@ void addStreamMember(String newMember) {

void addMember(Runnable addMemberOperation) {
Management.QueueInfo info = queueInfo();
int initialReplicaCount = info.replicas().size();
int initialReplicaCount = info.members().size();
addMemberOperation.run();
TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount + 1);
TestUtils.waitAtMost(() -> queueInfo().members().size() == initialReplicaCount + 1);
}

Management.QueueInfo queueInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ void clusterRestart() {
queueConfigurations.forEach(
c -> {
if (c.type == Management.QueueType.QUORUM || c.type == Management.QueueType.STREAM) {
assertThat(management.queueInfo(c.name).replicas())
assertThat(management.queueInfo(c.name).members())
.hasSameSizeAs(nodes)
.containsExactlyInAnyOrderElementsOf(nodes);
} else {
assertThat(management.queueInfo(c.name).replicas())
assertThat(management.queueInfo(c.name).members())
.hasSize(1)
.containsAnyElementsOf(nodes);
}
Expand Down Expand Up @@ -240,7 +240,7 @@ void clusterRestart() {
"Queue '%s': leader '%s', followers '%s'%n",
q,
queueInfo.leader(),
queueInfo.replicas().stream()
queueInfo.members().stream()
.filter(n -> !n.equals(queueInfo.leader()))
.collect(toList()));
});
Expand Down Expand Up @@ -426,7 +426,7 @@ boolean isOnMember() {
return this.connection
.management()
.queueInfo(this.queue)
.replicas()
.members()
.contains(this.connection.connectionNodename());
}

Expand Down