Skip to content

KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) {
for (final Node node : allNodes) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Consideration: The new filtering mechanism in maybeAddGroup method should ensure that authorization checks are performed before any filtering logic is applied. While the current implementation appears to be handling this correctly, it's important to verify that authorization checks remain in place throughout the call chain to prevent potential information disclosure about restricted groups.

final long nowList = time.milliseconds();
runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) {

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backward Compatibility Risk: The fallback mechanism in listGroups for handling UnsupportedVersionException is only enabled when regularConsumerGroupTypes is true. This could lead to inconsistent behavior when working with older broker versions depending on the options used.

Consider expanding the fallback mechanism to work with all group type filters or clearly document this limitation.

// If only regular consumer group types are required, we can try an earlier request version if
// UnsupportedVersionException is thrown
final boolean canTryEarlierRequestVersion = options.regularConsumerGroupTypes();
boolean tryUsingEarlierRequestVersion = false;

@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> groupTypes = options.types()
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
.setStatesFilter(groupStates)
);
if (tryUsingEarlierRequestVersion) {
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setStatesFilter(groupStates)
);
} else {
List<String> groupTypes = options.types()
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
.setStatesFilter(groupStates)
);
}
}

private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final String protocolType = group.protocolType();
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
String protocolType = group.protocolType();
if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Complexity: The handleResponse method has become quite complex with nested conditionals and multiple responsibilities (parsing response, handling errors, filtering groups, etc).

Consider extracting the filtering logic into a separate method to improve readability and maintainability.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance Consideration: The maybeAddGroup method performs filtering of protocol types in the client rather than at the broker level, which could be less efficient when dealing with large numbers of groups.

Consider investigating if this filtering can be pushed to the server side for better efficiency with large result sets.

final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}

@Override
Expand All @@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) {
}
}

@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) {
// If we cannot try the earlier request version, give up
if (!canTryEarlierRequestVersion) {
return false;
}

// If have already tried the earlier request version, give up
if (tryUsingEarlierRequestVersion) {
return false;
}

// Have a try using the earlier request version
tryUsingEarlierRequestVersion = true;
return true;
}

@Override
void handleFailure(Throwable throwable) {
synchronized (results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,38 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
private Set<GroupType> types = Set.of();
private Set<String> protocolTypes = Set.of();

// Types filter is supported by brokers with version 4.0.0 or later. Older brokers only support
// classic groups, so listing consumer groups on an older broker does not need to use a types filter.
private boolean regularConsumerGroupTypes = false;

/**
* Only consumer groups will be returned by listGroups().
* This operation sets filters on group type and protocol type which select consumer groups.
*/
public static ListGroupsOptions forConsumerGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER))
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER), true)
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
}

/**
* Only share groups will be returned by listGroups().
* This operation sets a filter on group type which select share groups.
*/
public static ListGroupsOptions forShareGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.SHARE));
}

/**
* Only streams groups will be returned by listGroups().
* This operation sets a filter on group type which select streams groups.
*/
public static ListGroupsOptions forStreamsGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.STREAMS));
}

/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
Expand All @@ -56,6 +78,10 @@ public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
return this;
}

/**
* If protocol types is set, only groups of these protocol types will be returned by listGroups().
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
return this;
Expand All @@ -66,7 +92,12 @@ public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withTypes(Set<GroupType> types) {
return this.withTypes(types, false);
}

ListGroupsOptions withTypes(Set<GroupType> types, boolean regularConsumerGroupTypes) {
this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types);
this.regularConsumerGroupTypes = regularConsumerGroupTypes;
return this;
}

Expand All @@ -90,4 +121,8 @@ public Set<String> protocolTypes() {
public Set<GroupType> types() {
return types;
}

boolean regularConsumerGroupTypes() {
return regularConsumerGroupTypes;
}
}
Loading