KAFKA-19471: Enable acknowledgement for a record which could not be deserialized #20148
base: trunk
Conversation
Thanks for the PR. I've only taken a very cursory look so far and have one comment. I'll review more deeply soon.
```java
    return new KafkaShareConsumer<>(configs, null, null);
}

default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    Map<String, Object> props = new HashMap<>(configs);
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
```
If you're providing instances of the deserializers, I don't think you should be setting the class names in the config.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java, lines 743 to 744 in 2346c0e:

```java
if (keyDeserializer != null)
    newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
```
Thanks for your review. During initialization, if a `keyDeserializer` instance is supplied, it overrides the class name in the config (see above). That said, the explicit check you suggest makes the logic clearer, so I'll fix it.
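For reference, a minimal sketch of what the tightened helper could look like, assuming the class-name defaults are only wanted when no instance is supplied; the `VALUE_DESERIALIZER` branch and the final constructor call are inferred from the truncated snippet above, not taken from the diff:

```java
// Sketch: only fall back to ByteArrayDeserializer when no instance is given,
// since KafkaShareConsumer overrides the class config with the instance anyway.
default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs,
                                                 Deserializer<K> keyDeserializer,
                                                 Deserializer<V> valueDeserializer) {
    Map<String, Object> props = new HashMap<>(configs);
    if (keyDeserializer == null) {
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    }
    if (valueDeserializer == null) {
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    }
    return new KafkaShareConsumer<>(props, keyDeserializer, valueDeserializer);
}
```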
Thanks for the PR. I think it's along the right lines, but the validation of the parameters is too loose as it stands. I suggest introducing some kind of exceptional batch that contains a single record, and then validating the parameters against this.
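One possible shape for such an exceptional batch, sketched here with hypothetical names (no class like this appears in the diff as shown):

```java
// Hypothetical sketch: a batch standing in for the single record that failed
// deserialization. It only accepts an acknowledgement for exactly that offset.
final class ExceptionalBatch {
    private final long failedOffset;   // offset reported by RecordDeserializationException
    private AcknowledgeType ackType;   // recorded when the user acknowledges

    ExceptionalBatch(long failedOffset) {
        this.failedOffset = failedOffset;
    }

    void addAcknowledgement(long offset, AcknowledgeType type) {
        if (offset != failedOffset) {
            throw new IllegalArgumentException("Only offset " + failedOffset + " may be acknowledged");
        }
        this.ackType = type;
    }
}
```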
```java
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
    TopicIdPartition tip = tipBatch.getKey();
    if (tip.topic().equals(topic) && (tip.partition() == partition)) {
        tipBatch.getValue().addAcknowledgement(offset, type);
```
The checking here is not tight enough. If the user calls `acknowledge(ConsumerRecord, AcknowledgeType)`, the code makes sure that the in-flight records include the offset (see `ShareInFlightBatch.acknowledge(ConsumerRecord, AcknowledgeType)`). However, in this case, there is no validation of the offset. The handling of an exceptional batch needs to be a bit more sophisticated, I think. It should not be possible to use `ShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType)` with an exceptional batch. It should only be possible to use `ShareConsumer.acknowledge(String, int, long, AcknowledgeType)`, and only for the specific parameters that were retrieved from the `RecordDeserializationException`.
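A hypothetical tightening of the lookup above along these lines; `isExceptional()` and `exceptionOffset()` are illustrative accessors, not existing API:

```java
public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
    for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) {
        TopicIdPartition tip = entry.getKey();
        if (tip.topic().equals(topic) && tip.partition() == partition) {
            ShareInFlightBatch<K, V> batch = entry.getValue();
            if (batch.isExceptional() && batch.exceptionOffset() != offset) {
                // Only the exact coordinates from the RecordDeserializationException are valid.
                throw new IllegalArgumentException("Offset " + offset + " is not the record that failed deserialization");
            }
            batch.addAcknowledgement(offset, type);
            return;
        }
    }
    throw new IllegalArgumentException("The record is not part of the current fetch");
}
```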
This patch mainly includes two improvements:

1. Handle the case where `pollForFetches()` throws an exception because a record could not be deserialized.
2. Allow that record to be acknowledged via `KafkaShareConsumer.acknowledge(String topic, int partition, long offset, AcknowledgeType type)`.
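A minimal usage sketch under the new behavior, assuming a `KafkaShareConsumer<String, String>` named `consumer` that is already subscribed:

```java
// Sketch only: acknowledge a record that failed deserialization using the
// coordinates carried by the exception, then commit as usual.
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    }
} catch (RecordDeserializationException e) {
    // The failed record never materializes as a ConsumerRecord, so it is
    // acknowledged by topic, partition and offset taken from the exception.
    TopicPartition tp = e.topicPartition();
    consumer.acknowledge(tp.topic(), tp.partition(), e.offset(), AcknowledgeType.REJECT);
}
consumer.commitSync();
```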