KAFKA-18105 Fix flaky PlaintextAdminIntegrationTest#testElectPreferredLeaders #20068
base: trunk
Conversation
…r to improve test stability
@@ -2909,6 +2909,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
      TestUtils.assertLeader(client, partition2, 0)

      // Now change the preferred leader to 1
+     Thread.sleep(1000)
Could you please use "wait for condition" instead of time-based waiting?
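For illustration, a minimal sketch of the condition-based wait being asked for, assuming TestUtils.waitUntilTrue is in scope and that getPartitionLeaderEndpoint returns a Scala Option as the snippets below suggest; the expected leader id 0 mirrors the assertLeader call above:

// Hypothetical replacement for Thread.sleep(1000): poll until every broker's
// metadata cache reports the expected leader instead of waiting a fixed time.
TestUtils.waitUntilTrue(
  () => brokers.forall { broker =>
    broker.metadataCache
      .getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName)
      .map(_.id())
      .contains(0)
  },
  s"Timed out waiting for all brokers to see broker 0 as leader of $partition2"
)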
() => {
  prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
  prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()
var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
Could you give this variable a meaningful name?
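For instance, one possible rename (illustrative only; the name finally chosen in the PR may differ):

// A descriptive name instead of the single letter `m`:
var reassignments = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]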
Hello @frankvicky,
I've already addressed this issue. Thanks for pointing it out!
@@ -1335,11 +1335,14 @@ object TestUtils extends Logging {
      case Failure(e) => throw e
    }

-   assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " +
+   assertTrue(isLeaderElected, s"Timed out waiting for leader to become expectedLeaderOpt. " +
Why do we need this change?
// but shut it down...
killBroker(1)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
TestUtils.waitUntilTrue(
All we need is the waitForBrokersOutOfIsr, right?
// Check the leader hasn't moved
TestUtils.assertLeader(client, partition1, prior1)
TestUtils.assertLeader(client, partition2, prior2)
TestUtils.waitUntilTrue(
It seems the way to stabilize the test is to ensure "all" brokers metadata get up-to-date, right?
def sleepMillis(durationMs: Long): Unit = {
  val startTime = System.currentTimeMillis()
  TestUtils.waitUntilTrue(
    () => System.currentTimeMillis() - startTime >= durationMs,
    s"Waited less than $durationMs ms",
    durationMs
  )
}
Just out of curiosity, why do we need this method instead of using TimeUnit.MILLISECONDS.sleep()?
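The plain alternative the reviewer has in mind is a direct one-liner, for example:

import java.util.concurrent.TimeUnit
// Blocks the calling thread for the given duration; no condition, no polling.
TimeUnit.MILLISECONDS.sleep(durationMs)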
Hello @m1a2st, thank you for pointing this out!
I've changed the first parameter of waitUntilTrue to check whether metadata has been propagated to each broker.
@@ -1298,11 +1298,11 @@ object TestUtils extends Logging {
    adminClient.incrementalAlterConfigs(configs)
  }

  def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit = {
Do we need these changes in this file?
Hello @TaiJuWu, to use assertLeader as the first parameter of sleepMillisToPropagateMetadata, I think we need to change the return type of assertLeader.
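As a rough sketch of the change under discussion (the exact signature in the PR may differ), assertLeader could return the verified leader id instead of Unit so the result can feed the propagation check:

// Hypothetical: return the leader id after verifying it, rather than Unit.
def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Int = {
  waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
  expectedLeader
}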
Hello @TaiJuWu, I've fixed this issue.
…waitForLeaderToBecome to Unit
var prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()

def sleepMillisToPropagateMetadata(durationMs: Long): Unit = {
  TimeUnit.MILLISECONDS.sleep(100)
In the following TestUtils.waitUntilTrue, it already sleeps if the condition is not met. Do we need TimeUnit.MILLISECONDS.sleep(100) here?
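The point is that a poll-with-pause wait already sleeps between checks; a simplified sketch of the loop shape (not the exact Kafka implementation):

// The pause between condition checks makes a separate sleep(100) before the
// call redundant.
def waitUntilTrue(condition: () => Boolean, msg: => String,
                  waitTimeMs: Long = 15000L, pause: Long = 100L): Unit = {
  val deadline = System.currentTimeMillis() + waitTimeMs
  while (!condition()) {
    if (System.currentTimeMillis() > deadline)
      throw new AssertionError(msg)
    Thread.sleep(pause)
  }
}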
Hello @FrankYang0529, I think we can move TimeUnit.MILLISECONDS.sleep(100) here.
Thank you for pointing it out!
Thanks for this patch; I left two nits.
@@ -1517,4 +1517,4 @@ object TestUtils extends Logging {
        timedOut.set(true)
      }
    }
-}
+}
Please revert this change.
@@ -4295,4 +4320,4 @@ object PlaintextAdminIntegrationTest {

    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
-}
+}
Please revert this change.
Hello @TaiJuWu, I've addressed this issue.
Force-pushed 3537c84 to 335442c
Force-pushed ef22986 to 335442c
Force-pushed 8eead47 to 30e2350
Thanks for the update; I left some comments.
TimeUnit.MILLISECONDS.sleep(1000)
val prior = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).get.id()
Could you add a comment explaining why we need to call sleep here?
How can we ensure that the first broker’s metadata is updated after 1000ms?
    leaderId.contains(prior)
  }
  allSynced
}, s"Waited less than $durationMs ms", durationMs)
Could you update the error message to indicate which condition was not met?
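For example (a hypothetical wording; the final message in the PR may differ), the failure string could name the partition and the expected leader rather than just the elapsed time:

// Instead of s"Waited less than $durationMs ms":
s"Timed out after $durationMs ms waiting for all brokers' metadata caches " +
  s"to report broker $prior as leader of $partition"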
Hello, I've added the comment and rewrote the error message in sleepMillisToPropagateMetadata.
@@ -2850,6 +2851,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
    }
  }

+ def sleepMillisToPropagateMetadata(durationMs: Long, partition: TopicPartition): Unit = {
Since durationMs is always set to 10000, we might consider removing this parameter.
Thanks for the suggestion.
I think it might be useful to keep durationMs as a parameter in case future changes require tuning the wait time.
This gives a bit more flexibility for potential modifications.
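One middle ground (a sketch only, not necessarily what the PR does) is a default argument, so call sites can omit the constant but still override it:

// Hypothetical: default the timeout instead of passing 10000 at every call site.
def sleepMillisToPropagateMetadata(partition: TopicPartition,
                                   durationMs: Long = 10000L): Unit = {
  // same wait logic as in the PR; only the parameter gains a default
}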
@@ -2850,6 +2851,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
    }
  }

+ def sleepMillisToPropagateMetadata(durationMs: Long, partition: TopicPartition): Unit = {
From my understanding, this function ensures that the leader update has propagated to all brokers' metadata caches.
Is there a reason why this check isn't integrated directly into changePreferredLeader?
Please correct me if I'm wrong or missed something.
Hello @TaiJuWu, I extracted this logic into a separate method so that it's easier to reuse in other places if needed. Keeping it outside makes the code more modular and improves readability.
Please let me know if you have any other questions.
Because no real condition is being waited on for metadata propagation (you just wait a constant time), I think we should wait on a real condition instead.
For example, we can wait until all brokers report the same partition leader.
We can do that by merging this function into changePreferredLeader.
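A sketch of the convergence condition the reviewer describes, reusing the accessors from the snippets above (the exact integration into changePreferredLeader is omitted):

// Hypothetical check: every broker reports the same, defined leader for each
// test partition before the test proceeds.
TestUtils.waitUntilTrue(
  () => Seq(partition1, partition2).forall { tp =>
    val leaderIds = brokers.map(_.metadataCache
      .getPartitionLeaderEndpoint(tp.topic, tp.partition(), listenerName)
      .map(_.id()))
    leaderIds.forall(_.nonEmpty) && leaderIds.distinct.size == 1
  },
  "Brokers did not converge on a single leader for the test partitions"
)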
@@ -2963,9 +2981,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
      TestUtils.assertLeader(client, partition2, 2)

      // Now change the preferred leader to 1
+     sleepMillisToPropagateMetadata(10000, partition2)
Perhaps it could be replaced by TestUtils.waitForPartitionMetadata(brokers, partition2.topic(), partition2.partition())?
Hello @chia7712, after reviewing the waitForPartitionMetadata code, it seems that this API only checks whether the partition's leader broker is valid, that is, not -1 or unknown. It doesn't guarantee that all brokers report the same leader. When I run the code, the test still fails.
Changes
This PR improves the stability of the PlaintextAdminIntegrationTest.testElectPreferredLeaders test by introducing short Thread.sleep() delays before invoking:
changePreferredLeader()
waitForBrokersOutOfIsr()

Reasons
Metadata propagation for partition2:
Kafka requires time to propagate the updated leader metadata across all brokers. Without waiting, metadataCache may return outdated leader information for partition2.

Eviction of broker1 from the ISR:
To simulate a scenario where broker1 is no longer eligible as leader, the test relies on broker1 being removed from the ISR (e.g., due to intentional shutdown). This eviction is not instantaneous and requires a brief delay before Kafka reflects the change.
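Put together, a condition-based version of the two waits described above might look like the following; this is a sketch based on helpers discussed in this thread, not the final committed code:

// 1) Metadata propagation for partition2: poll until every broker's cache
//    reflects the new leader instead of sleeping a fixed amount.
TestUtils.waitUntilTrue(
  () => brokers.forall(_.metadataCache
    .getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName)
    .map(_.id())
    .contains(0)),
  "Updated leader metadata for partition2 did not reach all brokers")

// 2) Eviction of broker 1 from the ISR: shut it down, then wait for the
//    eviction to be reflected rather than sleeping.
killBroker(1)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))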