-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
2a821c2
5d03440
c3568c2
67b3d69
97a83fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) { | |
for (final Node node : allNodes) { | ||
final long nowList = time.milliseconds(); | ||
runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Backward Compatibility Risk: The fallback mechanism in Consider expanding the fallback mechanism to work with all group type filters or clearly document this limitation. |
||
// If only regular consumer group types are required, we can try an earlier request version if | ||
// UnsupportedVersionException is thrown | ||
final boolean canTryEarlierRequestVersion = options.regularConsumerGroupTypes(); | ||
boolean tryUsingEarlierRequestVersion = false; | ||
|
||
@Override | ||
ListGroupsRequest.Builder createRequest(int timeoutMs) { | ||
List<String> groupTypes = options.types() | ||
.stream() | ||
.map(GroupType::toString) | ||
.collect(Collectors.toList()); | ||
List<String> groupStates = options.groupStates() | ||
.stream() | ||
.map(GroupState::toString) | ||
.collect(Collectors.toList()); | ||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||
.setTypesFilter(groupTypes) | ||
.setStatesFilter(groupStates) | ||
); | ||
if (tryUsingEarlierRequestVersion) { | ||
List<String> groupStates = options.groupStates() | ||
.stream() | ||
.map(GroupState::toString) | ||
.collect(Collectors.toList()); | ||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||
.setStatesFilter(groupStates) | ||
); | ||
} else { | ||
List<String> groupTypes = options.types() | ||
.stream() | ||
.map(GroupType::toString) | ||
.collect(Collectors.toList()); | ||
List<String> groupStates = options.groupStates() | ||
.stream() | ||
.map(GroupState::toString) | ||
.collect(Collectors.toList()); | ||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||
.setTypesFilter(groupTypes) | ||
.setStatesFilter(groupStates) | ||
); | ||
} | ||
} | ||
|
||
private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { | ||
final String groupId = group.groupId(); | ||
final Optional<GroupType> type; | ||
if (group.groupType() == null || group.groupType().isEmpty()) { | ||
type = Optional.empty(); | ||
} else { | ||
type = Optional.of(GroupType.parse(group.groupType())); | ||
} | ||
final String protocolType = group.protocolType(); | ||
final Optional<GroupState> groupState; | ||
if (group.groupState() == null || group.groupState().isEmpty()) { | ||
groupState = Optional.empty(); | ||
} else { | ||
groupState = Optional.of(GroupState.parse(group.groupState())); | ||
String protocolType = group.protocolType(); | ||
if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) { | ||
final String groupId = group.groupId(); | ||
final Optional<GroupType> type; | ||
if (group.groupType() == null || group.groupType().isEmpty()) { | ||
type = Optional.empty(); | ||
} else { | ||
type = Optional.of(GroupType.parse(group.groupType())); | ||
} | ||
final Optional<GroupState> groupState; | ||
if (group.groupState() == null || group.groupState().isEmpty()) { | ||
groupState = Optional.empty(); | ||
} else { | ||
groupState = Optional.of(GroupState.parse(group.groupState())); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code Complexity: The Consider extracting the filtering logic into a separate method to improve readability and maintainability. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Performance Consideration: The Consider investigating if this filtering can be pushed to the server side for better efficiency with large result sets. |
||
final GroupListing groupListing = new GroupListing( | ||
groupId, | ||
type, | ||
protocolType, | ||
groupState | ||
); | ||
results.addListing(groupListing); | ||
} | ||
final GroupListing groupListing = new GroupListing( | ||
groupId, | ||
type, | ||
protocolType, | ||
groupState | ||
); | ||
results.addListing(groupListing); | ||
} | ||
|
||
@Override | ||
|
@@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) { | |
} | ||
} | ||
|
||
@Override | ||
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) { | ||
// If we cannot try the earlier request version, give up | ||
if (!canTryEarlierRequestVersion) { | ||
return false; | ||
} | ||
|
||
// If have already tried the earlier request version, give up | ||
if (tryUsingEarlierRequestVersion) { | ||
return false; | ||
} | ||
|
||
// Have a try using the earlier request version | ||
tryUsingEarlierRequestVersion = true; | ||
return true; | ||
} | ||
|
||
@Override | ||
void handleFailure(Throwable throwable) { | ||
synchronized (results) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Security Consideration: The new filtering mechanism in
maybeAddGroup
method should ensure that authorization checks are performed before any filtering logic is applied. While the current implementation appears to be handling this correctly, it's important to verify that authorization checks remain in place throughout the call chain to prevent potential information disclosure about restricted groups.