From ba872aad144a4611ef9e6a42d7c034d99dfbb37d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 16 Apr 2025 13:45:30 +0100 Subject: [PATCH] KAFKA-19160: Improve performance of fetching stable offsets When fetching stable offsets in the group coordinator, we iterate over all requested partitions. For each partition, we iterate over the group's ongoing transactions to check if there is a pending transactional offset commit for that partition. This can get slow when there are a large number of partitions and a large number of pending transactions. Instead, maintain a list of pending transactions per partition to speed up lookups. --- .../group/OffsetMetadataManager.java | 124 ++++++++++++------ 1 file changed, 85 insertions(+), 39 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f771..ab3cd89c63fe5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -194,9 +194,16 @@ public OffsetMetadataManager build() { /** * The open transactions (producer ids) keyed by group. + * Tracks whether groups have any open transactions. */ private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup; + /** + * The open transactions (producer ids) keyed by group id, topic name and partition id. + * Tracks whether partitions have any pending transactional offsets. + */ + private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition; + private class Offsets { /** * The offsets keyed by group id, topic name and partition id. 
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove( this.offsets = new Offsets(); this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0); this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + this.openTransactionsByGroupTopicAndPartition = new TimelineHashMap<>(snapshotRegistry, 0); } /** @@ -650,24 +658,18 @@ public int deleteAllOffsets( // Delete all the pending transactional offsets too. Here we only write a tombstone // if the topic-partition was not in the main storage because we don't need to write // two consecutive tombstones. - TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.forEach(producerId -> { - Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId); - if (pendingOffsets != null) { - TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets = - pendingOffsets.offsetsByGroup.get(groupId); - if (pendingGroupOffsets != null) { - pendingGroupOffsets.forEach((topic, offsetsByPartition) -> { - offsetsByPartition.keySet().forEach(partition -> { - if (!hasCommittedOffset(groupId, topic, partition)) { - records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); - numDeletedOffsets.getAndIncrement(); - } - }); - }); - } - } + TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic != null) { + openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> { + openTransactionsByPartition.forEach((partition, producerIds) -> { + producerIds.forEach(producerId -> { + if (!hasCommittedOffset(groupId, topic, partition)) { + records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); + numDeletedOffsets.getAndIncrement(); + } + }); + }); }); } @@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets( String topic, int partition ) { - final TimelineHashSet<Long> openTransactions = 
openTransactionsByGroup.get(groupId); - if (openTransactions == null) return false; + TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic == null) return false; - for (Long producerId : openTransactions) { - Offsets offsets = pendingTransactionalOffsets.get(producerId); - if (offsets != null && offsets.get(groupId, topic, partition) != null) { - return true; - } - } + TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition == null) return false; - return false; + TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition); + return openTransactions != null && !openTransactions.isEmpty(); } /** @@ -1005,6 +1005,11 @@ public void replay( openTransactionsByGroup .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) .add(producerId); + openTransactionsByGroupTopicAndPartition + .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(producerId); } } else { if (offsets.remove(groupId, topic, partition) != null) { @@ -1012,14 +1017,29 @@ public void replay( } // Remove all the pending offset commits related to the tombstone. 
- TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.forEach(openProducerId -> { - Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId); - if (pendingOffsets != null) { - pendingOffsets.remove(groupId, topic, partition); + TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic != null) { + TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition != null) { + TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition); + if (openTransactions != null) { + openTransactions.forEach(openProducerId -> { + Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId); + if (pendingOffsets != null) { + pendingOffsets.remove(groupId, topic, partition); + } + }); + + openTransactionsByPartition.remove(partition); + if (openTransactionsByPartition.isEmpty()) { + openTransactionsByTopic.remove(topic); + } + if (openTransactionsByTopic.isEmpty()) { + openTransactionsByGroupTopicAndPartition.remove(groupId); + } } - }); + } } } } @@ -1031,6 +1051,7 @@ public void replay( * @param result The result of the transaction. * @throws RuntimeException if the transaction can not be completed. 
*/ + @SuppressWarnings("NPathComplexity") public void replayEndTransactionMarker( long producerId, TransactionResult result @@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker( return; } - pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> { - TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.remove(producerId); - if (openTransactions.isEmpty()) { + pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { + TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId); + if (groupTransactions != null) { + groupTransactions.remove(producerId); + if (groupTransactions.isEmpty()) { openTransactionsByGroup.remove(groupId); } } + + TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic == null) return; + + topicOffsets.forEach((topic, partitionOffsets) -> { + TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition == null) return; + + partitionOffsets.keySet().forEach(partitionId -> { + TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId); + if (partitionTransactions != null) { + partitionTransactions.remove(producerId); + if (partitionTransactions.isEmpty()) { + openTransactionsByPartition.remove(partitionId); + } + if (openTransactionsByPartition.isEmpty()) { + openTransactionsByTopic.remove(topic); + } + if (openTransactionsByTopic.isEmpty()) { + openTransactionsByGroupTopicAndPartition.remove(groupId); + } + } + }); + }); }); if (result == TransactionResult.COMMIT) {