From f2827915ede68a46caf24e97448e44c1495f069c Mon Sep 17 00:00:00 2001 From: David Jacot Date: Sun, 13 Apr 2025 16:48:42 +0200 Subject: [PATCH 1/7] KAFKA-14690; Add TopicId to OffsetCommit API --- .../common/requests/OffsetCommitRequest.java | 26 +- .../common/requests/OffsetCommitResponse.java | 128 +++++++-- .../common/message/OffsetCommitRequest.json | 9 +- .../common/message/OffsetCommitResponse.json | 9 +- .../requests/OffsetCommitRequestTest.java | 7 + .../main/scala/kafka/server/KafkaApis.scala | 93 ++++--- .../ConsumerProtocolMigrationTest.scala | 9 +- .../server/DeleteGroupsRequestTest.scala | 3 +- .../GroupCoordinatorBaseRequestTest.scala | 21 +- .../unit/kafka/server/KafkaApisTest.scala | 261 ++++++++++++++---- .../server/OffsetCommitRequestTest.scala | 25 +- .../server/OffsetDeleteRequestTest.scala | 3 +- .../kafka/server/OffsetFetchRequestTest.scala | 9 +- .../group/OffsetMetadataManager.java | 8 +- .../group/OffsetMetadataManagerTest.java | 69 +++++ 15 files changed, 532 insertions(+), 148 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 8f6ab39d1fce4..c11612e3f0a23 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -45,20 +45,35 @@ public static class Builder extends AbstractRequest.Builder private final OffsetCommitRequestData data; - public Builder(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { - super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion); + public Builder( + OffsetCommitRequestData data, + boolean allowTopicIds, + boolean enableUnstableLastVersion + ) { + super( + ApiKeys.OFFSET_COMMIT, + ApiKeys.OFFSET_COMMIT.oldestVersion(), + allowTopicIds ? 
ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion) : 9 + ); this.data = data; } + public Builder( + OffsetCommitRequestData data, + boolean allowTopicIds + ) { + this(data, allowTopicIds, false); + } + public Builder(OffsetCommitRequestData data) { - this(data, false); + this(data, true); } @Override public OffsetCommitRequest build(short version) { if (data.groupInstanceId() != null && version < 7) { - throw new UnsupportedVersionException("The broker offset commit protocol version " + - version + " does not support usage of config group.instance.id."); + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does not support usage of config group.instance.id."); } return new OffsetCommitRequest(data, version); } @@ -97,6 +112,7 @@ public static OffsetCommitResponseData getErrorResponse( OffsetCommitResponseData response = new OffsetCommitResponseData(); request.topics().forEach(topic -> { OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic() + .setTopicId(topic.topicId()) .setName(topic.name()); response.topics().add(responseTopic); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 2b6d00b1a47f6..a4d740c06f908 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; @@ -123,43 +124,56 @@ public boolean shouldClientThrottle(short version) { return 
version >= 4; } - public static class Builder { - OffsetCommitResponseData data = new OffsetCommitResponseData(); - HashMap byTopicName = new HashMap<>(); + public static boolean useTopicIds(short version) { + return version >= 10; + } - private OffsetCommitResponseTopic getOrCreateTopic( - String topicName - ) { - OffsetCommitResponseTopic topic = byTopicName.get(topicName); - if (topic == null) { - topic = new OffsetCommitResponseTopic().setName(topicName); - data.topics().add(topic); - byTopicName.put(topicName, topic); - } - return topic; + public static Builder newBuilder(boolean useTopicIds) { + if (useTopicIds) { + return new TopicIdBuilder(); + } else { + return new TopicNameBuilder(); } + } + + public abstract static class Builder { + protected OffsetCommitResponseData data = new OffsetCommitResponseData(); + + protected abstract void add( + OffsetCommitResponseTopic topic + ); + + protected abstract OffsetCommitResponseTopic get( + Uuid topicId, + String topicName + ); + + protected abstract OffsetCommitResponseTopic getOrCreate( + Uuid topicId, + String topicName + ); public Builder addPartition( + Uuid topicId, String topicName, int partitionIndex, Errors error ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); - + final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName); topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partitionIndex) .setErrorCode(error.code())); - return this; } public

