-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19190: Handle shutdown application correctly #19544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3a0c62c
to
1e8b847
Compare
PTAL @aliehsaeedii |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @lucasbru !
I had only minor comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
Outdated
Show resolved
Hide resolved
…als/StreamsRebalanceData.java Co-authored-by: Bruno Cadonna <[email protected]>
…als/StreamsRebalanceData.java Co-authored-by: Bruno Cadonna <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru - overall LGTM with a couple of minor comments
String statusDetails = statuses.stream() | ||
.map(status -> "(" + status.statusCode() + ") " + status.statusDetail()) | ||
.collect(Collectors.joining(", ")); | ||
logger.warn("Membership is in the following statuses: {}", statusDetails); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why a warn? Seems more like an INFO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good suggestion, that I was thinking about as well. The thing is, right now the streams application keeps running when a source topic is missing, and this membership log is the only client-side thing that is pointing out why no processing is happening. So I would like to keep it at WARN
for now. It was intended in the KIP to be like this, but I discussed with Matthias as well, that we want to fail in these cases instead - so if a source topic doesn't exist, internal topics aren't created within a certain time, or topics are malpartitioned, we actually want to throw an exception (inside StreamThread). This PR is preparing the change by propagating the status to the StreamThread.
Lets make this an INFO
log, once we treat the statuses more explicitly inside StreamThread
.
@@ -518,7 +517,7 @@ private void handleStreamsUncaughtException(final Throwable throwable, | |||
break; | |||
case SHUTDOWN_CLIENT: | |||
log.error( | |||
"Encountered the following exception during processing and the registered exception handler" + | |||
"Encountered the following exception during processing and the registered exception handler " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Encountered the following exception during processing and the registered exception handler " + | |
"Encountered the following exception during processing and the registered exception handler" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually an intentional fix. The error message was "handleropted" before
public void sendShutdownRequest(final AssignorError assignorError) { | ||
assignmentErrorCode.set(assignorError.code()); | ||
public void sendShutdownRequest() { | ||
assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean a shutdown is always considered an error? What about cases where users proactively shutdown? Or is this only executed in error cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not intend to change the behavior of Kafka Streams here. I believe, the SHUTDOWN_REQUESTED
propagated through rebalances is always handled as an error, yes.
The change in this line doesn't change anything functionally, since we'd always pass the same assignorError
to this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Leave some minor comments.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
Outdated
Show resolved
Hide resolved
…als/StreamsRebalanceDataTest.java Co-authored-by: PoAn Yang <[email protected]>
…als/StreamsRebalanceData.java Co-authored-by: PoAn Yang <[email protected]>
…als/StreamsRebalanceData.java Co-authored-by: PoAn Yang <[email protected]>
If the streams rebalance protocol is enabled in
StreamsUncaughtExceptionHandlerIntegrationTest, the streams application
does not shut down correctly upon error.
There are two causes for this - Sometimes, the SHUTDOWN_APPLICATION
code only sent with the leave heartbeat, but that is not handled broker
side. - The SHUTDOWN_APPLICATION code wasn't properly handled
client-side at all
Reviewers: Bruno Cadonna [email protected], Bill Bejeck
[email protected], PoAn Yang [email protected]