KAFKA-19397: Not relying on metadata to map between topic id and name #19964
base: trunk
Changes from all commits: 324230c, 3f948eb, bb6d82b, 584740a, b57e771, 1faf543, f47ed06
Sender.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -565,7 +566,7 @@ private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
+    private void handleProduceResponse(ClientResponse response, Map<TopicIdPartition, ProducerBatch> batches, long now) {
         RequestHeader requestHeader = response.requestHeader();
         int correlationId = requestHeader.correlationId();
         if (response.wasTimedOut()) {
@@ -595,9 +596,6 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
             // This will be set by completeBatch.
             Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
             produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-                // Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
-                String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
-                TopicPartition tp = new TopicPartition(topicName, p.index());
                 ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
                     Errors.forCode(p.errorCode()),
                     p.baseOffset(),
@@ -609,7 +607,18 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                         .collect(Collectors.toList()),
                     p.errorMessage(),
                     p.currentLeader());
-                ProducerBatch batch = batches.get(tp);
+                // Version 13 drops the topic name and adds support for topic id.
+                // We need to find the batch based on topic id and partition index only, as
+                // the topic name in the response might be empty.
+                TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
+                ProducerBatch batch = batches.entrySet().stream()
+                    .filter(entry -> entry.getKey().same(tpId))
+                    .map(Map.Entry::getValue).findFirst().orElse(null);
+
+                if (batch == null) {
+                    throw new IllegalStateException("batch created for " + tpId + " can't be found, " +
+                        "the topic might have been recreated after the batch creation.");
+                }
                 completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
             }));

Review comment (on the IllegalStateException): Hmm, the recreation of the topic shouldn't hit this IllegalStateException, right?
@@ -855,7 +864,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         if (batches.isEmpty())
            return;

-        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+        final Map<TopicIdPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
         Map<String, Uuid> topicIds = topicIdsForBatches(batches);

         ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();

Review thread (on topicIdsForBatches(batches)):
Reviewer: One question: what happens if the topic id is unknown (…
Author: We always fall back on the topic name if the topic id is not passed down in the produce request, and sort it out down the line when the broker receives the produce request. It should be the same as if the client were not topic-id aware.
Reviewer: IIRC, the topic name is ignored in the RPC when the version is larger than 13. So, is it possible that the broker receives a produce request with neither the topic name nor the topic id?
Author: It shouldn't happen, but if it did, the batch would fail with "unknown topic".
Author: The sender on the client side always fetches metadata before sending, so we should get some sort of metadata to populate the topic id.
Reply: Actually there is a potential for this in …
Reply: It's true that …
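Note: as a hedged sketch of the fallback described in the thread above (this is not the PR's code; `requestKey` is a hypothetical helper), a batch whose topic has no known id would be keyed with `Uuid.ZERO_UUID` and its topic name, mirroring how `recordsByPartition` is keyed below, so the broker can still resolve the topic by name:

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

import java.util.Map;

public class TopicIdFallbackSketch {
    // Hypothetical helper: if no topic id is known for the batch's topic,
    // keep Uuid.ZERO_UUID in the key and rely on the topic name, as with a
    // client that is not topic-id aware.
    static TopicIdPartition requestKey(Map<String, Uuid> topicIds, TopicPartition tp) {
        Uuid topicId = topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
        return new TopicIdPartition(topicId, tp);
    }
}
```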
@@ -874,7 +883,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
                 tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                     .setIndex(tp.partition())
                     .setRecords(records));
-                recordsByPartition.put(tp, batch);
+                recordsByPartition.put(new TopicIdPartition(topicId, tp), batch);
             }

             String transactionalId = null;
TopicIdPartition.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common;

+import org.apache.kafka.common.utils.Utils;
+
 import java.util.Objects;

 /**
@@ -78,6 +80,21 @@ public TopicPartition topicPartition() {
         return topicPartition;
     }

+    /**
+     * Checks whether the given TopicIdPartition is meant to refer to the same partition as this object, even if it doesn't carry all the data.
+     * If the topic name is empty and a topic id is present, the method relies on the topic id only;
+     * otherwise it relies on the topic name.
+     * @return true if the topic has the same topicId and partition index, as topic names might sometimes be empty.
+     */
+    public boolean same(TopicIdPartition tpId) {
+        if (Utils.isBlank(tpId.topic()) && !tpId.topicId.equals(Uuid.ZERO_UUID)) {
+            return topicId.equals(tpId.topicId) &&
+                topicPartition.partition() == tpId.partition();
+        } else {
+            return topicPartition.equals(tpId.topicPartition());
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {

Review comment (on the javadoc): same this object => this object
Review comment (on the else branch): In the rare case that …
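Note: a hedged usage sketch of the new matching rule (not part of the PR). A request-side key that carries the topic name matches a v13-style response key that carries only the topic id, even though `equals()` would not consider them equal:

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

public class SameVsEqualsSketch {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();
        // Request-side key: topic id plus topic name.
        TopicIdPartition requestKey = new TopicIdPartition(topicId, 0, "topic");
        // Key built from a v13+ ProduceResponse: the name may be empty.
        TopicIdPartition responseKey = new TopicIdPartition(topicId, 0, "");

        System.out.println(requestKey.equals(responseKey)); // false: topic names differ
        System.out.println(requestKey.same(responseKey));   // true: blank name, matching id and partition
    }
}
```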
ProducerSendWhileDeletionTest.scala
@@ -17,15 +17,20 @@
 package kafka.api

 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.{createTopicWithAdminRaw, deleteTopicWithAdminRaw}
 import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, TopicDescription}
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.producer.{Callback, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
+import org.apache.kafka.test.TestUtils.waitForCondition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
 import org.junit.jupiter.api.Test

 import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.CompletableFuture
 import java.util.Optional

 class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
@@ -81,6 +86,59 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }

+  @Test
+  def testSendWhileTopicGetRecreated(): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val admin = createAdminClient()
+    val producer = createProducer()
+
+    try {
+      val fs = CompletableFuture.runAsync(() => {
+        for (_ <- 1 to 20) {
+          recreateTopic(admin, topic)
+        }
+      })
+      val producerFutures = new util.ArrayList[CompletableFuture[Void]]
+      for (_ <- 0 until numRecords) {
+        producerFutures.add(CompletableFuture.runAsync(() => {
+          for (i <- 0 until numRecords) {
+            producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8)), new Callback() {
+              override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+                assertNotNull(metadata)
+                assertNotEquals(metadata.offset, -1L)
+                assertNotEquals(metadata.timestamp, RecordBatch.NO_TIMESTAMP)
+              }
+            }).get()
+          }
+        }))
+      }
+      fs.join()
+      producerFutures.forEach(_.join)
+    } catch {
+      case e: TopicExistsException =>
+        admin.deleteTopics(util.List.of(topic)).all().get()
+    } finally {
+      admin.close()
+      producer.close()
+    }
+  }
+
+  def recreateTopic(admin: Admin, topic: String): Unit = {
+    waitForCondition(() => {
+      val topicId = if (admin.listTopics().names().get().contains(topic)) {
+        topicMetadata(admin, topic).topicId()
+      } else Uuid.ZERO_UUID
+      // don't wait for the physical delete
+      deleteTopicWithAdminRaw(admin, topic)
+      createTopicWithAdminRaw(admin, topic)
+      val newTopicId = if (admin.listTopics().names().get().contains(topic)) {
+        topicMetadata(admin, topic).topicId()
+      } else Uuid.ZERO_UUID
+      topicId != newTopicId
+    }, "Topic is not recreated")
+  }
+
   /**
    * Tests that Producer produce to new topic id after recreation.
    *

Review comment (on assertNotEquals(metadata.offset, -1L)): Since the topic could be deleted, it's possible for the metadata to have -1, right? Also, could we verify that every record's callback is called?
Review comment (on case e: TopicExistsException): Hmm, where would a TopicExistsException be thrown?
Review comment (on def recreateTopic): Could this be private?
Review comment (on deleteTopicWithAdminRaw): Hmm, …
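Note: a hedged sketch of the callback-counting check suggested in the first review comment above (not part of the PR; `sendAndCountCallbacks` is a hypothetical helper, written in Java against the producer API rather than in the Scala test):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackCountSketch {
    // Count completed callbacks and assert the count matches the number of records sent.
    // send(...).get() blocks until the record is acknowledged, so each callback has fired
    // before the counter is checked.
    static void sendAndCountCallbacks(KafkaProducer<byte[], byte[]> producer, String topic, int numRecords) throws Exception {
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < numRecords; i++) {
            byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>(topic, null, value),
                (metadata, exception) -> completed.incrementAndGet()).get();
        }
        if (completed.get() != numRecords)
            throw new AssertionError("expected " + numRecords + " callbacks, got " + completed.get());
    }
}
```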
Review comment: This changes a map lookup to an iteration. Could we do some produce perf tests (with multiple topics/partitions) to verify there is no performance degradation?
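Note: as a hedged illustration of the concern above (not part of the PR; the helper name is hypothetical), one way to keep the response-side lookup O(1) would be a secondary index keyed by topic id and partition index, built once per request:

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.util.HashMap;
import java.util.Map;

public class BatchIndexSketch {
    // Hypothetical helper: index the in-flight batches by (topicId, partition index)
    // so the produce-response handler can fetch a batch directly instead of filtering
    // the whole map for every partition response.
    static <V> Map<Uuid, Map<Integer, V>> indexByIdAndPartition(Map<TopicIdPartition, V> byPartition) {
        Map<Uuid, Map<Integer, V>> index = new HashMap<>();
        byPartition.forEach((key, value) ->
            index.computeIfAbsent(key.topicId(), id -> new HashMap<>())
                 .put(key.partition(), value));
        return index;
    }
}
```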