
KAFKA-18105 Fix flaky PlaintextAdminIntegrationTest#testElectPreferredLeaders #20068


Open
wants to merge 16 commits into base: trunk

Conversation

@jim0987795064 (Contributor) commented Jun 30, 2025

Changes

This PR improves the stability of the
PlaintextAdminIntegrationTest.testElectPreferredLeaders test by
introducing short Thread.sleep() delays before invoking:

  • changePreferredLeader()

  • waitForBrokersOutOfIsr()

Reasons

  • Metadata propagation for partition2:
    Kafka requires time to propagate the updated leader metadata across all
    brokers. Without waiting, metadataCache may return outdated leader
    information for partition2.

  • Eviction of broker1 from the ISR:
    To simulate a scenario where broker1 is no longer eligible as leader,
    the test relies on broker1 being removed from the ISR (e.g., due to
    intentional shutdown). This eviction is not instantaneous and requires a
    brief delay before Kafka reflects the change.
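The review below converges on replacing these fixed sleeps with condition-based waiting. A standalone sketch of that pattern (plain Scala; `WaitSketch` and its defaults are illustrative, not the actual Kafka `TestUtils` API, although `TestUtils.waitUntilTrue` has the same shape):

```scala
object WaitSketch {
  // Poll `condition` every `pauseMs` until it holds or `waitTimeMs` elapses;
  // fail with `msg` on timeout. Unlike a fixed Thread.sleep(), this returns
  // as soon as the system is actually ready, and only fails when it never is.
  def waitUntilTrue(condition: () => Boolean,
                    msg: => String,
                    waitTimeMs: Long = 15000L,
                    pauseMs: Long = 100L): Unit = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (!condition()) {
      if (System.currentTimeMillis() >= deadline)
        throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }
}
```

In the test, the condition would be the real readiness check (for example, the metadata cache reporting the expected leader) rather than a clock comparison.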

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) small Small PRs labels Jun 30, 2025
@@ -2909,6 +2909,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 0)

// Now change the preferred leader to 1
Thread.sleep(1000)
Member:

Could you please use "wait for condition" instead of time-based waiting?

@github-actions github-actions bot removed the triage PRs from the community label Jul 1, 2025
() => {
prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()
var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
Contributor:

Could you give this variable a meaningful name?

Contributor Author:

Hello @frankvicky,
I've addressed this issue.
Thanks for pointing it out!

@@ -1335,11 +1335,14 @@ object TestUtils extends Logging {
case Failure(e) => throw e
}

assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " +
assertTrue(isLeaderElected, s"Timed out waiting for leader to become expectedLeaderOpt. " +
Member:

Why do we need this change?

// but shut it down...
killBroker(1)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
TestUtils.waitUntilTrue(
Member:

all we need is the waitForBrokersOutOfIsr, right?

// Check the leader hasn't moved
TestUtils.assertLeader(client, partition1, prior1)
TestUtils.assertLeader(client, partition2, prior2)
TestUtils.waitUntilTrue(
Member:

It seems the way to stabilize the test is to ensure "all" brokers metadata get up-to-date, right?

Comment on lines 2867 to 2874
def sleepMillis(durationMs: Long): Unit = {
  val startTime = System.currentTimeMillis()
  TestUtils.waitUntilTrue(
    () => System.currentTimeMillis() - startTime >= durationMs,
    s"Waited less than $durationMs ms",
    durationMs
  )
}
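For context on the question that follows: the quoted sleepMillis helper reduces to a plain time-based wait, because its only "condition" is a clock comparison. A standalone reduction (plain Scala; `sleepMillisSketch` is a hypothetical name, not part of the PR):

```scala
import java.util.concurrent.TimeUnit

object SleepSketch {
  // Equivalent in effect to TimeUnit.MILLISECONDS.sleep(durationMs): the
  // "condition" is just elapsed time, so no broker state is ever checked.
  def sleepMillisSketch(durationMs: Long): Unit = {
    val start = System.currentTimeMillis()
    while (System.currentTimeMillis() - start < durationMs)
      TimeUnit.MILLISECONDS.sleep(10) // poll interval, not a readiness check
  }
}
```

Seen this way, wrapping waitUntilTrue around a clock comparison buys nothing over a direct sleep, which is what the reviewers go on to point out.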
Collaborator:

Just out of curiosity, why do we need this method instead of using TimeUnit.MILLISECONDS.sleep()?

Contributor Author:

Hello @m1a2st, thank you for pointing this out!
I've changed the first parameter of waitUntilTrue to check whether metadata has been propagated to each broker.

@@ -1298,11 +1298,11 @@ object TestUtils extends Logging {
adminClient.incrementalAlterConfigs(configs)
}

def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit = {
Collaborator:

Do we need these changes in this file?

Contributor Author (@jim0987795064, Jul 7, 2025):

Hello @TaiJuWu, to use assertLeader as the first parameter of sleepMillisToPropagateMetadata, I think we need to change the return type of assertLeader.

Contributor Author:

Hello @TaiJuWu, I've fixed this issue.

var prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()

def sleepMillisToPropagateMetadata(durationMs: Long): Unit = {
TimeUnit.MILLISECONDS.sleep(100)
Member:

The following TestUtils.waitUntilTrue already sleeps between checks when the condition is not met. Do we need TimeUnit.MILLISECONDS.sleep(100) here?

Contributor Author:

Hello @FrankYang0529, I think we can move TimeUnit.MILLISECONDS.sleep(100) here.
Thank you for pointing this out!

Collaborator @TaiJuWu left a comment:

Thanks for this patch; I left two nits.

@@ -1517,4 +1517,4 @@ object TestUtils extends Logging {
timedOut.set(true)
}
}
}
}
Collaborator @TaiJuWu (Jul 9, 2025):

Please revert this change.

@@ -4295,4 +4320,4 @@ object PlaintextAdminIntegrationTest {

assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
}
}
Collaborator:

Please revert this change.

Contributor Author:

Hello @TaiJuWu, I've addressed this issue.

@jim0987795064 jim0987795064 force-pushed the KAFKA-18105 branch 2 times, most recently from 8eead47 to 30e2350 Compare July 9, 2025 22:30
Collaborator @m1a2st left a comment:

Thanks for the update, left some comments

Comment on lines +2855 to +2856
TimeUnit.MILLISECONDS.sleep(1000)
val prior = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).get.id()
Collaborator:

Could you add a comment explaining why we need to call sleep here?

Collaborator:

How can we ensure that the first broker’s metadata is updated after 1000ms?

leaderId.contains(prior)
}
allSynced
}, s"Waited less than $durationMs ms", durationMs)
Collaborator:

Could you update the error message to indicate which condition was not met?

Contributor Author:

Hello, I've added the comment and rewritten the error message in sleepMillisToPropagateMetadata.

@@ -2850,6 +2851,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}

def sleepMillisToPropagateMetadata(durationMs: Long, partition: TopicPartition): Unit = {
Collaborator:

Since durationMs is always set to 10000, we might consider removing this parameter.

Contributor Author:

Thanks for the suggestion.
I think it might be useful to keep durationMs as a parameter in case future changes require tuning the wait time.
This gives a bit more flexibility for potential modifications.

@@ -2850,6 +2851,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}

def sleepMillisToPropagateMetadata(durationMs: Long, partition: TopicPartition): Unit = {
Collaborator @TaiJuWu (Jul 11, 2025):

From my understanding, this function ensures that the leader update has propagated to all brokers' metadata caches.
Is there a reason why this check isn't integrated directly into changePreferredLeader?
Please correct me if I'm wrong or missed something.

Contributor Author:

Hello @TaiJuWu, I extracted this logic into a separate method so that it's easier to reuse elsewhere if needed. Keeping it separate makes the code more modular and improves readability.
Please let me know if you have any other questions.

Collaborator:

Because there is no real condition being waited on for metadata propagation (you just wait a constant time), I think we should wait on a real condition instead.

For example, we can wait until all brokers report the same partition leader.
We can do that by merging this function into changePreferredLeader.
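The "wait until all brokers agree" condition suggested above can be sketched standalone. Everything here (`Endpoint`, `BrokerStub`, `allBrokersAgree`) is hypothetical scaffolding standing in for the test's real broker and metadata-cache types:

```scala
object PropagationSketch {
  // Hypothetical stand-ins for a broker's metadata-cache leader lookup.
  final case class Endpoint(id: Int)
  final case class BrokerStub(cache: Map[String, Endpoint]) {
    def leaderFor(topic: String): Option[Endpoint] = cache.get(topic)
  }

  // The predicate to poll inside TestUtils.waitUntilTrue: true only once
  // every broker reports `expectedLeader` as the leader of `topic`.
  def allBrokersAgree(brokers: Seq[BrokerStub], topic: String, expectedLeader: Int): Boolean =
    brokers.forall(_.leaderFor(topic).exists(_.id == expectedLeader))
}
```

In the real test, the lookup would be the metadataCache.getPartitionLeaderEndpoint call shown in the diff, and folding the wait into changePreferredLeader keeps callers from forgetting it.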

@@ -2963,9 +2981,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 2)

// Now change the preferred leader to 1
sleepMillisToPropagateMetadata(10000, partition2)
Member:

Perhaps it could be replaced by TestUtils.waitForPartitionMetadata(brokers, partition2.topic(), partition2.partition())?

Contributor Author:

Hello @chia7712, after reviewing the waitForPartitionMetadata code, it seems that this API only checks whether the partition leader broker is valid (that is, not -1 or unknown). It doesn't guarantee that all brokers report the same leader. When I ran the code, the test still failed.

Labels
ci-approved core Kafka Broker small Small PRs tests Test fixes (including flaky tests)

6 participants