-
Notifications
You must be signed in to change notification settings - Fork 14.4k
MINOR: introduce structure to keep member assignment with topic Ids #19645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lianetm: Thanks for the enhancement!
Love this new structure 😸
} | ||
|
||
/** | ||
* Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition, | |
* Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition. |
this.localEpoch = localEpoch; | ||
this.partitions = new HashMap<>(); | ||
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) { | ||
throw new IllegalArgumentException("Local epoch must be set if there are partitions"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider checking the value before assigning it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR.
@@ -376,8 +370,8 @@ protected void processAssignmentReceived(Map<Uuid, SortedSet<Integer>> assignmen | |||
*/ | |||
private void replaceTargetAssignmentWithNewAssignment(Map<Uuid, SortedSet<Integer>> assignment) { | |||
currentTargetAssignment.updateWith(assignment).ifPresent(updatedAssignment -> { | |||
log.debug("Target assignment updated from {} to {}. Member will reconcile it on the next poll.", | |||
currentTargetAssignment, updatedAssignment); | |||
log.debug("Member {} updated it's target assignment from {} to {}. Member will reconcile it on the next poll.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "its"
"be reconciled.", memberId, memberEpoch, MemberState.RECONCILING); | ||
"to ack a previous reconciliation. \n" + | ||
"\t\tCurrent assignment: {} \n" + | ||
"\t\tNew assignment to reconcile: {}\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why this is not "Target assignment"? I wonder if there's a subtle difference that I've failed to grasp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no reason, it's the Target assignment indeed. Fixed.
Thanks for the feedback @frankvicky / @AndrewJSchofield! All comments addressed. |
and partitions), to easily access the data as needed. This will be used
in following PR to integrate assignment with topic IDs into the
subscription state.
No changes in logic.
Reviewers: TengYao Chi [email protected], Andrew Schofield
[email protected]