
KAFKA-19357: AsyncConsumer#close hangs during closing because the commitAsync request never completes due to a missing coordinator #19914


Open: wants to merge 16 commits into trunk

Contributor

@Mirai1129 Mirai1129 commented Jun 6, 2025

Problem:
While AsyncConsumer is closing, CoordinatorRequestManager stops looking for the coordinator: its poll() method returns EMPTY whenever the closing flag is true.
As a result, commitAsync() and other coordinator-dependent operations never complete, and close() hangs indefinitely.

Solution:
Remove the closing flag check from the poll() method of CoordinatorRequestManager so that it keeps looking for the coordinator when one is needed, even while closing. This lets pending coordinator-dependent operations complete during shutdown.
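
To make the change concrete, here is a minimal sketch of the gate described above. It is a simplified stand-in, not the real CoordinatorRequestManager: the class name, the `skipLookupWhenClosing` switch, and the string used as a "request" are all assumptions made for illustration.

```java
import java.util.Optional;

// Simplified stand-in for the lookup gate discussed above; NOT the real Kafka class.
final class CoordinatorLookupSketch {
    private final boolean skipLookupWhenClosing; // true models the check before this PR
    private boolean closing;
    private String coordinator; // null means the coordinator is unknown

    CoordinatorLookupSketch(boolean skipLookupWhenClosing) {
        this.skipLookupWhenClosing = skipLookupWhenClosing;
    }

    void signalClose() {
        closing = true;
    }

    void onCoordinatorFound(String node) {
        coordinator = node;
    }

    // Returns a (hypothetical) lookup request to send, or empty if none is needed.
    Optional<String> poll() {
        if ((skipLookupWhenClosing && closing) || coordinator != null) {
            return Optional.empty();
        }
        return Optional.of("FindCoordinator");
    }

    public static void main(String[] args) {
        CoordinatorLookupSketch oldGate = new CoordinatorLookupSketch(true);
        oldGate.signalClose();
        System.out.println("old gate sends lookup while closing: " + oldGate.poll().isPresent());

        CoordinatorLookupSketch newGate = new CoordinatorLookupSketch(false);
        newGate.signalClose();
        System.out.println("new gate sends lookup while closing: " + newGate.poll().isPresent());
    }
}
```

With the old gate, no lookup is ever issued once closing is signalled, so anything waiting on a coordinator has nothing to unblock it. With the new gate, the lookup continues until a coordinator is found.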

@github-actions github-actions bot added the triage (PRs from the community), consumer, clients, and small (Small PRs) labels Jun 6, 2025
@@ -99,7 +99,7 @@ public void signalClose() {
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (closing || this.coordinator != null)
Member

This makes the consumer find the coordinator during closing, right? If the consumer doesn't have a coordinator running, does it make sense to find one during closing?

Member

Well, we do need to be careful. There is also code to make sure that finding a coordinator does not block the progress of closing. If you start a consumer when there are no running brokers, close needs to complete promptly. If you stop the brokers when a consumer has been running and then attempt a close, it also needs to complete promptly.

@@ -99,7 +99,7 @@ public void signalClose() {
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        if (closing || this.coordinator != null)
+        if (this.coordinator != null)
Contributor

Thanks for the PR. I left a comment.
Should we add an integration test for this scenario: issue a commitAsync as the consumer is closing, and verify that the consumer shuts down properly?

@github-actions github-actions bot removed the triage (PRs from the community) label Jun 7, 2025
Contributor

@kirktrue kirktrue left a comment

Thanks for the PR @Mirai1129! As @DL1231 mentioned, because closing the AsyncKafkaConsumer is such a tricky area, we really do need to have some tests to assure us that this works and doesn't break something else.

@Mirai1129
Contributor Author

Hi everyone,

Thanks for all the feedback and suggestions, and sorry for the late reply. I've updated the PR with a timeout-handling solution: when CommitRequestManager times out, it now immediately clears the remaining callbacks to prevent a deadlock.

I'm currently working on adding comprehensive integration tests to cover this scenario and will push them shortly.

Contributor

@frankvicky frankvicky left a comment

The coordinator might remain unknown and probably won't recover in time.
To avoid blocking the closing process, we should complete the pending commit requests exceptionally (e.g., with CommitFailedException) if the consumer is closing and the coordinator is unknown.
This would tell the user that the last commits were not submitted successfully, and it would also ensure a smooth closing process.
Thoughts?
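
The suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PendingCommitsSketch` and its methods are hypothetical names, and `IllegalStateException` stands in for CommitFailedException so the example has no Kafka dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical holder for in-flight commit futures; not the real CommitRequestManager.
final class PendingCommitsSketch {
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Registers a commit whose result is not yet known.
    CompletableFuture<Void> enqueueCommit() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Fails every pending commit if we are closing without a known coordinator.
    // Returns how many futures were completed exceptionally by this call.
    int failPendingIfStuck(boolean closing, boolean coordinatorKnown) {
        if (!closing || coordinatorKnown) {
            return 0;
        }
        int failed = 0;
        for (CompletableFuture<Void> future : pending) {
            // Stand-in exception; the comment above proposes CommitFailedException.
            if (future.completeExceptionally(
                    new IllegalStateException("consumer closing, coordinator unknown"))) {
                failed++;
            }
        }
        pending.clear();
        return failed;
    }

    public static void main(String[] args) {
        PendingCommitsSketch commits = new PendingCommitsSketch();
        CompletableFuture<Void> commit = commits.enqueueCommit();
        int failed = commits.failPendingIfStuck(true, false);
        System.out.println(failed + " pending commit(s) failed; exceptional="
                + commit.isCompletedExceptionally());
    }
}
```

Completing the futures exceptionally means any callback chained to them still runs, so the caller learns the commit did not go through instead of waiting on a result that never arrives.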

Contributor

@frankvicky frankvicky left a comment

@Mirai1129: Thanks for the update.

Collaborator

@m1a2st m1a2st left a comment

Thanks @Mirai1129 for this patch. I left some comments.

@Mirai1129 Mirai1129 requested a review from chia7712 July 14, 2025 09:24
Collaborator

@TaiJuWu TaiJuWu left a comment

LGTM, assuming CI passes.

Comment on lines 467 to 468
// Try without looking up the coordinator first
var callback = new CountConsumerCommitCallback();
Collaborator

Why does this callback need this comment?

Contributor Author

@Mirai1129 Mirai1129 Jul 14, 2025

Actually, idk why; I just duplicated testCommitAsyncCompletedBeforeConsumerCloses and modified it. If it isn't needed, I'll remove it 🥹.

Contributor

@kirktrue kirktrue left a comment

Looks good to me. Just a couple of questions on the test. Thanks!

var callback = new CountConsumerCommitCallback();

// Close the coordinator before committing because otherwise the commit will fail to find the coordinator.
cluster.brokerIds().forEach(cluster::shutdownBroker);
Contributor

I wonder if we need some sort of waitFor() check after this to ensure the brokers are down.

Contributor Author

Thank you so much! I'll add a waitFor() to ensure the brokers are down.
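
For reference, a waitFor()-style check generally polls a condition until it holds or a deadline passes. The helper below is a generic sketch with hypothetical names, not the utility this PR uses; the actual "all brokers are down" condition depends on the test cluster API and is not shown.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Generic polling helper; a hypothetical illustration, not a Kafka test utility.
final class WaitForSketch {
    // Polls the condition until it is true or the timeout elapses.
    // Returns true if the condition held in time, false on timeout or interrupt.
    static boolean waitFor(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Stand-in condition: becomes true roughly 50 ms after start.
        boolean ok = waitFor(
                () -> System.nanoTime() - start > Duration.ofMillis(50).toNanos(),
                Duration.ofSeconds(2),
                Duration.ofMillis(10));
        System.out.println("condition met: " + ok);
    }
}
```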


consumer.poll(Duration.ofMillis(500));
consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), callback);
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
Contributor

Is it possible for close() to take more than 500 milliseconds? Should we time the method call and add an assert to ensure it's not more than, say, 1000 milliseconds?

Contributor Author

Is the goal of this change to catch the case where the shutdown takes too long, so that we add an assert to verify it?
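
The timing check proposed in this thread might look something like the sketch below. The class and method names are hypothetical, and the Runnable stands in for the `consumer.close(...)` call. The 1000 ms bound is the figure suggested in the review comment.

```java
import java.time.Duration;

// Hypothetical helper for asserting that an action (e.g. close) finishes within a bound.
final class BoundedCloseSketch {
    // Runs the action and returns how long it took, using a monotonic clock.
    static Duration timeIt(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return Duration.ofNanos(System.nanoTime() - start);
    }

    // True if the action finished within the given bound.
    static boolean completesWithin(Runnable action, Duration bound) {
        return timeIt(action).compareTo(bound) <= 0;
    }

    public static void main(String[] args) {
        // Stand-in for consumer.close(CloseOptions.timeout(Duration.ofMillis(500))).
        Runnable fakeClose = () -> { };
        boolean prompt = completesWithin(fakeClose, Duration.ofMillis(1000));
        System.out.println("close finished within bound: " + prompt);
    }
}
```

Leaving some headroom above the close timeout (1000 ms against a 500 ms timeout) keeps such an assertion from becoming flaky on a slow CI machine, while still catching a close() that hangs.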

8 participants