From 2a821c27a119dc6f55ad03f16767ec16fc4d2546 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 17 Apr 2025 22:00:30 +0100 Subject: [PATCH 1/5] KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] --- .../kafka/clients/admin/KafkaAdminClient.java | 99 ++- .../clients/admin/ListGroupsOptions.java | 33 +- .../clients/admin/KafkaAdminClientTest.java | 573 ++++++++++++++---- .../clients/admin/ListGroupsOptionsTest.java | 113 ++++ .../scala/kafka/admin/ConfigCommand.scala | 4 +- .../consumer/group/ShareGroupCommand.java | 13 +- .../tools/streams/StreamsGroupCommand.java | 8 +- 7 files changed, 676 insertions(+), 167 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 844f6962160ff..6bafcc6306b93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) { for (final Node node : allNodes) { final long nowList = time.milliseconds(); runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) { + + // If only regular consumer group types are required, we can try an earlier request version if + // UnsupportedVersionException is thrown + final boolean canTryEarlierRequestVersion = options.regularConsumerGroupTypes(); + boolean tryUsingEarlierRequestVersion = false; + @Override ListGroupsRequest.Builder createRequest(int timeoutMs) { - List groupTypes = options.types() - .stream() - .map(GroupType::toString) - .collect(Collectors.toList()); - List groupStates = options.groupStates() - .stream() - .map(GroupState::toString) - .collect(Collectors.toList()); - return new ListGroupsRequest.Builder(new ListGroupsRequestData() - .setTypesFilter(groupTypes) - .setStatesFilter(groupStates) - ); + if (tryUsingEarlierRequestVersion) { + List groupStates = options.groupStates() + .stream() + .map(GroupState::toString) + .collect(Collectors.toList()); + return new ListGroupsRequest.Builder(new ListGroupsRequestData() + .setStatesFilter(groupStates) + ); + } else { + List groupTypes = options.types() + .stream() + .map(GroupType::toString) + .collect(Collectors.toList()); + List groupStates = options.groupStates() + .stream() + .map(GroupState::toString) + .collect(Collectors.toList()); + return new ListGroupsRequest.Builder(new ListGroupsRequestData() + .setTypesFilter(groupTypes) + .setStatesFilter(groupStates) + ); + } } private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { - final String groupId = group.groupId(); - final Optional type; - if (group.groupType() == null || group.groupType().isEmpty()) { - type = Optional.empty(); - } else { - type = Optional.of(GroupType.parse(group.groupType())); - } - final String protocolType = group.protocolType(); - final Optional groupState; - if (group.groupState() == null || group.groupState().isEmpty()) { - groupState = Optional.empty(); - } else { - groupState = Optional.of(GroupState.parse(group.groupState())); + String protocolType = group.protocolType(); + if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) { + final String groupId = group.groupId(); + final Optional type; + if (group.groupType() == null || group.groupType().isEmpty()) { + type = Optional.empty(); + } else { + type = Optional.of(GroupType.parse(group.groupType())); + } + final Optional groupState; + if (group.groupState() == null || group.groupState().isEmpty()) { + groupState = Optional.empty(); + } else { + groupState = Optional.of(GroupState.parse(group.groupState())); + } + final GroupListing groupListing = new GroupListing( + groupId, + type, + protocolType, + groupState + ); + results.addListing(groupListing); } - final GroupListing groupListing = new GroupListing( - groupId, - type, - protocolType, - groupState - ); - results.addListing(groupListing); } @Override @@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) { } } + @Override + boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) { + // If we cannot try the earlier request version, give up + if (!canTryEarlierRequestVersion) { + return false; + } + + // If have already tried the earlier request version, give up + if (tryUsingEarlierRequestVersion) { + return false; + } + + // Have a try using the earlier request version + tryUsingEarlierRequestVersion = true; + return true; + } + @Override void handleFailure(Throwable throwable) { synchronized (results) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java index e5d70133186cd..b187b07ba673b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java @@ -36,16 +36,38 @@ public class ListGroupsOptions extends AbstractOptions { private Set types = Set.of(); private Set protocolTypes = Set.of(); + // Types filter is supported by brokers with version 4.0.0 or later. Older brokers only support + // classic groups, so listing consumer groups on an older broker does not need to use a types filter. + private boolean regularConsumerGroupTypes = false; + /** * Only consumer groups will be returned by listGroups(). * This operation sets filters on group type and protocol type which select consumer groups. */ public static ListGroupsOptions forConsumerGroups() { return new ListGroupsOptions() - .withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER)) + .withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER), true) .withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE)); } + /** + * Only share groups will be returned by listGroups(). + * This operation sets a filter on group type which select share groups. + */ + public static ListGroupsOptions forShareGroups() { + return new ListGroupsOptions() + .withTypes(Set.of(GroupType.SHARE)); + } + + /** + * Only streams groups will be returned by listGroups(). + * This operation sets a filter on group type which select streams groups. + */ + public static ListGroupsOptions forStreamsGroups() { + return new ListGroupsOptions() + .withTypes(Set.of(GroupType.STREAMS)); + } + /** * If groupStates is set, only groups in these states will be returned by listGroups(). * Otherwise, all groups are returned. @@ -66,7 +88,12 @@ public ListGroupsOptions withProtocolTypes(Set protocolTypes) { * Otherwise, all groups are returned. */ public ListGroupsOptions withTypes(Set types) { + return this.withTypes(types, false); + } + + ListGroupsOptions withTypes(Set types, boolean regularConsumerGroupTypes) { this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types); + this.regularConsumerGroupTypes = regularConsumerGroupTypes; return this; } @@ -90,4 +117,8 @@ public Set protocolTypes() { public Set types() { return types; } + + boolean regularConsumerGroupTypes() { + return regularConsumerGroupTypes; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 5d16bd0b9042f..97886f000ea0e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3268,7 +3268,6 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers } @Test - @SuppressWarnings("removal") public void testListConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), AdminClientConfig.RETRIES_CONFIG, "2")) { @@ -3276,100 +3275,100 @@ public void testListConsumerGroups() throws Exception { // Empty metadata response should be retried env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - Collections.emptyList(), - env.cluster().clusterResource().clusterId(), - -1, - Collections.emptyList())); + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - env.cluster().nodes(), - env.cluster().clusterResource().clusterId(), - env.cluster().controller().id(), - Collections.emptyList())); + RequestTestUtils.metadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + List.of())); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-1") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-1") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(0)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-1") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(0)); // handle retriable errors env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - .setGroups(Collections.emptyList()) - ), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) - .setGroups(Collections.emptyList()) - ), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-2") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-2") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-2") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-3") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-3") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(2)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-3") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-3") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(2)); // fatal error env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) - .setGroups(Collections.emptyList())), - env.cluster().nodeById(3)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGroups(Collections.emptyList())), + env.cluster().nodeById(3)); - final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); - Collection listings = result.valid().get(); + Collection listings = result.valid().get(); assertEquals(3, listings.size()); Set groupIds = new HashSet<>(); - for (ConsumerGroupListing listing : listings) { + for (GroupListing listing : listings) { groupIds.add(listing.groupId()); - assertTrue(listing.state().isPresent()); + assertTrue(listing.groupState().isPresent()); } assertEquals(Set.of("group-1", "group-2", "group-3"), groupIds); @@ -3378,7 +3377,6 @@ public void testListConsumerGroups() throws Exception { } @Test - @SuppressWarnings("removal") public void testListConsumerGroupsMetadataFailure() throws Exception { final Cluster cluster = mockCluster(3, 0); final Time time = new MockTime(); @@ -3390,19 +3388,18 @@ public void testListConsumerGroupsMetadataFailure() throws Exception { // Empty metadata causes the request to fail since we have no list of brokers // to send the ListGroups requests to env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - Collections.emptyList(), - env.cluster().clusterResource().clusterId(), - -1, - Collections.emptyList())); + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); - final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @Test - @SuppressWarnings("removal") public void testListConsumerGroupsWithStates() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -3412,14 +3409,351 @@ public void testListConsumerGroupsWithStates() throws Exception { env.kafkaClient().prepareResponseFrom( new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(asList( + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty")))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(2, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.empty(), "", Optional.of(GroupState.EMPTY))); + expected.add(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsWithTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with a specific state filter but no type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CLASSIC.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(1, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + + // Test with list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options2 = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.CONSUMER)); + final ListGroupsResult result2 = env.adminClient().listGroups(options2); + Collection listings2 = result2.valid().get(); + + assertEquals(2, listings2.size()); + List expected2 = new ArrayList<>(); + expected2.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY))); + expected2.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected2, listings2); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV3 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check we can list groups with older broker if we don't specify states + // First attempt to build request will require v5 (type filter), but the broker only supports v3 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && + !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + // Second attempt to build request will use v3 (neither type nor state filter) + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), + env.cluster().nodeById(0)); + + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + ListGroupsResult result = env.adminClient().listGroups(options); + Collection listing = result.all().get(); + assertEquals(1, listing.size()); + List expected = List.of(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty())); + assertEquals(expected, listing); + + // But we cannot set a state filter with older broker + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + // First attempt to build request will require v5 (type and state filter), but the broker only supports v3 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && + !((ListGroupsRequest) request).data().typesFilter().isEmpty() && + !((ListGroupsRequest) request).data().statesFilter().isEmpty() + ); + // Second attempt to build request will require v4 (state filter only), but the broker only support v3 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && + !((ListGroupsRequest) request).data().statesFilter().isEmpty() + ); + + options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + } + } + + @Test + public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV4 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 4); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check if we can list groups with older broker if we specify states and don't specify types. + // First attempt to build request will require v5 (type and state filter), but the broker only supports v4 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && + !((ListGroupsRequest) request).data().typesFilter().isEmpty() && + !((ListGroupsRequest) request).data().statesFilter().isEmpty() + ); + // Second attempt to build request will use v4 (state filter) + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(GroupState.STABLE.toString())))), + env.cluster().nodeById(0)); + + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + ListGroupsResult result = env.adminClient().listGroups(options); + + Collection listing = result.all().get(); + assertEquals(1, listing.size()); + List expected = List.of( + new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)) + ); + assertEquals(expected, listing); + + // Check that we cannot set a type filter with an older broker. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + // First attempt to build request will require v5 (type filter), but the broker only supports v4 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + + options = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.CLASSIC)); + result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + } + } + + @Test + @SuppressWarnings("removal") + public void testListConsumerGroupsDeprecated() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), + AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata response should be retried + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); + + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + List.of())); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-1") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(0)); + + // handle retriable errors + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setGroups(List.of()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setGroups(List.of()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-2") - .setGroupState("Empty")))), + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-2") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(1)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-3") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-3") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(2)); + + // fatal error + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGroups(List.of())), + env.cluster().nodeById(3)); + + final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + + Collection listings = result.valid().get(); + assertEquals(3, listings.size()); + + Set groupIds = new HashSet<>(); + for (ConsumerGroupListing listing : listings) { + groupIds.add(listing.groupId()); + assertTrue(listing.state().isPresent()); + } + + assertEquals(Set.of("group-1", "group-2", "group-3"), groupIds); + assertEquals(1, result.errors().get().size()); + } + } + + @Test + @SuppressWarnings("removal") + public void testListConsumerGroupsDeprecatedMetadataFailure() throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata causes the request to fail since we have no list of brokers + // to send the ListGroups requests to + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); + + final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); + TestUtils.assertFutureThrows(KafkaException.class, result.all()); + } + } + + @Test + @SuppressWarnings("removal") + public void testListConsumerGroupsDeprecatedWithStates() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty")))), env.cluster().nodeById(0)); final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); @@ -3437,7 +3771,7 @@ public void testListConsumerGroupsWithStates() throws Exception { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithTypes() throws Exception { + public void testListConsumerGroupsDeprecatedWithTypes() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -3445,10 +3779,10 @@ public void testListConsumerGroupsWithTypes() throws Exception { env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(singletonList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3456,7 +3790,7 @@ public void testListConsumerGroupsWithTypes() throws Exception { .setGroupType(GroupType.CLASSIC.toString())))), env.cluster().nodeById(0)); - final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listings = result.valid().get(); @@ -3470,10 +3804,10 @@ public void testListConsumerGroupsWithTypes() throws Exception { env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())), + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(asList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3500,30 +3834,31 @@ public void testListConsumerGroupsWithTypes() throws Exception { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { + public void testListConsumerGroupsDeprecatedWithStatesOlderBrokerVersion() throws Exception { ApiVersion listGroupV3 = new ApiVersion() - .setApiKey(ApiKeys.LIST_GROUPS.id) - .setMinVersion((short) 0) - .setMaxVersion((short) 3); + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3))); env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); // Check we can list groups with older broker if we don't specify states env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse(new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(Collections.singletonList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-1") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), - env.cluster().nodeById(0)); + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), + env.cluster().nodeById(0)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listing = result.all().get(); assertEquals(1, listing.size()); - List expected = Collections.singletonList(new ConsumerGroupListing("group-1", false)); + List expected = List.of(new ConsumerGroupListing("group-1", false)); assertEquals(expected, listing); // But we cannot set a state filter with older broker @@ -3531,7 +3866,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio env.kafkaClient().prepareUnsupportedVersionResponse( body -> body instanceof ListGroupsRequest); - options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } @@ -3539,34 +3874,34 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + public void testListConsumerGroupsDeprecatedWithTypesOlderBrokerVersion() throws Exception { ApiVersion listGroupV4 = new ApiVersion() .setApiKey(ApiKeys.LIST_GROUPS.id) .setMinVersion((short) 0) .setMaxVersion((short) 4); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4))); env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); // Check if we can list groups with older broker if we specify states and don't specify types. env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Collections.singletonList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setGroupState(GroupState.STABLE.toString())))), env.cluster().nodeById(0)); - ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listing = result.all().get(); assertEquals(1, listing.size()); - List expected = Collections.singletonList( + List expected = List.of( new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false) ); assertEquals(expected, listing); @@ -3577,7 +3912,7 @@ public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() ); - options = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); + options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.CLASSIC)); result = env.adminClient().listConsumerGroups(options); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } @@ -6087,7 +6422,7 @@ public void testListStreamsGroups() throws Exception { .setGroups(Collections.emptyList())), env.cluster().nodeById(3)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); @@ -6122,7 +6457,7 @@ public void testListStreamsGroupsMetadataFailure() throws Exception { -1, Collections.emptyList())); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -6150,7 +6485,7 @@ public void testListStreamsGroupsWithStates() throws Exception { .setGroupState("NotReady")))), env.cluster().nodeById(0)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); Collection listings = result.valid().get(); assertEquals(2, listings.size()); @@ -6181,7 +6516,7 @@ public void testListStreamsGroupsWithStatesOlderBrokerVersion() { new ListGroupsResponseData.ListedGroup() .setGroupId("streams-group-1")))), env.cluster().nodeById(0)); - ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -6497,7 +6832,7 @@ public void testListShareGroups() throws Exception { .setGroups(Collections.emptyList())), env.cluster().nodeById(3)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); @@ -6532,7 +6867,7 @@ public void testListShareGroupsMetadataFailure() throws Exception { -1, Collections.emptyList())); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -6560,7 +6895,7 @@ public void testListShareGroupsWithStates() throws Exception { .setGroupState("Empty")))), env.cluster().nodeById(0)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); Collection listings = result.valid().get(); assertEquals(2, listings.size()); @@ -6591,7 +6926,7 @@ public void testListShareGroupsWithStatesOlderBrokerVersion() { new ListGroupsResponseData.ListedGroup() .setGroupId("share-group-1")))), env.cluster().nodeById(0)); - ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java new file mode 100644 index 0000000000000..2a804730d2b70 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; +import org.junit.jupiter.api.Test; + +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ListGroupsOptionsTest { + @Test + public void testForConsumerGroups() { + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.CONSUMER, GroupType.CLASSIC), options.types()); + assertEquals(Set.of("", ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testForShareGroups() { + ListGroupsOptions options = ListGroupsOptions.forShareGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.SHARE), options.types()); + assertTrue(options.protocolTypes().isEmpty()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testForStreamsGroups() { + ListGroupsOptions options = ListGroupsOptions.forStreamsGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.STREAMS), options.types()); + assertTrue(options.protocolTypes().isEmpty()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testGroupStates() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.groupStates().isEmpty()); + + options.inGroupStates(Set.of(GroupState.DEAD)); + assertEquals(Set.of(GroupState.DEAD), options.groupStates()); + + Set groupStates = Set.of(GroupState.values()); + options = new ListGroupsOptions().inGroupStates(groupStates); + assertEquals(groupStates, options.groupStates()); + } + + @Test + public void testProtocolTypes() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.protocolTypes().isEmpty()); + + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + + Set protocolTypes = Set.of("", "consumer", "share"); + options = new ListGroupsOptions().withProtocolTypes(protocolTypes); + assertEquals(protocolTypes, options.protocolTypes()); + } + + @Test + public void testTypes() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.types().isEmpty()); + + options.withTypes(Set.of(GroupType.CLASSIC)); + assertEquals(Set.of(GroupType.CLASSIC), options.types()); + + Set groupTypes = Set.of(GroupType.values()); + options = new ListGroupsOptions().withTypes(groupTypes); + assertEquals(groupTypes, options.types()); + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 034f99f868cc5..8931feb445ddb 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -24,7 +24,7 @@ import joptsimple._ import kafka.server.DynamicConfig import kafka.utils.Implicits._ import kafka.utils.Logging -import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListGroupsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} +import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic @@ -350,7 +350,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups(ListGroupsOptions.forConsumerGroups()).all.get.asScala.map(_.groupId).toSeq + adminClient.listGroups().all.get.asScala.map(_.groupId).toSeq case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index dcebff0d3ea62..63bb8a9fe0fc9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -148,9 +148,8 @@ public void listGroups() throws ExecutionException, InterruptedException { List listShareGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -160,9 +159,8 @@ List listShareGroups() { List listDetailedShareGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().toList(); } catch (InterruptedException | ExecutionException e) { @@ -171,9 +169,8 @@ List listDetailedShareGroups() { } List listShareGroupsInStates(Set states) throws ExecutionException, InterruptedException { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE)) .inGroupStates(states)); return new ArrayList<>(result.all().get()); } diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index bbf09c3f36a62..ddd69aa56757c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -136,9 +136,8 @@ public void listGroups() throws ExecutionException, InterruptedException { List listStreamsGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.STREAMS))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -147,9 +146,8 @@ List listStreamsGroups() { } List listStreamsGroupsInStates(Set states) throws ExecutionException, InterruptedException { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups() .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.STREAMS)) .inGroupStates(states)); return new ArrayList<>(result.all().get()); } From 5d03440afc68c5b5338112110f27aee744cf188d Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 17 Apr 2025 22:13:33 +0100 Subject: [PATCH 2/5] Missing comment --- .../org/apache/kafka/clients/admin/ListGroupsOptions.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java index b187b07ba673b..04b5bc25bce40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java @@ -78,6 +78,10 @@ public ListGroupsOptions inGroupStates(Set groupStates) { return this; } + /** + * If protocol types is set, only groups of these protocol types will be returned by listGroups(). + * Otherwise, all groups are returned. + */ public ListGroupsOptions withProtocolTypes(Set protocolTypes) { this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes); return this; From c3568c2859ef52b98401e1a854aacf4b01a0dad2 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 17 Apr 2025 22:23:31 +0100 Subject: [PATCH 3/5] Spotless --- .../org/apache/kafka/clients/admin/ListGroupsOptionsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java index 2a804730d2b70..360da83b8da57 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; + import org.junit.jupiter.api.Test; import java.util.Set; From 67b3d6927326a3523a20e935c6ac1e00e7545f85 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 18 Apr 2025 10:30:48 +0100 Subject: [PATCH 4/5] Add protocol type filter tests --- .../clients/admin/KafkaAdminClientTest.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 97886f000ea0e..7da0f2eed4553 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3190,6 +3190,42 @@ public void testListGroupsEmptyGroupType() throws Exception { } } + @Test + public void testListGroupsWithProtocolTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with list group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = new ListGroupsOptions().withProtocolTypes(Set.of("")); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listing = result.valid().get(); + + assertEquals(1, listing.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY))); + assertEquals(expected, listing); + assertEquals(0, result.errors().get().size()); + } + } + @Test public void testListGroupsWithTypes() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -3432,6 +3468,42 @@ public void testListConsumerGroupsWithStates() throws Exception { } } + @Test + public void testListConsumerGroupsWithProtocolTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with a specific protocol type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(1, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + @Test public void testListConsumerGroupsWithTypes() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { From 97a83fa9245b35ba8045868cca7a149127a411c6 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 18 Apr 2025 17:18:23 +0100 Subject: [PATCH 5/5] Restrict kafka-consumer-groups --list to valid consumer group types --- .../kafka/tools/consumer/group/ConsumerGroupCommand.java | 7 ++++--- .../kafka/tools/consumer/group/ListConsumerGroupTest.java | 4 ++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index b001ae7c6f7fd..92e654cdca492 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -177,10 +177,11 @@ static Set groupStatesFromString(String input) { @SuppressWarnings("Regexp") static Set consumerGroupTypesFromString(String input) { + Set validTypes = Set.of(GroupType.CLASSIC, GroupType.CONSUMER); Set parsedTypes = Stream.of(input.toLowerCase().split(",")).map(s -> GroupType.parse(s.trim())).collect(Collectors.toSet()); - if (parsedTypes.contains(GroupType.UNKNOWN)) { - List validTypes = Arrays.stream(GroupType.values()).filter(t -> t != GroupType.UNKNOWN).map(Object::toString).collect(Collectors.toList()); - throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " + String.join(", ", validTypes)); + if (!validTypes.containsAll(parsedTypes)) { + throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " + + String.join(", ", validTypes.stream().map(GroupType::toString).collect(Collectors.toSet()))); } return parsedTypes; } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index dba06a3e098e3..fd50aa3b7d741 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -662,6 +662,10 @@ public void testConsumerGroupTypesFromString() { result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic"); Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupType.CONSUMER, GroupType.CLASSIC)), result); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("Share")); + + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("streams")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")); Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"));