-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19435: Optimize kafka-consumer-groups.sh
to return the offset info when some partitions without leaders
#20064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
kafka-consumer-groups.sh
to return the offset info of other partitions even when the leader of some partitions are missingkafka-consumer-groups.sh
to return the offset info when some partitions without leaders
kafka-consumer-groups.sh
to return the offset info when some partitions without leaderskafka-consumer-groups.sh
to return the offset info when some partitions without leaders
// shutdown the target broker | ||
int noneLeaderPartition = 2; | ||
int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new TopicPartition(topic, noneLeaderPartition)); | ||
clusterInstance.shutdownBroker(shutdownBrokerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shutting down the broker could cause some admin APIs to hang, so could you please add timeout to this test in order to avoid impacting CI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @xijiu for this patch, left some comments, and some tests are fail, PTAL
} | ||
|
||
// append the command | ||
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics")); | |
List<String> cgcArgs = List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics"); |
|
||
private Set<TopicPartition> filterNoneLeaderPartitions(List<TopicPartition> topicPartitions) { | ||
// collect all topics | ||
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to convert it to a set here, since AdminClient.describeTopics doesn’t handle duplicate topic names anyway
kafka/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Line 2320 in a5a54dc
for (String topicName : topicNames) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we still need to map to TopicPartition::topic, the current approach is acceptable.
&& Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content) | ||
&& Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content) | ||
&& Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content); | ||
}, "Expected a data row and no error in describe groups when a broker shutdown."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the topic has 3 partitions, so there're 3 data rows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Yeah, I have fixed it, PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xijiu thanks for this patch. two comments are left. PTAL
@@ -592,8 +592,8 @@ private Collection<PartitionAssignmentState> collectConsumerAssignment( | |||
getLag(Optional.empty(), Optional.empty()), consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty()) | |||
); | |||
} else { | |||
List<TopicPartition> topicPartitionsSorted = topicPartitions.stream().sorted(Comparator.comparingInt(TopicPartition::partition)).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excuse me, why do we need those changes?
Set<TopicPartition> nonLeaderTopicPartitions = filterNoneLeaderPartitions(topicPartitions); | ||
|
||
// prepare data for partitions with leaders | ||
topicPartitions.removeAll(nonLeaderTopicPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to modify the input topicPartitions
if filterNoneLeaderPartitions
returns the partitions having leaders.
@chia7712 Thanks for CR, I have fixed them, PTAL |
.flatMap(entry -> entry.getValue().partitions().stream() | ||
.filter(partitionInfo -> partitionInfo.leader() == null) | ||
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition()))) | ||
.collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.toList();
@@ -619,7 +618,12 @@ private Collection<PartitionAssignmentState> describePartitions( | |||
consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt, leaderEpoch); | |||
}; | |||
|
|||
return offsetsUtils.getLogEndOffsets(topicPartitions).entrySet().stream().map(logEndOffsetResult -> { | |||
List<TopicPartition> topicPartitionsWithLeader = new ArrayList<>(topicPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<TopicPartition> topicPartitionsWithoutLeader = filterNoneLeaderPartitions(topicPartitions);
List<TopicPartition> topicPartitionsWithLeader = topicPartitions.stream().filter(tp -> !topicPartitionsWithoutLeader.contains(tp)).toList();
return Stream.concat(existLeaderAssignments.stream(), noneLeaderAssignments.stream()) | ||
.sorted(Comparator.<PartitionAssignmentState, String>comparing( | ||
state -> state.topic.orElse(""), String::compareTo) | ||
.thenComparingInt(state -> state.partition.orElse(-1))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the sort policy should include topic name, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the original logical ordering is invalid, and I added it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add test for this case?
@chia7712 I have fixed them, PTAL ☺ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this patch, leave some comments.
@@ -592,8 +592,7 @@ private Collection<PartitionAssignmentState> collectConsumerAssignment( | |||
getLag(Optional.empty(), Optional.empty()), consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOGGER
is not used at Line 85 in this file, should we remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's useless indeed, I have removed it, PTAL
// concat the data and then sort them | ||
return Stream.concat(existLeaderAssignments.stream(), noneLeaderAssignments.stream()) | ||
.sorted(Comparator.<PartitionAssignmentState, String>comparing( | ||
state -> state.topic.orElse(""), String::compareTo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just out of curiosity, topic is allowed to be Optional.empty
or empty string ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It maybe not allowed, it will receive a exception org.apache.kafka.common.errors.InvalidTopicException: The given topic name '' cannot be represented in a request
when set the empty to a topic name
// org.apache.kafka.clients.admin.KafkaAdminClient#topicNameIsUnrepresentable
private static boolean topicNameIsUnrepresentable(String topicName) {
return topicName == null || topicName.isEmpty();
}
ConsumerGroupCommand
byfirst checking if a leader exists for the partition before invoking the
admin.listOffsets
. Finally, concatenate the data and returna broker and observe whether the output meets the expectations