Skip to content

KAFKA-19471: Enable acknowledgement for a record which could not be deserialized #20148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from

Conversation

DL1231
Copy link
Contributor

@DL1231 DL1231 commented Jul 11, 2025

This patch mainly includes two improvements:

  1. Update currentFetch when pollForFetches() throws an exception.
  2. Add an override KafkaShareConsumer.acknowledge(String topic, int partition, long offset, AcknowledgeType type) .

@github-actions github-actions bot added triage PRs from the community core Kafka Broker consumer clients labels Jul 11, 2025
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Jul 11, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I've only taken a very cursory look so far and have one comment. I'll review more deeply soon.

return new KafkaShareConsumer<>(configs, null, null);
}

default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're providing instances of the deserializers, I don't think you should be setting the class names in the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (keyDeserializer != null)
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());

Thanks for your review. During initialization, if the keyDeserializer is set, it will override the class name in the config.
Of course, the explicit check here makes the logic clearer. I'll fix it later.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I think it's along the right lines, but the validation of the parameters is too loose as it stands. I suggest introducing some kind of exceptional batch that contains a single record, and then validating the parameters against this.

for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
TopicIdPartition tip = tipBatch.getKey();
if (tip.topic().equals(topic) && (tip.partition() == partition)) {
tipBatch.getValue().addAcknowledgement(offset, type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checking here is not tight enough. If the user calls acknowledge(ConsumerRecord, AcknowledgeType), the code makes sure that the in-flight records include the offset (see ShareInFlightBatch.acknowledge(ConsumerRecord, AcknowledgeType)). However, in this case, there is no validation of the offset. The handling of an exceptional batch needs to be a bit more sophisticated I think. It should not be possible to use ShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) with an exceptional batch. It should only be possible to use ShareConsumer.acknowlege(String, int, long, AcknowledgeType) and only for the specific parameters that were retrieved from the RecordDeserializationException.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants