KAFKA-19397: Not relaying on metadata to map between topic id and name. #19964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Conversation
- Metadata doesn't have the full view of topic names to ids during rebootstrap of the client, or when a topic has been deleted and recreated. The solution is to pass the topic id down and stop trying to figure it out later in the logic.
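A minimal sketch of why a name-based metadata lookup breaks after delete/recreate (hypothetical, simplified types — not Kafka's actual classes): the topic name survives the recreation, but the topic id does not, so a name lookup in refreshed metadata no longer identifies the partition a batch was created for.

```java
import java.util.UUID;

// Minimal stand-in for Kafka's TopicIdPartition (hypothetical simplification).
record TopicIdPartition(UUID topicId, int partition, String topic) {}

public class RecreateDemo {
    public static void main(String[] args) {
        // A batch is created while the topic has one id...
        TopicIdPartition beforeDelete =
                new TopicIdPartition(UUID.randomUUID(), 0, "orders");
        // ...then the topic is deleted and recreated: same name, new id.
        TopicIdPartition afterRecreate =
                new TopicIdPartition(UUID.randomUUID(), 0, "orders");

        // The name still matches, so a name-based metadata lookup resolves,
        // but it now points at a different topic instance (different id).
        System.out.println(beforeDelete.topic().equals(afterRecreate.topic()));
        System.out.println(beforeDelete.topicId().equals(afterRecreate.topicId()));
    }
}
```

Running this prints `true` then `false`: the name comparison succeeds while the id comparison fails, which is exactly the ambiguity the PR avoids by carrying the id explicitly.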
clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java (comment resolved)
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (comment resolved)
```java
if (matchedBatchesForTopicId.size() > 1) {
    matchedBatchesForTopicId.forEach(matchedBatch ->
        failBatch(matchedBatch, new RuntimeException("More than one batch with same topic id and partition."), false));
```
If this is unexpected, we want to throw IllegalStateException as in other places in this file
Removed this now in favour of using findFirst. I am just wondering: should we in general throw IllegalStateException if the batch is null?
@junrao I redeployed the soak with the fix. I will report back if the problem reoccurs.
@OmniaGM thanks for this patch. A couple of comments are left. PTAL.
```java
// topic name in the response might be empty.
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
```
Should we create the TopicIdPartition outside the stream to avoid creating many temporary objects?
Moved the initialisation out of the stream.
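The hoisting being discussed can be sketched like this (hypothetical, simplified types — not the actual Sender code): the lookup key is built once before the stream instead of being allocated on every entry inside the filter lambda.

```java
import java.util.Map;
import java.util.UUID;

public class HoistDemo {
    // Hypothetical stand-in for the partition key type.
    record Key(UUID id, int partition) {}

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<Key, String> batches = Map.of(
                new Key(id, 0), "batch-0",
                new Key(id, 1), "batch-1");

        // Before: a new Key would be allocated inside the filter lambda,
        // once per map entry. After: build it once, outside the stream.
        Key wanted = new Key(id, 1);
        String batch = batches.entrySet().stream()
                .filter(e -> e.getKey().equals(wanted))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(null);
        System.out.println(batch);
    }
}
```

This avoids one short-lived allocation per map entry per response partition; the match semantics are unchanged.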
```java
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
    ).map(Map.Entry::getValue).findFirst().orElse(null);
```
It is possible to have a null batch, right? For example, if the topic is recreated after the batch is generated.
We always had this potential for the batch to be null; this is why I raised the comment here #19964 (comment) asking whether we should throw IllegalStateException. I updated this to fail with IllegalStateException instead of leaving it like this.
```diff
@@ -855,7 +860,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeout,
         if (batches.isEmpty())
             return;

-        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+        final Map<TopicIdPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
         Map<String, Uuid> topicIds = topicIdsForBatches(batches);
```
One question: what happens if the topic id is unknown (Uuid.ZERO_UUID)?
We always fall back on the topic name if the topic id is not passed down in the produce request, and sort it out later when the broker receives the produce request. It should be the same as if the client were not topic-id aware.
> We always fall back on the topic name if the topic id is not passed down

IIRC, the topic name is ignored in the RPC when the version is larger than 13. So, is it possible that the broker receives the produce request with neither the topic name nor the id?
It shouldn't happen, but if it did the batch would fail with "unknown topic".
The sender on the client side always fetches metadata before sending, so we should get some sort of metadata to populate the topic id.
Actually there is a potential for this: Sender::topicIdsForBatches defaults to the zero Uuid if metadata doesn't have the topic. I am thinking now that maybe ProducerBatch should use TopicIdPartition instead of TopicPartition.
It's true that Sender::topicIdsForBatches could return a zero topicId in rare cases. It seems that we could just pass along the mapping to the zero topicId when handling the produce response.
I have added some tests in ProducerSendWhileDeletionTest to cover recreation while producing as well; hope this will be enough to cover these cases.
….java Co-authored-by: Kirk True <[email protected]>
Needs a rebase, but LGTM
@OmniaGM : Thanks for the updated PR. A few more comments.
```diff
@@ -78,6 +80,21 @@ public TopicPartition topicPartition() {
         return topicPartition;
     }

+    /**
+     * Checking if TopicIdPartition meant to be the same reference to same this object but doesn't have all the data.
```
same this object => this object
```java
    return topicId.equals(tpId.topicId) &&
        topicPartition.partition() == tpId.partition();
} else {
    return topicPartition.equals(tpId.topicPartition());
```
In the rare case that Sender::topicIdsForBatches returns a 0 topic id (e.g. the topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include an empty topic name and a 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.
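The proposed matching order could be sketched as follows (hypothetical, simplified types — the real method is TopicIdPartition.same in this PR): compare on the topic name when both sides carry one; otherwise fall back to the topic id, even the zero id, so a zero-id/empty-name response still matches its batch.

```java
import java.util.UUID;

public class SameDemo {
    static final UUID ZERO = new UUID(0L, 0L);

    // Hypothetical stand-in for TopicIdPartition with the name-first comparison.
    record Tp(UUID topicId, int partition, String topic) {
        boolean same(Tp other) {
            if (partition != other.partition) return false;
            // Prefer the topic name when both sides have one...
            if (!topic.isEmpty() && !other.topic.isEmpty())
                return topic.equals(other.topic);
            // ...otherwise fall back to the id, even if it is zero.
            return topicId.equals(other.topicId);
        }
    }

    public static void main(String[] args) {
        Tp inBatch = new Tp(ZERO, 0, "orders");  // metadata had no id for the topic
        Tp inResponse = new Tp(ZERO, 0, "");     // v13+ response: empty name, zero id
        System.out.println(inBatch.same(inResponse));
    }
}
```

Here the names can't be compared (the response's is empty), so the zero ids match and the batch is found rather than tripping the IllegalStateException path.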
```scala
    }
  }

  def recreateTopic(admin: Admin, topic: String): Unit = {
```
Could this be private?
```scala
    fs.join()
    producerFutures.forEach(_.join)
} catch {
    case e: TopicExistsException =>
```
Hmm, where would a TopicExistsException be thrown?
```scala
producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8)), new Callback() {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        assertNotNull(metadata)
        assertNotEquals(metadata.offset, -1L)
```
Since the topic could be deleted, it's possible for the metadata to have -1, right?
Also, could we verify that every record's callback is called?
```java
if (batch == null) {
    throw new IllegalStateException("batch created for " + tpId + " can't be found, " +
        "topic might be recreated after the batch creation.");
```
Hmm, the recreation of the topic shouldn't hit this IllegalStateException, right?
```scala
    topicMetadata(admin, topic).topicId()
} else Uuid.ZERO_UUID
// don't wait for the physical delete
deleteTopicWithAdminRaw(admin, topic)
```
Hmm, deleteTopicWithAdminRaw() doesn't wait for the metadata propagation to the brokers. However, the producer only sees the deleted topic after the metadata is propagated. Is this test effective?
```java
// We need to find batch based on topic id and partition index only as
// topic name in the response might be empty.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
```
This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?
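The asymmetry behind this concern can be illustrated with a toy map (hypothetical sketch, not a real perf test): a keyed `get` is O(1), while the stream-based filter scans every entry once per response partition, which adds up with many topic-partitions in flight.

```java
import java.util.HashMap;
import java.util.Map;

public class LookupDemo {
    public static void main(String[] args) {
        Map<Integer, String> batches = new HashMap<>();
        for (int p = 0; p < 1000; p++) batches.put(p, "batch-" + p);

        // O(1): direct map lookup by key.
        String direct = batches.get(999);

        // O(n): linear scan over all entries, as the stream-based match does.
        String scanned = batches.entrySet().stream()
                .filter(e -> e.getKey() == 999)
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse(null);

        // Both find the same batch; only the cost differs.
        System.out.println(direct.equals(scanned));
    }
}
```

A produce perf run with many topic-partitions, as the reviewer suggests, would show whether the O(n) scan per response partition is measurable in practice.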