Skip to content

KAFKA-19457: Make share group init retry interval configurable. #20104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 9, 2025

Conversation

smjn
Copy link
Collaborator

@smjn smjn commented Jul 4, 2025

  • While creating share group init requests in
    GroupMetadataManager.shareGroupHeartbeat, we check for topics in
    initializing state and if they are a certain amount of time old, we
    issue retry requests for the same.
  • The interval for considering initializing topics as old was based of
    offsetsCommitTimeoutMs and was not configurable.
  • In this PR, we remedy the situation by introducing a new config to
    supply the value. The default is 30_000 which is a
    heuristic based on the fact that the share coordinator persister
    retries request with exponential backoff, with upper cap of 30_000
    seconds.
  • Tests have been updated wherever applicable.

Reviewers: Apoorv Mittal [email protected], Lan Ding
[email protected], TaiJuWu [email protected], Andrew Schofield
[email protected]

@smjn smjn requested a review from AndrewJSchofield July 4, 2025 09:23
@github-actions github-actions bot added triage PRs from the community group-coordinator small Small PRs labels Jul 4, 2025
@smjn smjn added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Jul 4, 2025
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, LGTM!

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just one comment to address.

@@ -484,6 +493,9 @@ public GroupCoordinatorConfig(AbstractConfig config) {
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equal to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupInitializeRetryIntervalMs >= offsetCommitTimeoutMs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be changed a little. The problem is that you've added a new internal (and thus undocumented) configuration to allow configuration of something which should in principle not need tweaking. That's fine. However, the validation of the configs can now break if someone changes a documented configuration, and they will not know about the internal configuration. It would be better to ensure in the code that the shareGroupInitializeRetryIntervalMs is not smaller than the offsetCommitTimeoutMs just using assignment to an appropriate value.

@smjn
Copy link
Collaborator Author

smjn commented Jul 5, 2025

@AndrewJSchofield Thanks for the comments, incorporated.

@smjn smjn requested a review from AndrewJSchofield July 5, 2025 08:47
Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, LGTM!

Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this patch, leave a comment.

@@ -437,6 +445,8 @@ public GroupCoordinatorConfig(AbstractConfig config) {
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
this.shareGroupAssignors = shareGroupAssignors(config);
int initializeRetryMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG);
this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs, this.offsetCommitTimeoutMs);
Copy link
Collaborator

@TaiJuWu TaiJuWu Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comparison add to SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC for clarification ?

@AndrewJSchofield
Copy link
Member

@smjn The code change looks good to me. Please could you triage the failed tests and make sure they are appropriately tracked. Thanks.

@smjn
Copy link
Collaborator Author

smjn commented Jul 8, 2025

Unrelated Flaky/failed tests:

FAILED ❌ LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments() - already tracked in https://issues.apache.org/jira/browse/KAFKA-19452

FLAKY ⚠️ AuthorizerIntegrationTest > testConsumerGroupHeartbeatWithRegex() - https://issues.apache.org/jira/browse/KAFKA-19481
FLAKY ⚠️ KafkaStreamsTelemetryIntegrationTest > "shouldPassMetrics(String, boolean, String).topologyType=complex, stateUpdaterEnabled=true, groupProtocol=streams" -
https://issues.apache.org/jira/browse/KAFKA-19482
FLAKY ⚠️ RemoteIndexCacheTest > testConcurrentCacheDeletedFileExists() - https://issues.apache.org/jira/browse/KAFKA-19483

cc: @AndrewJSchofield

@smjn smjn requested a review from TaiJuWu July 8, 2025 18:37
Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for this patch.

@AndrewJSchofield AndrewJSchofield merged commit 8aa5eae into apache:trunk Jul 9, 2025
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants