KAFKA-19357: AsyncConsumer#close hangs during closing because the commitAsync request never completes due to a missing coordinator #19914
base: trunk
@@ -99,7 +99,7 @@ public void signalClose() {
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         if (closing || this.coordinator != null)
This makes the consumer find the coordinator during closing, right? If the consumer doesn't have a coordinator running, does it make sense to find one during closing?
Well, we do need to be careful. There is also code to make sure that finding a coordinator does not block the progress of closing. If you start a consumer when there are no running brokers, close needs to complete promptly. If you stop the brokers when a consumer has been running and then attempt a close, it also needs to complete promptly.
@@ -99,7 +99,7 @@ public void signalClose() {
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        if (closing || this.coordinator != null)
+        if (this.coordinator != null)
Thanks for the PR, left a comment.
Should we add an integration test for this scenario: call commitAsync while the consumer is closing, and verify that the consumer shuts down properly?
Thanks for the PR @Mirai1129! As @DL1231 mentioned, because closing the AsyncKafkaConsumer is such a tricky area, we really do need some tests to assure us that this works and doesn't break something else.
Merge branch 'trunk' into KAFKA-19357
…inatorRequestManager`
Hi everyone, thanks for all the feedback and suggestions, and sorry for the late reply. I've updated the PR with the timeout handling solution. I'm currently working on adding comprehensive integration tests to cover this scenario and will push them shortly.
Given that the coordinator might remain in an unknown state and probably won't recover in time, we should complete the pending commit requests exceptionally (e.g., with CommitFailedException) if the consumer is closing and the coordinator is unknown, so that they don't block the closing process.
This would inform the user that the last commits were not submitted successfully and would also ensure a smooth closing process.
Thoughts?
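To make the suggestion concrete, here is a minimal, dependency-free sketch of the idea. It is not the real CommitRequestManager: the class `PendingCommitsSketch`, its methods, and `CommitFailedSketchException` (a stand-in for CommitFailedException) are hypothetical names used only for illustration, and the normal send path is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model; NOT the real CommitRequestManager.
class PendingCommitsSketch {

    // Stand-in for Kafka's CommitFailedException so the sketch has no dependencies.
    static class CommitFailedSketchException extends RuntimeException {
        CommitFailedSketchException(String message) {
            super(message);
        }
    }

    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private boolean closing = false;

    // Queues a commit and returns the future the caller would wait on.
    CompletableFuture<Void> enqueueCommit() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    void signalClose() {
        closing = true;
    }

    // Fails every queued commit when closing with no known coordinator.
    // Returns how many commits were failed by this call.
    int poll(Optional<String> coordinator) {
        if (!closing || coordinator.isPresent()) {
            return 0; // normal path (actually sending the requests) is omitted
        }
        int failed = pending.size();
        for (CompletableFuture<Void> future : pending) {
            future.completeExceptionally(new CommitFailedSketchException(
                "Consumer is closing and the coordinator is unknown"));
        }
        pending.clear();
        return failed;
    }

    public static void main(String[] args) {
        PendingCommitsSketch manager = new PendingCommitsSketch();
        CompletableFuture<Void> commit = manager.enqueueCommit();
        manager.signalClose();
        manager.poll(Optional.empty());
        System.out.println("failed exceptionally: " + commit.isCompletedExceptionally());
    }
}
```

Because the futures complete right away, anything in close() that waits on them can return immediately, and the caller still learns that the commit did not go through.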
@Mirai1129: Thanks for the update.
Thanks @Mirai1129 for this patch. I left some comments.
LGTM, assuming CI passes.
// Try without looking up the coordinator first
var callback = new CountConsumerCommitCallback();
Why does this callback need this comment?
Actually, I'm not sure why; I just duplicated testCommitAsyncCompletedBeforeConsumerCloses and modified it. If it isn't needed, I'll remove it 🥹.
Looks good to me. Just a couple of questions on the test. Thanks!
var callback = new CountConsumerCommitCallback();

// Close the coordinator before committing because otherwise the commit will fail to find the coordinator.
cluster.brokerIds().forEach(cluster::shutdownBroker);
I wonder if we need some sort of waitFor() check after this to ensure the brokers are down.
Thank you so much! I'll add waitFor() to ensure the brokers are down.
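For reference, a waitFor()-style check usually comes down to polling a condition until a deadline. The sketch below is a generic, self-contained version and not the helper the test ended up using: `WaitForSketch` and its parameters are hypothetical, and the condition that decides whether the brokers are down is left to the caller because the thread does not say how the cluster exposes that state.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; a real test would more likely reuse the project's own test utilities.
class WaitForSketch {

    // Polls the condition until it holds or maxWaitMs elapses.
    // Returns true if the condition held before the deadline.
    static boolean waitFor(BooleanSupplier condition, long maxWaitMs, long pollIntervalMs) {
        long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false; // deadline reached without the condition holding
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 100 ms after start.
        boolean ok = waitFor(() -> System.currentTimeMillis() - start >= 100, 2_000, 10);
        System.out.println("condition met: " + ok);
    }
}
```

In the test, the condition would be whatever signal reliably shows that every broker has stopped, checked right after the shutdownBroker calls and before commitAsync.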
consumer.poll(Duration.ofMillis(500));
consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), callback);
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
Is it possible for close() to take more than 500 milliseconds? Should we time the method call and add an assert to ensure it's not more than, say, 1000 milliseconds?
Is this change meant to keep the test from passing incorrectly when the shutdown takes too long, so we add an assert to ensure that?
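A timing assert of the kind discussed here could look roughly like the sketch below. It is only an illustration: `CloseTimingSketch` and its methods are hypothetical, the 1000 ms bound is the figure floated above, and `fakeClose` stands in for the real consumer.close(CloseOptions.timeout(Duration.ofMillis(500))) call so the example runs without a cluster.

```java
import java.time.Duration;

// Hypothetical helper for bounding how long a call such as close() may take.
class CloseTimingSketch {

    // Runs the action and returns the elapsed time, measured with the monotonic clock.
    static Duration timed(Runnable action) {
        long startNanos = System.nanoTime();
        action.run();
        return Duration.ofNanos(System.nanoTime() - startNanos);
    }

    // Throws AssertionError if the action takes longer than the bound.
    static void assertCompletesWithin(Duration bound, Runnable action) {
        Duration elapsed = timed(action);
        if (elapsed.compareTo(bound) > 0) {
            throw new AssertionError("took " + elapsed.toMillis()
                + " ms, expected at most " + bound.toMillis() + " ms");
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Stand-in for the real close call, which needs a running test cluster.
        Runnable fakeClose = () -> sleepQuietly(50);
        assertCompletesWithin(Duration.ofMillis(1000), fakeClose);
        System.out.println("close finished within the bound");
    }
}
```

Keeping the asserted bound above the close timeout leaves headroom for scheduling jitter while still catching a close that hangs.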
Problem:
When AsyncConsumer is closing, CoordinatorRequestManager stops looking for the coordinator: its poll() method returns EMPTY whenever the closing flag is true.
This prevents commitAsync() and other coordinator-dependent operations from completing, causing close() to hang indefinitely.
Solution:
Remove the closing flag check from the poll() method of CoordinatorRequestManager so that it continues to look for the coordinator when needed, even while the consumer is closing. This ensures that pending coordinator-dependent operations can complete during shutdown.
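The effect of the change can be seen in a toy model of just this decision. The sketch is not the real CoordinatorRequestManager: `CoordinatorPollSketch`, `pollBefore`, `pollAfter`, and the two-value `PollResult` enum are hypothetical, and everything else the real poll() does is omitted.

```java
// Toy model of the guard only; NOT the real CoordinatorRequestManager.
class CoordinatorPollSketch {

    enum PollResult { EMPTY, FIND_COORDINATOR }

    boolean closing;
    String coordinator; // null means the coordinator is unknown

    // Guard before the change: never look up the coordinator once closing.
    PollResult pollBefore() {
        if (closing || coordinator != null) {
            return PollResult.EMPTY;
        }
        return PollResult.FIND_COORDINATOR;
    }

    // Guard after the change: keep looking while the coordinator is unknown.
    PollResult pollAfter() {
        if (coordinator != null) {
            return PollResult.EMPTY;
        }
        return PollResult.FIND_COORDINATOR;
    }

    public static void main(String[] args) {
        CoordinatorPollSketch manager = new CoordinatorPollSketch();
        manager.closing = true; // closing with no coordinator known
        System.out.println("before: " + manager.pollBefore());
        System.out.println("after:  " + manager.pollAfter());
    }
}
```

The two guards differ only when the consumer is closing and the coordinator is unknown, which is the case where a pending commitAsync() otherwise has nothing to complete it.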