KAFKA-18170: Add scheduled job to snapshot cold share partitions. #19443

Merged
merged 10 commits into from
Apr 23, 2025

Conversation

smjn
Collaborator

@smjn smjn commented Apr 11, 2025

  • There can be scenarios where share partition records in the
    __share_group_state internal topic are not updated for a while,
    meaning these partitions are effectively cold.
  • In this situation, the presence of these records holds back the
    pruner from keeping the topic clean and of a manageable size.
  • To remedy this, we have added a periodic job,
    setupSnapshotColdPartitions, in ShareCoordinatorService. It performs a
    writeAll operation on the associated shards in the coordinator and
    forces snapshot creation for any cold partitions, so the pruner can
    continue. The job is scheduled as a timer task (see the sketch after
    this list).
  • A new internal config,
    share.coordinator.cold.partition.snapshot.interval.ms, has been
    introduced to set the period of the job.
  • Any failures are logged and ignored.
  • New tests have been added to verify the feature.
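
A minimal sketch of how such a periodic job might be wired up; the class, helper method, and default interval below are illustrative assumptions, not the exact code added in this PR:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ColdSnapshotJobSketch {
    // Illustrative period; in the PR it is controlled by the internal config
    // share.coordinator.cold.partition.snapshot.interval.ms.
    private static final long SNAPSHOT_INTERVAL_MS = 5 * 60 * 1000L;

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Re-runs at a fixed period, mirroring a timer task.
        scheduler.scheduleAtFixedRate(this::setupSnapshotColdPartitions,
            SNAPSHOT_INTERVAL_MS, SNAPSHOT_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    private void setupSnapshotColdPartitions() {
        try {
            // In the actual service this performs a writeAll against the coordinator
            // shards, forcing snapshot records for any cold share partitions.
            snapshotColdPartitionsOnAllShards();
        } catch (Exception e) {
            // Failures are logged and ignored so the next run is unaffected.
            System.err.println("Cold partition snapshot job failed: " + e);
        }
    }

    private void snapshotColdPartitionsOnAllShards() {
        // Placeholder for the per-shard writeAll operation.
    }
}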

@smjn smjn requested a review from AndrewJSchofield April 11, 2025 08:23
@github-actions github-actions bot added triage PRs from the community KIP-932 Queues for Kafka labels Apr 11, 2025
@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Apr 11, 2025
@smjn
Collaborator Author

smjn commented Apr 11, 2025

Sample run

{"key":{"type":0,"data":{"groupId":"gs1","topicId":"usMKwc_eQ5214UuoShlXIg","partition":0}},"value":{"version":0,"data":{"snapshotEpoch":0,"stateEpoch":1,"leaderEpoch":-1,"startOffset":-1,"createTimestamp":1744356362818,"writeTimestamp":1744356362818,"stateBatches":[]}}}

{"key":{"type":1,"data":{"groupId":"gs1","topicId":"usMKwc_eQ5214UuoShlXIg","partition":0}},"value":{"version":0,"data":{"snapshotEpoch":0,"leaderEpoch":0,"startOffset":-1,"stateBatches":[]}}}

{"key":{"type":1,"data":{"groupId":"gs1","topicId":"usMKwc_eQ5214UuoShlXIg","partition":0}},"value":{"version":0,"data":{"snapshotEpoch":0,"leaderEpoch":0,"startOffset":16,"stateBatches":[{"firstOffset":16,"lastOffset":20,"deliveryState":2,"deliveryCount":1}]}}}
...
// Job run

{"key":{"type":0,"data":{"groupId":"gs1","topicId":"usMKwc_eQ5214UuoShlXIg","partition":0}},"value":{"version":0,"data":{"snapshotEpoch":1,"stateEpoch":1,"leaderEpoch":0,"startOffset":16,"createTimestamp":1744356362818,"writeTimestamp":1744356377744,"stateBatches":[{"firstOffset":16,"lastOffset":20,"deliveryState":2,"deliveryCount":1}]}}}

.setStateEpoch(stateEpoch)
.setLeaderEpoch(leaderEpoch)
.setStartOffset(startOffset)
.setStateBatches(new ArrayList<>(stateBatches))
Member

@smjn, one question: why does stateBatches need to be wrapped in an ArrayList here? Thank you.

Collaborator Author

@smjn smjn Apr 14, 2025

Otherwise both objects would point to the same internal state batches list. But I understand what you mean; I will close the class for modification.
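
For context, a small illustration of the aliasing concern discussed here; the class and field names are hypothetical, not the PR's actual types:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ShareStateSketch {
    private final List<String> stateBatches;

    ShareStateSketch(List<String> stateBatches) {
        // Copying here means later mutations of the caller's list cannot leak
        // into this object; assigning the reference directly would alias the
        // same internal list across objects.
        this.stateBatches = new ArrayList<>(stateBatches);
    }

    List<String> stateBatches() {
        // Exposing an unmodifiable view "closes the class for modification".
        return Collections.unmodifiableList(stateBatches);
    }
}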

Member

@AndrewJSchofield AndrewJSchofield left a comment

I've been playing with the code and generally it looks stable. However, I do observe that in a very quiet system, it will take snapshots every 5 minutes, even if that's all that it's doing. Surely, there comes a point where it's not worth writing a cold snapshot, because the amount of data to be replayed doesn't decrease at all by doing it.

@smjn
Collaborator Author

smjn commented Apr 14, 2025

I've been playing with the code and generally it looks stable. However, I do observe that in a very quiet system, it will take snapshots every 5 minutes, even if that's all that it's doing. Surely, there comes a point where it's not worth writing a cold snapshot, because the amount of data to be replayed doesn't decrease at all by doing it.

We can modify the logic to stop cold snapshotting if all existing share partitions are already cold-snapshotted, which means they have not been written to organically since the last snapshot.

So if we end up in the state:

snap1-force
snap2-organic
snap3-force

cold snapshotting will continue at 5-minute intervals,

but if we end up with:

snap1-force
snap2-force
snap3-force

cold snapshotting will stop.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I've been playing with this for a few hours and it seems to work properly. I have a query about one part of it.


// If all share partitions are snapshotted, it means that
// system is quiet and cold snapshotting will not help much.
if (coldSnapshottedPartitionsCount != 0 && coldSnapshottedPartitionsCount == shareStateMap.size()) {
Member

I'm uncertain about this test. If the share state map is empty, then this if condition will be false, and we will proceed to do zero snapshots. Surely, this is a situation which should skip the snapshotting. What's wrong with just coldSnapshottedPartitionsCount == shareStateMap.size()? I know the code previously did that, but it seems a better test than what is there now.
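
A hedged sketch of the simplified guard being suggested; the helper wrapper is hypothetical, and the variable names come from the excerpt above:

import java.util.Map;

final class ColdSnapshotGuardSketch {
    private ColdSnapshotGuardSketch() { }

    // Returns true when the cold snapshot pass can be skipped. An empty
    // shareStateMap also returns true (0 == 0), covering the case the review
    // comment calls out, where there is nothing worth snapshotting.
    static boolean skipColdSnapshotting(int coldSnapshottedPartitionsCount,
                                        Map<?, ?> shareStateMap) {
        return coldSnapshottedPartitionsCount == shareStateMap.size();
    }
}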

Collaborator Author

Incorporated

@smjn smjn requested a review from AndrewJSchofield April 16, 2025 14:48
Member

@AndrewJSchofield AndrewJSchofield left a comment

Just one additional comment. I will continue to exercise this branch on my local environment, but I think it can be merged soon.

shard.replay(offset + 2, producerId, producerEpoch, records.get(0));

assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
}
Member

And I suppose you should assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp()) also.
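
Roughly, the pair of checks being suggested (timestamp, sleep, delta, key0 and key1 are names taken from the test excerpt and this comment; the expected values are an assumption):

// Check the write timestamps of both entries after the replay.
assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp());
assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());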

@AndrewJSchofield
Member

When I delete the __share_group_state topic I get the following exception information:

[2025-04-17 09:41:17,524] INFO [ShareCoordinator id=1] Pruning records in __share_group_state-0 till offset 3. (org.apache.kafka.coordinator.share.ShareCoordinatorService)
[2025-04-17 09:41:17,527] ERROR [ShareCoordinator id=1] Received error in share-group state topic prune. (org.apache.kafka.coordinator.share.ShareCoordinatorService)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) [?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) [?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419) [?:?]
	at org.apache.kafka.coordinator.share.ShareCoordinatorService$1.run(ShareCoordinatorService.java:281) [kafka-share-coordinator-4.1.0-SNAPSHOT.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

The background tasks perpetually throw exceptions in this situation, and I suspect that a more orderly leadership change could similarly make the code sad. While the coordinator runtime is properly able to handle unfortunate leadership events, I think the error handling of the background tasks in the share coordinator needs a little refinement.

@smjn
Collaborator Author

smjn commented Apr 18, 2025

When I delete the __share_group_state topic I get the following exception information:

[2025-04-17 09:41:17,524] INFO [ShareCoordinator id=1] Pruning records in __share_group_state-0 till offset 3. (org.apache.kafka.coordinator.share.ShareCoordinatorService)
[2025-04-17 09:41:17,527] ERROR [ShareCoordinator id=1] Received error in share-group state topic prune. (org.apache.kafka.coordinator.share.ShareCoordinatorService)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) [?:?]
	at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) [?:?]
	at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419) [?:?]
	at org.apache.kafka.coordinator.share.ShareCoordinatorService$1.run(ShareCoordinatorService.java:281) [kafka-share-coordinator-4.1.0-SNAPSHOT.jar:?]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

The background tasks perpetually throw exceptions in this situation, and I suspect that a more orderly leadership change could similarly make the code sad. While the coordinator runtime is properly able to handle unfortunate leadership events, I think the error handling of the background tasks in the share coordinator needs a little refinement.

@AndrewJSchofield I do not fully understand this use case. Leadership changes are handled properly by the job, since the topic partitions it operates on (activeTopicPartitions()) are maintained by the runtime.

If an internal TP is moved from broker 1 to broker 2, the corresponding active topic partition lists will differ on broker 1 and broker 2 (maintained by the runtime). If you are talking about the logging of the exception, we can handle that in this case.

We have a similar test in ShareConsumerTest.testShareConsumerAfterCoordinatorMovement as well.

The exception comes from trying to delete record offsets via the replicaManager; no exception is being thrown from the runtime here.

In this specific instance, the runtime continues returning all TPs as active even after internal topic deletion.

@smjn smjn requested a review from AndrewJSchofield April 18, 2025 17:06
@AndrewJSchofield
Member

I have opened an issue to handle __share_group_state topic deletion (https://issues.apache.org/jira/browse/KAFKA-19189). In particular the record pruning throws an exception on every iteration without specifically handling NOT_LEADER_OR_FOLLOWER. Apart from that, this PR looks good to me.
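
As an illustration of the kind of refinement the follow-up issue describes, one hypothetical shape for the prune task's completion handling; the handlePruneResult wrapper and pruneFuture parameter are assumptions, not KAFKA-19189's actual fix:

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PruneErrorHandlingSketch {
    private static final Logger log = LoggerFactory.getLogger(PruneErrorHandlingSketch.class);

    // Attaches a completion handler that downgrades "partition not hosted here"
    // style errors to debug, so a deleted topic or a leadership move does not
    // produce an ERROR log on every timer iteration.
    static void handlePruneResult(CompletableFuture<Void> pruneFuture) {
        pruneFuture.whenComplete((ignored, error) -> {
            if (error == null) {
                return;
            }
            Throwable cause = error.getCause() != null ? error.getCause() : error;
            if (cause instanceof UnknownTopicOrPartitionException
                    || cause instanceof NotLeaderOrFollowerException) {
                // Expected when the topic is deleted or leadership moves away.
                log.debug("Skipping share-group state prune: {}", cause.getMessage());
            } else {
                log.error("Received error in share-group state topic prune.", cause);
            }
        });
    }
}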

@AndrewJSchofield AndrewJSchofield merged commit 6fe1598 into apache:trunk Apr 23, 2025
21 of 22 checks passed