Skip to content

KAFKA-19842: Fix flaky KafkaStreamsTelemetryIntegrationTest #20147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Jul 10, 2025

The new "streams" protocol behaves slightly different to the "classic"
protocol, and thus we need to update the test to avoid race conditions.
In particular, the first call to poll() won't "block" and return after
task assignment completed if we need to create internal topics, but
returns early without a task assignment, and only a consecutive
rebalance will assign tasks.

This implies, that KafkaStreams transits to RUNNING state even if the
group is still in NOT_READY state broker side, but this NOT_READY state
is not reflected in the client side state machine.

Disabling the combination of "complex-topology + streams" for now,
until this difference in behavior of the client state machine is fixed.

Reviewers: Lucas Brutschy [email protected]

@mjsax mjsax added streams tests Test fixes (including flaky tests) labels Jul 10, 2025
@github-actions github-actions bot added the small Small PRs label Jul 10, 2025


final AtomicInteger runningStateCount = new AtomicInteger(0);
final int expectedRunningStateCount = DEFAULT_GROUP_PROTOCOL.equals(groupProtocol) || "simple".equals(topologyType) ? 1 : 2;
Copy link
Member Author

@mjsax mjsax Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to my, why "simple" topologies show different behavior, compare to "complex" ones.

When I run the code for "simple" I get the follow log from the consumer:

	Member:                        dL_1PoEdQ4C9BhYUVUzFUQ
	Assigned active tasks:         [0_0]
	Owned active tasks:            []
	Active tasks to revoke:        []
	Assigned standby tasks:        []
	Owned standby tasks:           []
	Assigned warm-up tasks:        []
	Owned warm-up tasks:           []
 (org.apache.kafka.clients.consumer.internals.StreamsMembershipManager:1029)

As you can see, the task is assigned and we only get a single rebalance.

However, when the test executed for "complex" topology, the consumer first logs:

[2025-07-10 14:29:23,384] INFO [Consumer clientId=shouldPassMetricsx1__GHUPQvS0AkXE48xleQ-3f598d1a-1f8a-4923-9332-0d947bb38ab2-StreamThread-1-consumer, groupId=shouldPassMetricsx1__GHUPQvS0AkXE48xleQ] Assigned tasks with local epoch 0
   Member:                        mcJBdaR5TF2QjDoMvLXKBw
   Assigned active tasks:         []
   Owned active tasks:            []
   Active tasks to revoke:        []
   Assigned standby tasks:        []
   Owned standby tasks:           []
   Assigned warm-up tasks:        []
   Owned warm-up tasks:           []
(org.apache.kafka.clients.consumer.internals.StreamsMembershipManager:1029)

and there is no task assignment yet. Only a second rebalance will do the assignment:

[2025-07-10 14:29:28,393] INFO [Consumer clientId=shouldPassMetricsx1__GHUPQvS0AkXE48xleQ-3f598d1a-1f8a-4923-9332-0d947bb38ab2-StreamThread-1-consumer, groupId=shouldPassMetricsx1__GHUPQvS0AkXE48xleQ] Assigned tasks with local epoch 1
   Member:                        mcJBdaR5TF2QjDoMvLXKBw
   Assigned active tasks:         [0_0, 0_1, 1_0, 1_1]
   Owned active tasks:            []
   Active tasks to revoke:        []
   Assigned standby tasks:        []
   Owned standby tasks:           []
   Assigned warm-up tasks:        []
   Owned warm-up tasks:           []
(org.apache.kafka.clients.consumer.internals.StreamsMembershipManager:1029)

Why do we get the assignment for the simple case in the first rebalance already, but not for the second case? Is it because the complex topology needs to create a repartition topic, and thus, does not even try to compute an assignment on the first HB?

I am worried that there might be some race condition that is not covered by this test fix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's exactly about creating the internal topic

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

mjsax added 3 commits July 11, 2025 12:01
The new "streams" protocol behaves slightly different to the "classic" protocol, and thus we need to update the test to avoid race conditions.
In particular, the first call to `poll()` won't "block" and return after task assignment completed, but return early without a task assignment, and only a consective rebalance will assign tasks.
@mjsax mjsax force-pushed the kafka-19482-KafkaStreamsTelemetryIntegrationTest branch from 94b90ab to 492612a Compare July 11, 2025 19:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
small Small PRs streams tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants