Skip to content

KAFKA-14690; Add TopicId to OffsetCommit API #19461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public OffsetCommitRequest.Builder buildBatchedRequest(
.setGroupId(groupId.idValue)
.setTopics(new ArrayList<>(offsetData.values()));

return new OffsetCommitRequest.Builder(data);
return OffsetCommitRequest.Builder.forTopicNames(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
lastEpochSentOnCommit = Optional.empty();
}

OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for the record, this I expect is temporary and will change to forTopicIdsOrNames once we include client support for the new RPC versions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're absolutely right.


return buildRequestWithResponseHandling(builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
groupInstanceId = null;
}

OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
OffsetCommitRequest.Builder builder = OffsetCommitRequest.Builder.forTopicNames(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationIdOrMemberEpoch(generation.generationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
Expand Down Expand Up @@ -45,20 +46,39 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>

private final OffsetCommitRequestData data;

public Builder(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.OFFSET_COMMIT, enableUnstableLastVersion);
private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short latestAllowedVersion) {
super(ApiKeys.OFFSET_COMMIT, oldestAllowedVersion, latestAllowedVersion);
this.data = data;
}

public Builder(OffsetCommitRequestData data) {
this(data, false);
public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion));
}

public static Builder forTopicNames(OffsetCommitRequestData data) {
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), (short) 9);
}

@Override
public OffsetCommitRequest build(short version) {
if (data.groupInstanceId() != null && version < 7) {
throw new UnsupportedVersionException("The broker offset commit protocol version " +
version + " does not support usage of config group.instance.id.");
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does not support usage of config group.instance.id.");
}
if (version >= 10) {
data.topics().forEach(topic -> {
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic ids.");
}
});
} else {
data.topics().forEach(topic -> {
if (topic.name() == null || topic.name().isEmpty()) {
throw new UnsupportedVersionException("The broker offset commit api version " +
version + " does require usage of topic names.");
}
});
}
return new OffsetCommitRequest(data, version);
}
Expand Down Expand Up @@ -97,6 +117,7 @@ public static OffsetCommitResponseData getErrorResponse(
OffsetCommitResponseData response = new OffsetCommitResponseData();
request.topics().forEach(topic -> {
OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic()
.setTopicId(topic.topicId())
.setName(topic.name());
response.topics().add(responseTopic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
Expand Down Expand Up @@ -123,43 +124,56 @@ public boolean shouldClientThrottle(short version) {
return version >= 4;
}

public static class Builder {
OffsetCommitResponseData data = new OffsetCommitResponseData();
HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
public static boolean useTopicIds(short version) {
return version >= 10;
}

private OffsetCommitResponseTopic getOrCreateTopic(
String topicName
) {
OffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
topic = new OffsetCommitResponseTopic().setName(topicName);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
return topic;
public static Builder newBuilder(boolean useTopicIds) {
if (useTopicIds) {
return new TopicIdBuilder();
} else {
return new TopicNameBuilder();
}
}

public abstract static class Builder {
protected OffsetCommitResponseData data = new OffsetCommitResponseData();

protected abstract void add(
OffsetCommitResponseTopic topic
);

protected abstract OffsetCommitResponseTopic get(
Uuid topicId,
String topicName
);

protected abstract OffsetCommitResponseTopic getOrCreate(
Uuid topicId,
String topicName
);

public Builder addPartition(
Uuid topicId,
String topicName,
int partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);

final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName);
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));

return this;
}

public <P> Builder addPartitions(
Uuid topicId,
String topicName,
List<P> partitions,
Function<P, Integer> partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
final OffsetCommitResponseTopic topicResponse = getOrCreate(topicId, topicName);
partitions.forEach(partition ->
topicResponse.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
Expand All @@ -177,11 +191,10 @@ public Builder merge(
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
OffsetCommitResponseTopic existingTopic = get(newTopic.topicId(), newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new topic data.
data.topics().add(newTopic);
byTopicName.put(newTopic.name(), newTopic);
add(newTopic);
} else {
// Otherwise, we add the partitions to the existing one. Note we
// expect non-overlapping partitions here as we don't verify
Expand All @@ -190,12 +203,85 @@ public Builder merge(
}
});
}

return this;
}

public OffsetCommitResponse build() {
return new OffsetCommitResponse(data);
}
}

public static class TopicIdBuilder extends Builder {
private final HashMap<Uuid, OffsetCommitResponseTopic> byTopicId = new HashMap<>();

@Override
protected void add(OffsetCommitResponseTopic topic) {
throwIfTopicIdIsNull(topic.topicId());
data.topics().add(topic);
byTopicId.put(topic.topicId(), topic);
}

@Override
protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) {
throwIfTopicIdIsNull(topicId);
return byTopicId.get(topicId);
}

@Override
protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) {
throwIfTopicIdIsNull(topicId);
OffsetCommitResponseTopic topic = byTopicId.get(topicId);
if (topic == null) {
topic = new OffsetCommitResponseTopic()
.setName(topicName)
.setTopicId(topicId);
data.topics().add(topic);
byTopicId.put(topicId, topic);
}
return topic;
}

private static void throwIfTopicIdIsNull(Uuid topicId) {
if (topicId == null) {
throw new IllegalArgumentException("TopicId cannot be null.");
}
}
}

public static class TopicNameBuilder extends Builder {
private final HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();

@Override
protected void add(OffsetCommitResponseTopic topic) {
throwIfTopicNameIsNull(topic.name());
data.topics().add(topic);
byTopicName.put(topic.name(), topic);
}

@Override
protected OffsetCommitResponseTopic get(Uuid topicId, String topicName) {
throwIfTopicNameIsNull(topicName);
return byTopicName.get(topicName);
}

@Override
protected OffsetCommitResponseTopic getOrCreate(Uuid topicId, String topicName) {
throwIfTopicNameIsNull(topicName);
OffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
topic = new OffsetCommitResponseTopic()
.setName(topicName)
.setTopicId(topicId);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
return topic;
}

private void throwIfTopicNameIsNull(String topicName) {
if (topicName == null) {
throw new IllegalArgumentException("TopicName cannot be null.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
// request is the same as version 8.
"validVersions": "2-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "2-10",
"flexibleVersions": "8+",
"latestVersionUnstable": true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a Jira to mark this as stable later on? (I guess we'll wait to complete the broker side, and the client side, just in case we decide to adjust something along the way?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can mark is as stable once we are confident on the client side. Opened https://issues.apache.org/jira/browse/KAFKA-19186.

"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
Expand All @@ -52,8 +55,10 @@
"about": "The time period in ms to retain the offset." },
{ "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
"about": "The topics to commit offsets for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetCommitRequestPartition", "versions": "0+",
"about": "Each partition to commit offsets for.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
"validVersions": "2-9",
//
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
"validVersions": "2-10",
"flexibleVersions": "8+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
Expand All @@ -47,13 +49,16 @@
// - FENCED_MEMBER_EPOCH (version 7+)
// - GROUP_ID_NOT_FOUND (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
// - UNKNOWN_TOPIC_ID (version 10+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+",
"about": "The responses for each topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0-9", "entityType": "topicName", "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+",
"about": "The responses for each partition in the topic.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() {
)
);

consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(offsetCommitRequestData))
consumerClient.send(coordinator.checkAndGetCoordinator(), OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequestData))
.compose(new RequestFutureAdapter<ClientResponse, Object>() {
@Override
public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
Expand Down
Loading