KAFKA-18170: Add scheduled job to snapshot cold share partitions. #19443
Conversation
Sample run
.setStateEpoch(stateEpoch)
.setLeaderEpoch(leaderEpoch)
.setStartOffset(startOffset)
.setStateBatches(new ArrayList<>(stateBatches))
@smjn, one question: why does stateBatches need to be wrapped with an ArrayList here? Thank you.
Otherwise both would point to the same internal state batches list. But I understand what you mean; I will close the class for modification.
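The aliasing problem can be shown with a minimal standalone sketch. The `Batch` record below is a hypothetical stand-in for the real state batch type, not the actual Kafka class:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of why new ArrayList<>(stateBatches) matters.
// Batch is a hypothetical stand-in for the real state batch type.
public class DefensiveCopyDemo {
    record Batch(long firstOffset, long lastOffset) { }

    public static void main(String[] args) {
        List<Batch> internal = new ArrayList<>();
        internal.add(new Batch(0, 10));

        List<Batch> aliased = internal;                 // no copy: shares the backing list
        List<Batch> copied = new ArrayList<>(internal); // defensive copy

        internal.add(new Batch(11, 20)); // an organic write after the snapshot was taken

        System.out.println(aliased.size()); // 2: the "snapshot" mutated along with live state
        System.out.println(copied.size());  // 1: the copied snapshot is isolated
    }
}
```

Making the class immutable ("closing it for modification"), as suggested above, removes the need for callers to remember the copy.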
I've been playing with the code and generally it looks stable. However, I do observe that in a very quiet system, it will take snapshots every 5 minutes, even if that's all that it's doing. Surely, there comes a point where it's not worth writing a cold snapshot, because the amount of data to be replayed doesn't decrease at all by doing it.
We can modify the logic to stop cold snapshotting if all existing share partitions are already snapshotted, which means that since the last snapshot they have not been written to organically. While some partitions remain unsnapshotted, cold snapshotting will happen at 5-minute intervals; once every partition has been snapshotted, cold snapshotting will stop.
Thanks for the PR. I've been playing with this for a few hours and it seems to work properly. I have a query about one part of it.
// If all share partitions are snapshotted, it means that
// the system is quiet and cold snapshotting will not help much.
if (coldSnapshottedPartitionsCount != 0 && coldSnapshottedPartitionsCount == shareStateMap.size()) {
I'm uncertain about this test. If the share state map is empty, then this if condition will be false, and we will proceed to do zero snapshots. Surely this is a situation which should skip the snapshotting. What's wrong with just coldSnapshottedPartitionsCount == shareStateMap.size()? I know the code previously did that, but it seems a better test than what is there now.
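The boundary case being discussed can be sketched standalone. Names here mirror the PR but this is not the actual shard code:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the two guard conditions under discussion.
// Names mirror the PR but this is not the actual ShareCoordinatorShard code.
public class ColdSnapshotGuard {
    static boolean skipCurrent(int coldSnapshottedPartitionsCount, Map<?, ?> shareStateMap) {
        // Condition in the PR: false for an empty map, so a pointless pass still runs
        return coldSnapshottedPartitionsCount != 0
            && coldSnapshottedPartitionsCount == shareStateMap.size();
    }

    static boolean skipSimpler(int coldSnapshottedPartitionsCount, Map<?, ?> shareStateMap) {
        // Suggested simpler condition: also true for an empty map
        return coldSnapshottedPartitionsCount == shareStateMap.size();
    }

    public static void main(String[] args) {
        Map<String, String> empty = new HashMap<>();
        System.out.println(skipCurrent(0, empty));  // false: the zero-partition pass proceeds
        System.out.println(skipSimpler(0, empty));  // true: snapshotting is skipped
    }
}
```

Both conditions agree on every non-empty map; they differ only when the map is empty, which is exactly the quiet-system case raised above.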
Incorporated
Just one additional comment. I will continue to exercise this branch on my local environment, but I think it can be merged soon.
shard.replay(offset + 2, producerId, producerEpoch, records.get(0));

assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
}
And I suppose you should also assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp()).
When I delete the internal topic, the background tasks perpetually throw exceptions, and I suspect that a more orderly leadership change could similarly make the code sad. While the coordinator runtime is properly able to handle unfortunate leadership events, I think the error handling of the background tasks in the share coordinator needs a little refinement.
@AndrewJSchofield, I do not understand this use case. Leadership changes are handled properly by the job, because the topic partitions it uses (activeTopicPartitions()) are maintained by the runtime. If an internal TP is moved from broker 1 to broker 2, the corresponding active topic partition lists will differ for broker 1 and broker 2 (maintained by the runtime). If you are talking about logging the exception, we can handle that in this case. We have a similar test. The exception comes from trying to delete the record offsets from the replicaManager; there is no exception being thrown from the runtime here. In this specific instance, the runtime continues returning all TPs as active even after internal topic deletion.
I have opened an issue to handle this case.
Share partitions whose state in the __share_group_state internal topic has not been updated for a while are basically cold. Records belonging to such partitions can prevent the pruner from keeping the topic clean and of manageable size. This PR adds a scheduled job, setupSnapshotColdPartitions in ShareCoordinatorService, which does a writeAll operation on the associated shards in the coordinator and forces snapshot creation for any cold partitions. In this way the pruner can continue. The job has been added as a timer task, and a new configuration, share.coordinator.cold.partition.snapshot.interval.ms, has been introduced to set the period of the job.
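For illustration, a broker configuration fragment setting the job period might look like the following. The 5-minute value is an assumption based on the interval mentioned in the review, not a documented default:

```properties
# Period of the cold-partition snapshotting timer task (here: 5 minutes, an assumed value)
share.coordinator.cold.partition.snapshot.interval.ms=300000
```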