KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC #20049
 */
public KafkaFuture<Void> all() {
    return this.future.thenApply(topicPartitionErrorsMap -> {
        List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
            .stream()
            .filter(e -> e.getValue() != Errors.NONE)
            .filter(e -> e.getValue() != null)
Thanks for the patch. Just out of curiosity, would the AlterConsumerGroupOffsets RPC have the same issue?
kafka/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
Lines 68 to 79 in 05b6e81
return this.future.thenApply(topicPartitionErrorsMap -> {
    List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
        .stream()
        .filter(e -> e.getValue() != Errors.NONE)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
    for (Errors error : topicPartitionErrorsMap.values()) {
        if (error != Errors.NONE) {
            throw error.exception(
                "Failed altering group offsets for the following partitions: " + partitionsFailed);
        }
    }
I'll check it out before I merge, but the important difference here is in KafkaApis.scala. For DeleteSGO, it already handled a non-zero error code. For AlterSGO, that code was missing.
Ah, sorry. I didn't read your comment accurately. The difference is that AlterShareGroupOffsets can successfully pass back an error code, which is why this is in terms of ApiException rather than Errors. For AlterConsumerGroupOffsets, the RPC is actually OffsetCommit and this does not have an ErrorMessage field at all. So, it cannot be fixed for consumer groups until we have a version bump on the OffsetCommit RPC.
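To make the ApiException-versus-Errors distinction concrete, here is a minimal, self-contained sketch of an `all()`-style aggregation over a per-partition map of exceptions, where a `null` value stands for success. It is not the code from this PR: `CompletableFuture`, `String` keys and `RuntimeException` are stand-ins for `KafkaFuture`, `TopicPartition` and `ApiException`, and the class name `AllResultSketch` and the wrapping in `IllegalStateException` are assumptions made only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

final class AllResultSketch {

    // Hypothetical stand-in for all(): fails if any partition carries a non-null exception.
    static CompletableFuture<Void> all(CompletableFuture<Map<String, RuntimeException>> future) {
        return future.thenApply(errors -> {
            // Collect the keys of every partition whose result is an exception (null means success).
            List<String> partitionsFailed = errors.entrySet()
                .stream()
                .filter(e -> e.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
            for (RuntimeException error : errors.values()) {
                if (error != null) {
                    // Assumption: wrap the first failure; the real client may rethrow it differently.
                    throw new IllegalStateException(
                        "Failed altering group offsets for the following partitions: " + partitionsFailed,
                        error);
                }
            }
            return null;
        });
    }

    public static void main(String[] args) {
        Map<String, RuntimeException> errors = new LinkedHashMap<>();
        errors.put("topic-0", null); // succeeded
        errors.put("topic-1", new IllegalArgumentException("offset out of range"));
        try {
            all(CompletableFuture.completedFuture(errors)).join();
        } catch (CompletionException e) {
            // join() wraps the failure; the cause carries the partition list.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

Because each map value is an exception object rather than an enum constant, it can carry a server-supplied message along with the error type, which is the property the comment above attributes to AlterShareGroupOffsets.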
 */
public KafkaFuture<Void> all() {
    return this.future.thenApply(topicPartitionErrorsMap -> {
        List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
            .stream()
            .filter(e -> e.getValue() != Errors.NONE)
            .filter(e -> e.getValue() != null)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
Should this be changed to an immutable list?
I'm not sure it matters here. The list is internal to this method, and it is just converted to a string and appended to the exception message.
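For reference, a small sketch of the two collector choices being discussed. `Collectors.toList()` makes no guarantee about mutability, while `Collectors.toUnmodifiableList()` (Java 10+) returns a list that rejects modification. The class name, the `String` keys and the `"NONE"` marker are illustrative stand-ins, not Kafka types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class FailedPartitionsSketch {

    // Collects failed keys with Collectors.toList(); mutability of the result is unspecified.
    static List<String> failedMutable(Map<String, String> errors) {
        return errors.entrySet().stream()
            .filter(e -> !"NONE".equals(e.getValue()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Same filter, but the resulting list throws UnsupportedOperationException on modification.
    static List<String> failedUnmodifiable(Map<String, String> errors) {
        return errors.entrySet().stream()
            .filter(e -> !"NONE".equals(e.getValue()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        Map<String, String> errors = new LinkedHashMap<>();
        errors.put("topic-0", "NONE");
        errors.put("topic-1", "UNKNOWN_SERVER_ERROR");
        // Both variants produce the same text when appended to a message.
        System.out.println("Failed partitions: " + failedMutable(errors));
        System.out.println("Failed partitions: " + failedUnmodifiable(errors));
    }
}
```

When the list is only concatenated into an exception message and never escapes the method, both variants render the same string, which is consistent with the reply above.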
@AndrewJSchofield The last build failed with
Interesting. I fixed a mistake and I expect the test needs fixing too. Will do.
Thanks for the patch, LGTM.
LGTM.
Thanks for the PR, LGTM!
…pache#20049) While testing the code in apache#19820, it became clear that the error handling problems were due to the underlying Admin API. This PR fixes the error handling for top-level errors in the AlterShareGroupOffsets RPC. Reviewers: Apoorv Mittal, Lan Ding, TaiJuWu
} else {
    response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
        if (partition.errorCode() != Errors.NONE.code()) {
            final Errors partitionError = Errors.forCode(partition.errorCode());
nit: we can reuse the Errors
nit: the debug message should use a placeholder for partitionErrorMessage
for example:
var partitionError = Errors.forCode(partition.errorCode());
var partitionErrorMessage = partition.errorMessage();
if (partitionError != Errors.NONE) {
log.debug("AlterShareGroupOffsets request for group id {} and topic-partition {}-{} failed and returned error {}. {}",
groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError, partitionErrorMessage);
}
partitionResults.put(new TopicPartition(topic.topicName(), partition.partitionIndex()), partitionError.exception(partitionErrorMessage));
Thanks @chia7712. I'll put in a minor PR today.
Minor tidying up in AlterShareGroupOffsetsHandler based on review comment #20049 (comment). Reviewers: Jimmy Wang, Lan Ding, TaiJuWu, Ken Huang, Jhen-Yung Hsu, Chia-Ping Tsai
While testing the code in #19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.
Reviewers: Apoorv Mittal, Lan Ding, TaiJuWu
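As a rough illustration of what handling a top-level error can look like, the sketch below checks the response-level error code first and only falls through to per-partition results when it is zero, mirroring the `} else {` branch visible in the handler diff. Everything here is hypothetical: `Response`, `PartitionResult`, `handle` and the choice to fail every requested partition with the top-level error are assumptions for illustration, and the actual AlterShareGroupOffsetsHandler may surface the error differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopLevelErrorSketch {

    // Hypothetical stand-in for one partition entry in the response.
    static final class PartitionResult {
        final String topic; final int partition; final short errorCode; final String errorMessage;
        PartitionResult(String topic, int partition, short errorCode, String errorMessage) {
            this.topic = topic; this.partition = partition;
            this.errorCode = errorCode; this.errorMessage = errorMessage;
        }
    }

    // Hypothetical stand-in for the whole response, including a top-level error code and message.
    static final class Response {
        final short errorCode; final String errorMessage; final List<PartitionResult> partitions;
        Response(short errorCode, String errorMessage, List<PartitionResult> partitions) {
            this.errorCode = errorCode; this.errorMessage = errorMessage; this.partitions = partitions;
        }
    }

    // Returns one entry per partition; a null value means that partition succeeded.
    static Map<String, RuntimeException> handle(Response response, List<String> requested) {
        Map<String, RuntimeException> results = new LinkedHashMap<>();
        if (response.errorCode != 0) {
            // Top-level failure: assumed here to apply to every requested partition.
            RuntimeException topLevel =
                new RuntimeException("error " + response.errorCode + ": " + response.errorMessage);
            for (String tp : requested) {
                results.put(tp, topLevel);
            }
        } else {
            // No top-level error: translate each partition's own code and message.
            for (PartitionResult p : response.partitions) {
                RuntimeException ex = p.errorCode == 0
                    ? null
                    : new RuntimeException("error " + p.errorCode + ": " + p.errorMessage);
                results.put(p.topic + "-" + p.partition, ex);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Response failed = new Response((short) 15, "coordinator not available", List.of());
        System.out.println(handle(failed, List.of("topic-0", "topic-1")).keySet());
    }
}
```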