-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-14690; Add TopicId to OffsetCommit API #19461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 12 out of 23 changed files in this pull request and generated no comments.
Files not reviewed (11)
- clients/src/main/resources/common/message/OffsetCommitRequest.json: Language not supported
- clients/src/main/resources/common/message/OffsetCommitResponse.json: Language not supported
- core/src/main/scala/kafka/server/KafkaApis.scala: Language not supported
- core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: Language not supported
- core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: Language not supported
Comments suppressed due to low confidence (2)
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java:425
- [nitpick] Consider using a fixed Uuid value in tests for version >= 10 instead of Uuid.randomUuid() to ensure determinism and avoid flaky test behavior.
.setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID)
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:244
- Add tests to verify that the builder correctly throws an exception when a null TopicId is provided, ensuring that the new validation logic is covered.
if (topicId == null) { throw new IllegalArgumentException("TopicId cannot be null."); }
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Leave some minor comment.
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dajac! Took a first pass. The approach makes sense to me
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/OffsetCommitRequest.json
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/OffsetCommitResponse.json
Outdated
Show resolved
Hide resolved
...-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM
"flexibleVersions": "8+", | ||
"latestVersionUnstable": true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a Jira to mark this as stable later on? (I guess we'll wait to complete the broker side, and the client side, just in case we decide to adjust something along the way?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We can mark is as stable once we are confident on the client side. Opened https://issues.apache.org/jira/browse/KAFKA-19186.
@@ -727,7 +727,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { | |||
lastEpochSentOnCommit = Optional.empty(); | |||
} | |||
|
|||
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); | |||
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for the record, this I expect is temporary and will change to forTopicIdsOrNames once we include client support for the new RPC versions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're absolutely right.
This patch extends the OffsetCommit API to support topic ids. From
version 10 of the API, topic ids must be used. Originally, we wanted to
support both using topic ids and topic names from version 10 but it
turns out that it makes everything more complicated. Hence we propose to
only support topic ids from version 10. Clients which only support using
topic names can either lookup the topic ids using the Metadata API or
stay on using an earlier version.
The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.
Reviewers: Lianet Magrans [email protected], PoAn Yang
[email protected]