KAFKA-19397: Not relying on metadata to map between topic id and name. #19964

Open: wants to merge 7 commits into trunk

Conversation

@OmniaGM (Contributor) commented Jun 13, 2025

  • Metadata doesn't have the full view of topic names to ids during rebootstrap of the client or when a topic has been deleted and recreated. The solution is to pass the topic id down and stop trying to figure it out later in the logic.
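As an editorial sketch of the approach (not the PR's exact diff; batches, topicIds, and the field access below are illustrative stand-ins for the Sender's local variables), the idea is that the sender keys its in-flight batches by a topic-id-aware partition at send time instead of re-resolving the id from Metadata when the response arrives:

// Sketch only: carry the topic id alongside the partition from send time, so the
// produce-response handler never needs Metadata for the name <-> id mapping.
Map<TopicIdPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
for (ProducerBatch batch : batches) {
    // topicIds is an assumed name -> id map; unknown topics fall back to the zero id.
    Uuid topicId = topicIds.getOrDefault(batch.topicPartition.topic(), Uuid.ZERO_UUID);
    recordsByPartition.put(new TopicIdPartition(topicId, batch.topicPartition), batch);
}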
@junrao (Contributor) left a comment

@OmniaGM : Thanks for the PR. Overall, it looks good. A few comments below. Also, could we add a unit test?

@lucasbru : Could you test this PR with the stream job?


if (matchedBatchesForTopicId.size() > 1) {
    matchedBatchesForTopicId.forEach(matchedBatch ->
        failBatch(matchedBatch, new RuntimeException("More than one batch with same topic id and partition."), false));
Contributor:

If this is unexpected, we want to throw IllegalStateException as in other places in this file

OmniaGM (Contributor Author):

Removed this now in favour of using findFirst. I am just wondering: should we, in general, throw IllegalStateException if the batch is null?

@github-actions bot removed the 'triage (PRs from the community)' label on Jun 15, 2025
@lucasbru (Member):

@junrao I redeployed the soak with the fix. I will report back if the problem reoccurs

@lucasbru (Member):

@OmniaGM @junrao I have been running the soak for 24h, and it's looking good.

@chia7712 (Member) left a comment

@OmniaGM thanks for this patch. A couple of comments are left. PTAL.

// topic name in the response might be empty.
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
Member:

Should we create the TopicIdPartition outside the stream to avoid creating many temporary objects?
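For reference, a sketch of what hoisting the object out of the stream looks like (editorial; r and p stand for the response's topic and partition in the surrounding handler, and same() is the relaxed comparison added by this PR):

// Sketch: build the candidate TopicIdPartition once, so the filter does not
// allocate a new object for every entry in the batches map.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry -> entry.getKey().same(tpId))
    .map(Map.Entry::getValue)
    .findFirst()
    .orElse(null);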

OmniaGM (Contributor Author):

Moved the initialisation out of the stream.

ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
    ).map(Map.Entry::getValue).findFirst().orElse(null);
Member:

It is possible to have a null batch, right? For example, if the topic is recreated after the batch is generated.

@OmniaGM (Contributor Author) commented Jun 17, 2025

We have always had this potential for the batch to be null, which is why I raised the comment here #19964 (comment) asking whether we should throw IllegalStateException. I updated this to fail with IllegalStateException instead of leaving it as it was.

@@ -855,7 +860,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
        if (batches.isEmpty())
            return;

-       final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+       final Map<TopicIdPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
        Map<String, Uuid> topicIds = topicIdsForBatches(batches);
Member:

one question: what happens if the topic id is unknown (Uuid.ZERO_UUID)?

OmniaGM (Contributor Author):

We always fall back on the topic name if the topic id is not passed down in the produce request, and sort it out down the line when the broker receives the produce request. It should be the same as if the client were not topic-id aware.

Member:

> We always fall back on topic name if topic id not passed down

IIRC, the topic name is ignored in the RPC when the version is larger than 13. So, is it possible that the broker receives the produce request with neither the topic name nor the id?

OmniaGM (Contributor Author):

It shouldn't happen, but if it did, the batch would fail with "unknown topic".

OmniaGM (Contributor Author):

The sender on the client side always fetches metadata before sending, so we should get some sort of metadata to populate the topic id.

OmniaGM (Contributor Author):

Actually, there is a potential for this: Sender::topicIdsForBatches defaults to the zero Uuid if metadata doesn't have the topic. I am thinking now that maybe ProducerBatch should use TopicIdPartition instead of TopicPartition.

Contributor:

It's true that Sender::topicIdsForBatches could return zero topicId in rare cases. It seems that we could just pass along the mapping to zero topicId when handling the produce response.
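A minimal sketch of what passing the mapping along could look like (editorial; metadataTopicIds and batchList are hypothetical names for the metadata's name-to-id view and the batches being sent):

// Sketch: default to Uuid.ZERO_UUID when metadata no longer knows the topic,
// and still pass the mapping along so the response handling can fall back to
// matching on the topic name.
Map<String, Uuid> topicIds = new HashMap<>();
for (ProducerBatch batch : batchList) {
    String topic = batch.topicPartition.topic();
    topicIds.putIfAbsent(topic, metadataTopicIds.getOrDefault(topic, Uuid.ZERO_UUID));
}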

@github-actions bot added the 'core (Kafka Broker)' label and removed the 'small (Small PRs)' label on Jun 17, 2025
@OmniaGM (Contributor Author) commented Jun 17, 2025

I have added some tests in ProducerSendWhileDeletionTest to cover recreation while producing as well; I hope this will be enough to cover these cases.

@kirktrue (Contributor) left a comment

Thanks for the PR @OmniaGM!

A few very minor comments.

Thanks!

@lucasbru (Member) left a comment

Needs a rebase, but LGTM

@junrao (Contributor) left a comment

@OmniaGM : Thanks for the updated PR. A few more comments.

@@ -78,6 +80,21 @@ public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * Checking if TopicIdPartition meant to be the same reference to same this object but doesn't have all the data.
Contributor:

same this object => this object

            return topicId.equals(tpId.topicId) &&
                topicPartition.partition() == tpId.partition();
        } else {
            return topicPartition.equals(tpId.topicPartition());
Contributor:

In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.
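A sketch of the suggested comparison order (editorial; field and accessor names follow the snippet above and may differ from the final code):

boolean same(TopicIdPartition tpId) {
    // Prefer the topic name when both sides carry one; the response name can be
    // empty on newer RPC versions and the id can be zero for a deleted topic.
    if (!topicPartition.topic().isEmpty() && !tpId.topic().isEmpty()) {
        return topicPartition.equals(tpId.topicPartition());
    }
    // Otherwise compare on the topic id (even if it is Uuid.ZERO_UUID) plus the partition index.
    return topicId.equals(tpId.topicId()) &&
        topicPartition.partition() == tpId.partition();
}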

}
}

def recreateTopic(admin: Admin, topic: String): Unit = {
Contributor:

Could this be private?

      fs.join()
      producerFutures.forEach(_.join)
    } catch {
      case e: TopicExistsException =>
Contributor:

Hmm, where would a TopicExistsException be thrown?

producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8)), new Callback() {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    assertNotNull(metadata)
    assertNotEquals(metadata.offset, -1L)
Contributor:

Since the topic could be deleted, it's possible for the metadata to have -1, right?

Also, could we verify that every record's callback is called?
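One way to check that every callback fires (editorial sketch in Java rather than the PR's Scala test; numRecords, topic, and producer are assumed to be set up by the test, and AtomicInteger, StandardCharsets, ProducerRecord, and JUnit's assertEquals would need the usual imports):

// Sketch: count completed callbacks and assert that all of them ran after flush().
AtomicInteger completedCallbacks = new AtomicInteger();
for (int i = 0; i < numRecords; i++) {
    byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
    producer.send(new ProducerRecord<>(topic, null, value),
        (metadata, exception) -> completedCallbacks.incrementAndGet());
}
producer.flush();
assertEquals(numRecords, completedCallbacks.get());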


if (batch == null) {
    throw new IllegalStateException("batch created for " + tpId + " can't be found, " +
        "topic might be recreated after the batch creation.");
Contributor:

Hmm, the recreation of the topic shouldn't hit this IllegalStateException, right?

topicMetadata(admin, topic).topicId()
} else Uuid.ZERO_UUID
// don't wait for the physical delete
deleteTopicWithAdminRaw(admin, topic)
Contributor:

Hmm, deleteTopicWithAdminRaw() doesn't wait for the metadata propagation to the brokers. However, the producer only sees the deleted topic after the metadata is propagated. Is this test effective?

// We need to find batch based on topic id and partition index only as
// topic name in the response might be empty.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
Contributor:

This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?
