Skip to content

KAFKA-19397: Not relaying on metadata to map between topic id and name. #19964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
Expand Down Expand Up @@ -565,7 +566,7 @@ private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType
/**
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
private void handleProduceResponse(ClientResponse response, Map<TopicIdPartition, ProducerBatch> batches, long now) {
RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
if (response.wasTimedOut()) {
Expand Down Expand Up @@ -595,9 +596,6 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
// Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
TopicPartition tp = new TopicPartition(topicName, p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
Expand All @@ -609,7 +607,18 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
.collect(Collectors.toList()),
p.errorMessage(),
p.currentLeader());
ProducerBatch batch = batches.get(tp);
// Version 13 drop topic name and add support to topic id.
// We need to find batch based on topic id and partition index only as
// topic name in the response might be empty.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?

.filter(entry -> entry.getKey().same(tpId))
.map(Map.Entry::getValue).findFirst().orElse(null);

if (batch == null) {
throw new IllegalStateException("batch created for " + tpId + " can't be found, " +
"topic might be recreated after the batch creation.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the recreation of the topic shouldn't hit this IllegalStateException, right?

}
completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
}));

Expand Down Expand Up @@ -855,7 +864,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
if (batches.isEmpty())
return;

final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
final Map<TopicIdPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
Map<String, Uuid> topicIds = topicIdsForBatches(batches);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one question: what happens if the topic id is unknown (Uuid.ZERO_UUID)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always fall back on topic name if topic id not passed down in the produce request and sort it out down the line when the broker recieve the produce request. It should be the same as if the client not topic id aware.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always fall back on topic name if topic id not passed down

IIRC, the topic name is ignored in RPC when the version is larger than 13. So, is it possible that the broker receives the produce request without both the topic name and id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't happened but if it did the batch would fail with "unkonw topic"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sender on client side always fetch metadata before send so we should get some sort of metadata to populate topic id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there is a potential for this in Sender::topicIdsForBatches default to Uuid zero if metadata doesn't have the topic. Am thinking now that maybe ProducerBatch should use TopicIdPartition instead of TopicPartition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true that Sender::topicIdsForBatches could return zero topicId in rare cases. It seems that we could just pass along the mapping to zero topicId when handling the produce response.


ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
Expand All @@ -874,7 +883,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
recordsByPartition.put(new TopicIdPartition(topicId, tp), batch);
}

String transactionalId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.common;

import org.apache.kafka.common.utils.Utils;

import java.util.Objects;

/**
Expand Down Expand Up @@ -78,6 +80,21 @@ public TopicPartition topicPartition() {
return topicPartition;
}

/**
* Checking if TopicIdPartition meant to be the same reference to same this object but doesn't have all the data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same this object => this object

* If topic name is empty and topic id is persisted then the method will rely on topic id only
* otherwise the method will rely on topic name.
* @return true if topic has same topicId and partition index as topic names some time might be empty.
*/
public boolean same(TopicIdPartition tpId) {
if (Utils.isBlank(tpId.topic()) && !tpId.topicId.equals(Uuid.ZERO_UUID)) {
return topicId.equals(tpId.topicId) &&
topicPartition.partition() == tpId.partition();
} else {
return topicPartition.equals(tpId.topicPartition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.

}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
package kafka.api

import kafka.utils.TestUtils
import kafka.utils.TestUtils.{createTopicWithAdminRaw, deleteTopicWithAdminRaw}
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, TopicDescription}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.producer.{Callback, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
import org.apache.kafka.test.TestUtils.waitForCondition
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.Test

import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.CompletableFuture
import java.util.Optional

class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
Expand Down Expand Up @@ -81,6 +86,59 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())
}

@Test
def testSendWhileTopicGetRecreated(): Unit = {
val numRecords = 10
val topic = "topic"
val admin = createAdminClient()
val producer = createProducer()

try {
val fs = CompletableFuture.runAsync(() => {
for (_ <- 1 to 20) {
recreateTopic(admin, topic)
}
})
val producerFutures = new util.ArrayList[CompletableFuture[Void]]
for (_ <- 0 until numRecords) {
producerFutures.add(CompletableFuture.runAsync(() => {
for (i <- 0 until numRecords) {
producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8)), new Callback() {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
assertNotNull(metadata)
assertNotEquals(metadata.offset, -1L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the topic could be deleted, it's possible for the metadata to have -1, right?

Also, could we verify that every record's callback is called?

assertNotEquals(metadata.timestamp, RecordBatch.NO_TIMESTAMP)
}
}).get()
}
}))
}
fs.join()
producerFutures.forEach(_.join)
} catch {
case e: TopicExistsException =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, where would a TopicExistsException be thrown?

admin.deleteTopics(util.List.of(topic)).all().get()
} finally {
admin.close()
producer.close()
}
}

def recreateTopic(admin: Admin, topic: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private?

waitForCondition(() => {
val topicId = if (admin.listTopics().names().get().contains(topic)) {
topicMetadata(admin, topic).topicId()
} else Uuid.ZERO_UUID
// don't wait for the physical delete
deleteTopicWithAdminRaw(admin, topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, deleteTopicWithAdminRaw() doesn't wait for the metadata propagation to the brokers. However, the producer only sees the deleted topic after the metadata is propagated. Is this test effective?

createTopicWithAdminRaw(admin, topic)
val newTopicId = if (admin.listTopics().names().get().contains(topic)) {
topicMetadata(admin, topic).topicId()
} else Uuid.ZERO_UUID
topicId != newTopicId
}, "Topic is not recreated")
}

/**
* Tests that Producer produce to new topic id after recreation.
*
Expand Down
14 changes: 10 additions & 4 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,15 +465,21 @@ object TestUtils extends Logging {
brokers: Seq[B],
controllers: Seq[ControllerServer]
): Unit = {
deleteTopicWithAdminRaw(admin, topic)
waitForAllPartitionsMetadata(brokers, topic, 0)
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
}

def deleteTopicWithAdminRaw(admin: Admin, topic: String): Unit = {
try {
admin.deleteTopics(util.List.of(topic)).all().get()
if (admin.listTopics().names().get().contains(topic)) {
admin.deleteTopics(util.List.of(topic)).all().get()
}
} catch {
case e: ExecutionException if e.getCause != null &&
e.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
// ignore
// ignore
}
waitForAllPartitionsMetadata(brokers, topic, 0)
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
}

/**
Expand Down