Builder addPartitions( + Uuid topicId, String topicName, List

partitions, Function partitionIndex, Errors error ) { - final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName); partitions.forEach(partition -> topicResponse.partitions().add(new OffsetCommitResponsePartition() .setPartitionIndex(partitionIndex.apply(partition)) @@ -177,11 +191,10 @@ public Builder merge( } else { // Otherwise, we have to merge them together. newData.topics().forEach(newTopic -> { - OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + OffsetCommitResponseTopic existingTopic = get(newTopic.topicId(), newTopic.name()); if (existingTopic == null) { // If no topic exists, we can directly copy the new topic data. - data.topics().add(newTopic); - byTopicName.put(newTopic.name(), newTopic); + add(newTopic); } else { // Otherwise, we add the partitions to the existing one. Note we // expect non-overlapping partitions here as we don't verify @@ -190,7 +203,6 @@ public Builder merge( } }); } - return this; } @@ -198,4 +210,78 @@ public OffsetCommitResponse build() { return new OffsetCommitResponse(data); } } + + public static class TopicIdBuilder extends Builder { + private final HashMap byTopicId = new HashMap<>(); + + @Override + protected void add(OffsetCommitResponseTopic topic) { + throwIfTopicIdIsNull(topic.topicId()); + data.topics().add(topic); + byTopicId.put(topic.topicId(), topic); + } + + @Override + protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + return byTopicId.get(topicId); + } + + @Override + protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) { + throwIfTopicIdIsNull(topicId); + OffsetCommitResponseTopic topic = byTopicId.get(topicId); + if (topic == null) { + topic = new OffsetCommitResponseTopic() + .setName(topicName) + .setTopicId(topicId); + data.topics().add(topic); + byTopicId.put(topicId, topic); + } + return topic; + 
} + + private static void throwIfTopicIdIsNull(Uuid topicId) { + if (topicId == null) { + throw new IllegalArgumentException("TopicId cannot be null."); + } + } + } + + public static class TopicNameBuilder extends Builder { + private final HashMap byTopicName = new HashMap<>(); + + @Override + protected void add(OffsetCommitResponseTopic topic) { + throwIfTopicNameIsNull(topic.name()); + data.topics().add(topic); + byTopicName.put(topic.name(), topic); + } + + @Override + protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) { + throwIfTopicNameIsNull(topicName); + return byTopicName.get(topicName); + } + + @Override + protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) { + throwIfTopicNameIsNull(topicName); + OffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new OffsetCommitResponseTopic() + .setName(topicName) + .setTopicId(topicId); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + private void throwIfTopicNameIsNull(String topicName) { + if (topicName == null) { + throw new IllegalArgumentException("TopicName cannot be null."); + } + } + } } diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index 348ed2b90c5c8..c6d583fc49ccd 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -36,8 +36,11 @@ // // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The // request is the same as version 8. - "validVersions": "2-9", + // + // Version 10 adds support for topic ids (KIP-848). + "validVersions": "2-10", "flexibleVersions": "8+", + "latestVersionUnstable": true, "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." 
}, @@ -52,8 +55,10 @@ "about": "The time period in ms to retain the offset." }, { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+", "about": "The topics to commit offsets for.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "Partitions", "type": "[]OffsetCommitRequestPartition", "versions": "0+", "about": "Each partition to commit offsets for.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json index 0cccd64816c47..5450b1238774e 100644 --- a/clients/src/main/resources/common/message/OffsetCommitResponse.json +++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json @@ -34,7 +34,9 @@ // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and // GROUP_ID_NOT_FOUND when the group does not exist for both protocols. - "validVersions": "2-9", + // + // Version 10 adds support for topic ids (KIP-848). 
+ "validVersions": "2-10", "flexibleVersions": "8+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -47,13 +49,16 @@ // - FENCED_MEMBER_EPOCH (version 7+) // - GROUP_ID_NOT_FOUND (version 9+) // - STALE_MEMBER_EPOCH (version 9+) + // - UNKNOWN_TOPIC_ID (version 10+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+", "about": "The responses for each topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+", "about": "The responses for each partition in the topic.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index 161a4dd5f1192..1d768cf3d5a4a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; @@ -45,6 +46,8 
@@ public class OffsetCommitRequestTest { protected static String groupId = "groupId"; protected static String memberId = "consumerId"; protected static String groupInstanceId = "groupInstanceId"; + protected static Uuid topicIdOne = Uuid.randomUuid(); + protected static Uuid topicIdTwo = Uuid.randomUuid(); protected static String topicOne = "topicOne"; protected static String topicTwo = "topicTwo"; protected static int partitionOne = 1; @@ -61,6 +64,7 @@ public class OffsetCommitRequestTest { public void setUp() { List topics = Arrays.asList( new OffsetCommitRequestTopic() + .setTopicId(topicIdOne) .setName(topicOne) .setPartitions(Collections.singletonList( new OffsetCommitRequestPartition() @@ -70,6 +74,7 @@ public void setUp() { .setCommittedMetadata(metadata) )), new OffsetCommitRequestTopic() + .setTopicId(topicIdTwo) .setName(topicTwo) .setPartitions(Collections.singletonList( new OffsetCommitRequestPartition() @@ -127,12 +132,14 @@ public void testGetErrorResponse() { OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData() .setTopics(Arrays.asList( new OffsetCommitResponseTopic() + .setTopicId(topicIdOne) .setName(topicOne) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) .setPartitionIndex(partitionOne))), new OffsetCommitResponseTopic() + .setTopicId(topicIdTwo) .setName(topicTwo) .setPartitions(Collections.singletonList( new OffsetCommitResponsePartition() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 30f53f3d26be6..a97f6a9ae4943 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -271,11 +271,23 @@ class KafkaApis(val requestChannel: RequestChannel, ): CompletableFuture[Unit] = { val offsetCommitRequest = request.body[OffsetCommitRequest] - // Reject the request if not authorized to the group + // Reject the request if 
not authorized to the group. if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) { requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val useTopicIds = OffsetCommitResponse.useTopicIds(request.header.apiVersion) + + if (useTopicIds) { + offsetCommitRequest.data.topics.forEach { topic => + if (topic.topicId != Uuid.ZERO_UUID) { + metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) + } + } + } + + + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, @@ -283,28 +295,40 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.data.topics.asScala )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(useTopicIds) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => - if (!authorizedTopics.contains(topic.name)) { + if (useTopicIds && topic.name.isEmpty) { + // If the topic name is undefined, it means that the topic id is unknown so we add + // the topic and all its partitions to the response with UNKNOWN_TOPIC_ID. + responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID) + } else if (!authorizedTopics.contains(topic.name)) { // If the topic is not authorized, we add the topic and all its partitions // to the response with TOPIC_AUTHORIZATION_FAILED. 
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( - topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) } else if (!metadataCache.contains(topic.name)) { // If the topic is unknown, we add the topic and all its partitions // to the response with UNKNOWN_TOPIC_OR_PARTITION. responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition]( - topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + topic.topicId, topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } else { // Otherwise, we check all partitions to ensure that they all exist. - val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name) + val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topic.topicId) + .setName(topic.name) topic.partitions.forEach { partition => - if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent()) { + if (metadataCache.getLeaderAndIsr(topic.name, partition.partitionIndex).isPresent) { topicWithValidPartitions.partitions.add(partition) } else { - responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + responseBuilder.addPartition( + topic.topicId, + topic.name, + partition.partitionIndex, + Errors.UNKNOWN_TOPIC_OR_PARTITION + ) } } @@ -318,42 +342,23 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, responseBuilder.build()) CompletableFuture.completedFuture(()) } else { - // For version > 0, store offsets in Coordinator. 
- commitOffsetsToCoordinator( - request, - offsetCommitRequest, - authorizedTopicsRequest, - responseBuilder, - requestLocal - ) - } - } - } - - private def commitOffsetsToCoordinator( - request: RequestChannel.Request, - offsetCommitRequest: OffsetCommitRequest, - authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic], - responseBuilder: OffsetCommitResponse.Builder, - requestLocal: RequestLocal - ): CompletableFuture[Unit] = { - val offsetCommitRequestData = new OffsetCommitRequestData() - .setGroupId(offsetCommitRequest.data.groupId) - .setMemberId(offsetCommitRequest.data.memberId) - .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch) - .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) - .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) - .setTopics(authorizedTopicsRequest.asJava) - - groupCoordinator.commitOffsets( - request.context, - offsetCommitRequestData, - requestLocal.bufferSupplier - ).handle[Unit] { (results, exception) => - if (exception != null) { - requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) - } else { - requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) + groupCoordinator.commitOffsets( + request.context, + new OffsetCommitRequestData() + .setGroupId(offsetCommitRequest.data.groupId) + .setMemberId(offsetCommitRequest.data.memberId) + .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch) + .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) + .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) + .setTopics(authorizedTopicsRequest.asJava), + requestLocal.bufferSupplier + ).handle[Unit] { (results, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, responseBuilder.merge(results).build()) + } + } } } } 
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 37c81ce20e508..8f5f759250b61 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -690,7 +690,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord val topicName = "foo" // Create the topic. - createTopic( + val topicId = createTopic( topic = topicName, numPartitions = 3 ) @@ -702,6 +702,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = "member-id", memberEpoch = -1, topic = topicName, + topicId = topicId, partition = 0, offset = 1000L, expectedError = Errors.NONE, @@ -765,7 +766,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -865,6 +866,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = memberId1, memberEpoch = 1, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + 10 * version + partitionId, expectedError = Errors.NONE, @@ -1096,7 +1098,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord createOffsetsTopic() // Create the topic. 
- createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -1164,6 +1166,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord memberId = memberId1, memberEpoch = 1, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + 10 * version + partitionId, expectedError = Errors.NONE, diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 88733a8657614..fe4501e640a31 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -48,7 +48,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -89,6 +89,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = Errors.GROUP_ID_NOT_FOUND, diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index dce9261f519b5..b40b3179c633b 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, 
DeletableGroupResultCollection} import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse @@ -75,7 +75,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def createTopic( topic: String, numPartitions: Int - ): Unit = { + ): Uuid = { val admin = cluster.admin() try { TestUtils.createTopicWithAdmin( @@ -85,6 +85,12 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { topic = topic, numPartitions = numPartitions ) + admin + .describeTopics(TopicCollection.ofTopicNames(List(topic).asJava)) + .allTopicNames() + .get() + .get(topic) + .topicId() } finally { admin.close() } @@ -169,8 +175,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { partition: Int, offset: Long, expectedError: Errors, - version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled), + topicId: Uuid = Uuid.ZERO_UUID ): Unit = { + if (version >= 10 && topicId == Uuid.ZERO_UUID) { + throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id") + } + val request = new OffsetCommitRequest.Builder( new OffsetCommitRequestData() .setGroupId(groupId) @@ -178,6 +189,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { .setGenerationIdOrMemberEpoch(memberEpoch) .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topicId) .setName(topic) .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() @@ -191,7 +203,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { val expectedResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName(topic) + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topic else "") .setPartitions(List( new 
OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(partition) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e490be540e2c0..958a32dbefdba 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -994,27 +994,43 @@ class KafkaApisTest extends Logging { ) } - @Test - def testHandleOffsetCommitRequest(): Unit = { - addTopicToMetadataCache("foo", numPartitions = 1) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + def testHandleOffsetCommitRequest(version: Short): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1) val offsetCommitRequest = new OffsetCommitRequestData() .setGroupId("group") .setMemberId("member") .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(topicName) .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() 
when(groupCoordinator.commitOffsets( requestChannelRequest.context, - offsetCommitRequest, + expectedOffsetCommitRequest, RequestLocal.noCaching.bufferSupplier )).thenReturn(future) kafkaApis = createKafkaApis() @@ -1027,7 +1043,8 @@ class KafkaApisTest extends Logging { val offsetCommitResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1038,27 +1055,43 @@ class KafkaApisTest extends Logging { assertEquals(offsetCommitResponse, response.data) } - @Test - def testHandleOffsetCommitRequestFutureFailed(): Unit = { - addTopicToMetadataCache("foo", numPartitions = 1) + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + def testHandleOffsetCommitRequestFutureFailed(version: Short): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + addTopicToMetadataCache(topicName, topicId = topicId, numPartitions = 1) val offsetCommitRequest = new OffsetCommitRequestData() .setGroupId("group") .setMemberId("member") .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(topicName) + .setPartitions(List( 
+ new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( requestChannelRequest.context, - offsetCommitRequest, + expectedOffsetCommitRequest, RequestLocal.noCaching.bufferSupplier )).thenReturn(future) @@ -1071,7 +1104,8 @@ class KafkaApisTest extends Logging { val expectedOffsetCommitResponse = new OffsetCommitResponseData() .setTopics(List( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName("foo") + .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID) + .setName(if (version < 10) topicName else "") .setPartitions(List( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1082,6 +1116,161 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetCommitResponse, response.data) } + @Test + def testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit = { + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + val zarId = Uuid.randomUuid() + val fooName = "foo" + val barName = "bar" + addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2) + addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2) + + val offsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. 
+ new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(fooId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(2) + .setCommittedOffset(30)).asJava), + // bar exists. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(barId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava), + // zar does not exist. + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(zarId) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(60), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(70)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build()) + + // This is the request expected by the group coordinator. + val expectedOffsetCommitRequest = new OffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. 
+ new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(fooId) + .setName(fooName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20)).asJava), + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(barId) + .setName(barName) + .setPartitions(List( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava)).asJava) + + val future = new CompletableFuture[OffsetCommitResponseData]() + when(groupCoordinator.commitOffsets( + requestChannelRequest.context, + expectedOffsetCommitRequest, + RequestLocal.noCaching.bufferSupplier + )).thenReturn(future) + kafkaApis = createKafkaApis() + kafkaApis.handle( + requestChannelRequest, + RequestLocal.noCaching + ) + + // This is the response returned by the group coordinator. 
+ val offsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(fooId) + .setName(fooName) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(barId) + .setName(barName) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + val expectedOffsetCommitResponse = new OffsetCommitResponseData() + .setTopics(List( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(fooId) + .setPartitions(List( + // foo-2 is first because partitions failing the validation + // are put in the response first. + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(2) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + // zar is before bar because topics failing the validation are + // put in the response first. 
+ new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(zarId) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)).asJava), + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(barId) + .setPartitions(List( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(offsetCommitResponse) + val response = verifyNoThrottling[OffsetCommitResponse](requestChannelRequest) + assertEquals(expectedOffsetCommitResponse, response.data) + } + @Test def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { addTopicToMetadataCache("foo", numPartitions = 2) @@ -1125,7 +1314,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(1) .setCommittedOffset(70)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build()) + val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, false).build()) // This is the request expected by the group coordinator. 
val expectedOffsetCommitRequest = new OffsetCommitRequestData() @@ -1228,48 +1417,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetCommitResponse, response.data) } - @Test - def testOffsetCommitWithInvalidPartition(): Unit = { - val topic = "topic" - addTopicToMetadataCache(topic, numPartitions = 1) - - def checkInvalidPartition(invalidPartitionId: Int): Unit = { - reset(replicaManager, clientRequestQuotaManager, requestChannel) - - val offsetCommitRequest = new OffsetCommitRequest.Builder( - new OffsetCommitRequestData() - .setGroupId("groupId") - .setTopics(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName(topic) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestPartition() - .setPartitionIndex(invalidPartitionId) - .setCommittedOffset(15) - .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) - .setCommittedMetadata("")) - ) - ))).build() - - val request = buildRequest(offsetCommitRequest) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - any[Long])).thenReturn(0) - val kafkaApis = createKafkaApis() - try { - kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching) - - val response = verifyNoThrottling[OffsetCommitResponse](request) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, - Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode)) - } finally { - kafkaApis.close() - } - } - - checkInvalidPartition(-1) - checkInvalidPartition(1) // topic has only one partition - } - @Test def testTxnOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic" diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index f289c241d1b4b..eceb21a407787 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -16,6 +16,7 @@ */ package kafka.server +import org.apache.kafka.common.Uuid import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.test.ClusterInstance @@ -46,7 +47,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -55,7 +56,6 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator // a session long enough for the duration of the test. val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol) - // Start from version 1 because version 0 goes to ZK. for (version <- ApiKeys.OFFSET_COMMIT.oldestVersion to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { // Commit offset. commitOffset( @@ -63,6 +63,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE, @@ -75,6 +76,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -89,6 +91,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -103,6 +106,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = "", memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, 
expectedError = Errors.UNKNOWN_MEMBER_ID, @@ -115,6 +119,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch + 1, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = @@ -131,11 +136,27 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = "", memberEpoch = -1, topic = "foo", + topicId = topicId, partition = 0, offset = 100L, expectedError = Errors.NONE, version = version.toShort ) + + // Commit offset to a group with an unknown topic id. + if (version >= 10) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "bar", + topicId = Uuid.randomUuid(), + partition = 0, + offset = 100L, + expectedError = Errors.UNKNOWN_TOPIC_ID, + version = version.toShort + ) + } } } } diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index c9201b24e9870..0fc414e24c99e 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator createOffsetsTopic() // Create the topic. 
- createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -65,6 +65,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index b49de57793172..be95cef7844f9 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -71,7 +71,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -87,6 +87,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, @@ -239,7 +240,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. - createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -255,6 +256,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, @@ -348,7 +350,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB createOffsetsTopic() // Create the topic. 
- createTopic( + val topicId = createTopic( topic = "foo", numPartitions = 3 ) @@ -365,6 +367,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB memberId = memberId, memberEpoch = memberEpoch, topic = "foo", + topicId = topicId, partition = partitionId, offset = 100L + partitionId, expectedError = Errors.NONE, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 9f601a74917f5..affc5bc8ee4c6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -461,7 +461,9 @@ public CoordinatorResult commitOffs final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); request.topics().forEach(topic -> { - final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name()); + final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic() + .setTopicId(topic.topicId()) + .setName(topic.name()); response.topics().add(topicResponse); topic.partitions().forEach(partition -> { @@ -470,8 +472,8 @@ public CoordinatorResult commitOffs .setPartitionIndex(partition.partitionIndex()) .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); } else { - log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", - request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + log.debug("[GroupId {}] Committing offsets {} for partition {}-{}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.topicId(), topic.name(), partition.partitionIndex(), request.memberId(), partition.committedLeaderEpoch()); topicResponse.partitions().add(new 
OffsetCommitResponsePartition() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 6f788d84fd009..382b2a9b0e571 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -1308,6 +1308,75 @@ public void testConsumerGroupOffsetCommit() { ); } + @Test + public void testConsumerGroupOffsetCommitWithTopicIds() { + Uuid topicId = Uuid.randomUuid(); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( + "foo", + true + ); + + // Add member. + group.updateMember(new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build() + ); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(List.of( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setTopicId(topicId) + .setName("bar") + .setPartitions(List.of( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + .setCommittedLeaderEpoch(10) + .setCommittedMetadata("metadata") + )) + )) + ); + + assertEquals( + new OffsetCommitResponseData() + .setTopics(List.of( + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setTopicId(topicId) + .setName("bar") + .setPartitions(List.of( + new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + )) + )), + result.response() + ); + + assertEquals( + 
List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord( + "foo", + "bar", + 0, + new OffsetAndMetadata( + 100L, + OptionalInt.of(10), + "metadata", + context.time.milliseconds(), + OptionalLong.empty() + ) + )), + result.records() + ); + } + @Test public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() From 08e6d6d541938862c2a5b6d3ee54f02240fadf82 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 14 Apr 2025 11:17:28 +0200 Subject: [PATCH 2/7] cleanup --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a97f6a9ae4943..a278810fb4128 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -286,8 +286,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - println(offsetCommitRequest.data) - val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, From 564bee160ca0c0ce92cf8591e2b16186742386d2 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 14 Apr 2025 13:48:31 +0200 Subject: [PATCH 3/7] refactor --- .../AlterConsumerGroupOffsetsHandler.java | 2 +- .../internals/CommitRequestManager.java | 2 +- .../internals/ConsumerCoordinator.java | 2 +- .../common/requests/OffsetCommitRequest.java | 23 +++++-------------- .../internals/ConsumerCoordinatorTest.java | 2 +- .../requests/OffsetCommitRequestTest.java | 4 ++-- .../common/requests/RequestResponseTest.java | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../GroupCoordinatorBaseRequestTest.scala | 6 ++--- .../unit/kafka/server/KafkaApisTest.scala | 8 +++---- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- 11 files changed, 22 insertions(+), 33 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index 5ef72f327d637..99111a70d4bae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -108,7 +108,7 @@ public OffsetCommitRequest.Builder buildBatchedRequest( .setGroupId(groupId.idValue) .setTopics(new ArrayList<>(offsetData.values())); - return new OffsetCommitRequest.Builder(data); + return OffsetCommitRequest.Builder.forTopicNames(data); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 284707a812b53..62d1fe3a86638 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -727,7 +727,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { lastEpochSentOnCommit = Optional.empty(); } - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); return buildRequestWithResponseHandling(builder); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 01fc605ea7982..1cba10ef15d97 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1327,7 +1327,7 @@ RequestFuture 
sendOffsetCommitRequest(final Map private final OffsetCommitRequestData data; - public Builder( - OffsetCommitRequestData data, - boolean allowTopicIds, - boolean enableUnstableLastVersion - ) { - super( - ApiKeys.OFFSET_COMMIT, - ApiKeys.OFFSET_COMMIT.oldestVersion(), - allowTopicIds ? ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion) : 9 - ); + private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short latestAllowedVersion) { + super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, latestAllowedVersion); this.data = data; } - public Builder( - OffsetCommitRequestData data, - boolean allowTopicIds - ) { - this(data, allowTopicIds, false); + public static Builder forTopicIdsAndNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { + return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion)); } - public Builder(OffsetCommitRequestData data) { - this(data, true); + public static Builder forTopicNames(OffsetCommitRequestData data) { + return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), (short) 9); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 5c9e06ff90d62..a34f2f1633778 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -682,7 +682,7 @@ public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() { ) ); - consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData)) + consumerClient.send(coordinator.checkAndGetCoordinator(), OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData)) .compose(new RequestFutureAdapter() { @Override 
public void onSuccess(ClientResponse value, RequestFuture future) {} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index 1d768cf3d5a4a..9cd95cfec769e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -95,7 +95,7 @@ public void testConstructor() { expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset); expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset); - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data); for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { OffsetCommitRequest request = builder.build(version); @@ -110,7 +110,7 @@ public void testConstructor() { @Test public void testVersionSupportForGroupInstanceId() { - OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( + OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 005a401c95dfc..d3753fecc84cf 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2355,7 +2355,7 @@ private MetadataResponse createMetadataResponse() { } private OffsetCommitRequest createOffsetCommitRequest(short version) { - return new OffsetCommitRequest.Builder(new OffsetCommitRequestData() + return OffsetCommitRequest.Builder.forTopicNames(new OffsetCommitRequestData() 
.setGroupId("group1") .setMemberId("consumer1") .setGroupInstanceId(null) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0f465f098ad67..4823df3aba26e 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -371,7 +371,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } private def createOffsetCommitRequest = { - new requests.OffsetCommitRequest.Builder( + requests.OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId(group) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index b40b3179c633b..971988b4ccea9 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -172,17 +172,17 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { memberId: String, memberEpoch: Int, topic: String, + topicId: Uuid, partition: Int, offset: Long, expectedError: Errors, - version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled), - topicId: Uuid = Uuid.ZERO_UUID + version: Short = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) ): Unit = { if (version >= 10 && topicId == Uuid.ZERO_UUID) { throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id") } - val request = new OffsetCommitRequest.Builder( + val request = OffsetCommitRequest.Builder.forTopicIdsAndNames( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 958a32dbefdba..70b2f56cdd259 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1025,7 +1025,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build(version)) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( @@ -1086,7 +1086,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build(version)) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( @@ -1164,7 +1164,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(1) .setCommittedOffset(70)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, true, true).build()) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build()) // This is the request expected by the group coordinator. 
val expectedOffsetCommitRequest = new OffsetCommitRequestData() @@ -1314,7 +1314,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(1) .setCommittedOffset(70)).asJava)).asJava) - val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest, false).build()) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build()) // This is the request expected by the group coordinator. val expectedOffsetCommitRequest = new OffsetCommitRequestData() diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 6f16f1b7a73ff..c86b1fd0f4efd 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest { .setTargetTimes(List(topic).asJava) case ApiKeys.OFFSET_COMMIT => - new OffsetCommitRequest.Builder( + OffsetCommitRequest.Builder.forTopicNames( new OffsetCommitRequestData() .setGroupId("test-group") .setGenerationIdOrMemberEpoch(1) From baa5620a21be3cb5464b2d75d917e3e7e93cb2ba Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 14 Apr 2025 20:41:42 +0200 Subject: [PATCH 4/7] fix tests --- .../kafka/common/message/MessageTest.java | 125 ++++++------------ 1 file changed, 43 insertions(+), 82 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index b28b8274f581f..0aa6067b2a811 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -59,8 +59,10 @@ import com.fasterxml.jackson.databind.JsonNode; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; 
+import org.junit.jupiter.params.ParameterizedTest; import java.lang.reflect.Method; import java.nio.ByteBuffer; @@ -409,90 +411,49 @@ public void testOffsetForLeaderEpochVersions() throws Exception { new OffsetForLeaderEpochRequestData().setReplicaId(-2)); } - @Test - public void testOffsetCommitRequestVersions() throws Exception { - String groupId = "groupId"; - String topicName = "topic"; - String metadata = "metadata"; - int partition = 2; - int offset = 100; - - testAllMessageRoundTrips(new OffsetCommitRequestData() - .setGroupId(groupId) - .setTopics(Collections.singletonList( - new OffsetCommitRequestTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partition) - .setCommittedMetadata(metadata) - .setCommittedOffset(offset) - ))))); - - Supplier request = - () -> new OffsetCommitRequestData() - .setGroupId(groupId) - .setMemberId("memberId") - .setGroupInstanceId("instanceId") - .setTopics(Collections.singletonList( - new OffsetCommitRequestTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partition) - .setCommittedLeaderEpoch(10) - .setCommittedMetadata(metadata) - .setCommittedOffset(offset) - )))) - .setRetentionTimeMs(20); - - for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { - OffsetCommitRequestData requestData = request.get(); - - if (version > 4) { - requestData.setRetentionTimeMs(-1); - } - - if (version < 6) { - requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1); - } - - if (version < 7) { - requestData.setGroupInstanceId(null); - } - - if (version >= 2 && version <= 4) { - testAllMessageRoundTripsBetweenVersions(version, (short) 5, requestData, requestData); - } else { - testAllMessageRoundTripsFromVersion(version, requestData); - } - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void 
testOffsetCommitRequestVersions(short version) throws Exception { + OffsetCommitRequestData request = new OffsetCommitRequestData() + .setGroupId("groupId") + .setMemberId(version >= 1 ? "memberId" : "") + .setGenerationIdOrMemberEpoch(version >= 1 ? 10 : -1) + .setGroupInstanceId(version >= 7 ? "instanceId" : null) + .setRetentionTimeMs((version >= 2 && version <= 4) ? 20 : -1) + .setTopics(singletonList( + new OffsetCommitRequestTopic() + .setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 10 ? "topic" : "") + .setPartitions(singletonList( + new OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedMetadata("metadata") + .setCommittedOffset(100) + .setCommittedLeaderEpoch(version >= 6 ? 10 : -1) + + )) + )); + + testMessageRoundTrip(version, request, request); } - @Test - public void testOffsetCommitResponseVersions() throws Exception { - Supplier response = - () -> new OffsetCommitResponseData() - .setTopics( - singletonList( - new OffsetCommitResponseTopic() - .setName("topic") - .setPartitions(singletonList( - new OffsetCommitResponsePartition() - .setPartitionIndex(1) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) - )) - ) - ) - .setThrottleTimeMs(20); - - for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { - OffsetCommitResponseData responseData = response.get(); - if (version < 3) { - responseData.setThrottleTimeMs(0); - } - testAllMessageRoundTripsFromVersion(version, responseData); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testOffsetCommitResponseVersions(short version) throws Exception { + OffsetCommitResponseData response = new OffsetCommitResponseData() + .setThrottleTimeMs(version >= 3 ? 20 : 0) + .setTopics(singletonList( + new OffsetCommitResponseTopic() + .setTopicId(version >= 10 ? Uuid.randomUuid() : Uuid.ZERO_UUID) + .setName(version < 10 ? 
"topic" : "") + .setPartitions(singletonList( + new OffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + )) + )); + + testMessageRoundTrip(version, response, response); } @Test From cacdc65b75461bcf7905a9cf3c7e042ee6e6e2a3 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 14 Apr 2025 20:48:10 +0200 Subject: [PATCH 5/7] fix imports --- .../test/java/org/apache/kafka/common/message/MessageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 0aa6067b2a811..ce5401dc501e7 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -56,10 +56,10 @@ import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.types.RawTaggedField; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; From 57f90772464f5f4c9c1ce35a4b1d0144fb9ff16a Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 22 Apr 2025 18:19:44 +0200 Subject: [PATCH 6/7] address comments + refactor --- .../common/requests/OffsetCommitRequest.java | 18 +++++++++++++++++- .../common/message/OffsetCommitRequest.json | 2 +- .../common/message/OffsetCommitResponse.json | 2 +- .../kafka/common/message/MessageTest.java | 2 +- .../GroupCoordinatorBaseRequestTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 6 +++--- 6 files changed, 24 insertions(+), 8 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 72080eeb0060d..1bd9c41f66834 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; @@ -50,7 +51,7 @@ private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short this.data = data; } - public static Builder forTopicIdsAndNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { + public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) { return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion)); } @@ -64,6 +65,21 @@ public OffsetCommitRequest build(short version) { throw new UnsupportedVersionException("The broker offset commit api version " + version + " does not support usage of config group.instance.id."); } + if (version >= 10) { + data.topics().forEach(topic -> { + if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does require usage of topic ids."); + } + }); + } else { + data.topics().forEach(topic -> { + if (topic.name() == null || topic.name().isEmpty()) { + throw new UnsupportedVersionException("The broker offset commit api version " + + version + " does require usage of topic names."); + } + }); + } return new OffsetCommitRequest(data, 
version); } diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index c6d583fc49ccd..ba3c12f0e2b47 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -37,7 +37,7 @@ // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The // request is the same as version 8. // - // Version 10 adds support for topic ids (KIP-848). + // Version 10 adds support for topic ids and removes support for topic names (KIP-848). "validVersions": "2-10", "flexibleVersions": "8+", "latestVersionUnstable": true, diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json index 5450b1238774e..0228733ce6bb0 100644 --- a/clients/src/main/resources/common/message/OffsetCommitResponse.json +++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json @@ -35,7 +35,7 @@ // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and // GROUP_ID_NOT_FOUND when the group does not exist for both protocols. // - // Version 10 adds support for topic ids (KIP-848). + // Version 10 adds support for topic ids and removes support for topic names (KIP-848). 
"validVersions": "2-10", "flexibleVersions": "8+", // Supported errors: diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index ce5401dc501e7..4674bf2013e3d 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -416,7 +416,7 @@ public void testOffsetForLeaderEpochVersions() throws Exception { public void testOffsetCommitRequestVersions(short version) throws Exception { OffsetCommitRequestData request = new OffsetCommitRequestData() .setGroupId("groupId") - .setMemberId(version >= 1 ? "memberId" : "") + .setMemberId("memberId") .setGenerationIdOrMemberEpoch(version >= 1 ? 10 : -1) .setGroupInstanceId(version >= 7 ? "instanceId" : null) .setRetentionTimeMs((version >= 2 && version <= 4) ? 20 : -1) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 971988b4ccea9..e816ecb786241 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -182,7 +182,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { throw new IllegalArgumentException(s"Cannot call OffsetCommit API version $version without a topic id") } - val request = OffsetCommitRequest.Builder.forTopicIdsAndNames( + val request = OffsetCommitRequest.Builder.forTopicIdsOrNames( new OffsetCommitRequestData() .setGroupId(groupId) .setMemberId(memberId) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 70b2f56cdd259..d136b407ae6b6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ 
-1025,7 +1025,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build(version)) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( @@ -1086,7 +1086,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(0) .setCommittedOffset(10)).asJava)).asJava) - val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build(version)) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version)) val future = new CompletableFuture[OffsetCommitResponseData]() when(groupCoordinator.commitOffsets( @@ -1164,7 +1164,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(1) .setCommittedOffset(70)).asJava)).asJava) - val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsAndNames(offsetCommitRequest, true).build()) + val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build()) // This is the request expected by the group coordinator. 
val expectedOffsetCommitRequest = new OffsetCommitRequestData() From 3b9cf841e45afb99648e89d4ccfa59badb37de62 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 22 Apr 2025 20:56:55 +0200 Subject: [PATCH 7/7] fix test --- .../org/apache/kafka/common/requests/RequestResponseTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d3753fecc84cf..e0986c2767b90 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -329,6 +329,8 @@ // This class performs tests requests and responses for all API keys public class RequestResponseTest { + private static final Uuid TOPIC_ID = Uuid.randomUuid(); + // Exception includes a message that we verify is not included in error responses private final UnknownServerException unknownServerException = new UnknownServerException("secret"); @@ -2363,6 +2365,7 @@ private OffsetCommitRequest createOffsetCommitRequest(short version) { .setTopics(singletonList( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("test") + .setTopicId(TOPIC_ID) .setPartitions(asList( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) @@ -2384,6 +2387,7 @@ private OffsetCommitResponse createOffsetCommitResponse() { .setTopics(singletonList( new OffsetCommitResponseData.OffsetCommitResponseTopic() .setName("test") + .setTopicId(TOPIC_ID) .setPartitions(singletonList( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0)