KAFKA-19471: Enable acknowledgement for a record which could not be deserialized #20148

Open · wants to merge 3 commits into base: trunk
@@ -41,6 +41,8 @@
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -67,6 +69,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -843,6 +846,141 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
}
}

@ClusterTest
public void testExplicitOverrideAcknowledgeCorruptedMessage() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(3))) {

ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.send(record3);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(2, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer.acknowledge(firstRecord);
shareConsumer.acknowledge(secondRecord);

RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());
shareConsumer.commitSync();

// The corrupted record was automatically released, so the next poll runs into it again.
rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(2, rde.offset());

// Reject this record
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();

records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}

@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsNotException() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
assertEquals(0L, consumedRecord.offset());

// Offset-based acknowledgement is only permitted for a record that failed deserialization.
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));

shareConsumer.acknowledge(consumedRecord);
verifyShareGroupStateTopicRecordsProduced();
}
}

@ClusterTest
public void testExplicitAcknowledgeOffsetThrowsParametersError() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
null,
mockErrorDeserializer(2))) {

ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record1);
producer.send(record2);
producer.flush();

shareConsumer.subscribe(Set.of(tp.topic()));

ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
assertEquals(1, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();

ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
shareConsumer.acknowledge(firstRecord);

final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
assertEquals(1, rde.offset());

// A mismatched topic, partition or offset is rejected.
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));

// Reject this record
shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
shareConsumer.commitSync();

records = shareConsumer.poll(Duration.ZERO);
assertEquals(0, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
}

// Returns a deserializer that throws on the recordNumber-th record (1-based) it deserializes.
private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
    int recordIndex = recordNumber - 1;
    return new ByteArrayDeserializer() {
int i = 0;

@Override
public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
if (i == recordIndex) {
throw new SerializationException();
} else {
i++;
return super.deserialize(topic, headers, data);
}
}
};
}

@ClusterTest
public void testImplicitAcknowledgeFailsExplicit() {
alterShareAutoOffsetReset("group1", "earliest");
@@ -2794,13 +2932,22 @@ private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties
) {
return createShareConsumer(groupId, additionalProperties, null, null);
}

private <K, V> ShareConsumer<K, V> createShareConsumer(
String groupId,
Map<?, ?> additionalProperties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer
) {
Properties props = new Properties();
props.putAll(additionalProperties);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Map<String, Object> conf = new HashMap<>();
props.forEach((k, v) -> conf.put((String) k, v));
-        return cluster.shareConsumer(conf);
+        return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
}

private void warmup() throws InterruptedException {
@@ -507,6 +507,30 @@ public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
delegate.acknowledge(record, type);
}

/**
* Acknowledge delivery of a specific record by its topic, partition, and offset, indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call.
* <p>
* This method provides an alternative to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} when
* the full record is unavailable, such as when it could not be deserialized and only its topic,
* partition and offset are known from a {@link RecordDeserializationException}.
* <p>
* This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record to acknowledge
* @param offset The offset of the record to acknowledge
* @param type The acknowledgement type which indicates whether it was processed successfully
*
* @throws IllegalStateException if the specified record is not pending acknowledgement,
* or the consumer is not configured for explicit acknowledgement
*/
@Override
public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
delegate.acknowledge(topic, partition, offset, type);
}
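
A minimal usage sketch for this new overload (not part of the diff), mirroring testExplicitOverrideAcknowledgeCorruptedMessage above. The bootstrap address, group id, topic name and String deserializers are illustrative assumptions:

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

public class RejectCorruptRecordSketch {
    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // illustrative
            ConsumerConfig.GROUP_ID_CONFIG, "group1",                  // illustrative
            ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(
                config, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(Set.of("topic1"));                      // illustrative topic
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Application processing would go here.
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    }
                } catch (RecordDeserializationException rde) {
                    // The record itself is unavailable, so acknowledge it by coordinates.
                    consumer.acknowledge(rde.topicPartition().topic(),
                        rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
                }
                consumer.commitSync(); // commits the acknowledgements gathered above
            }
        }
    }
}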

/**
* Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
* the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
@@ -104,6 +104,10 @@ public synchronized void acknowledge(ConsumerRecord<K, V> record) {
public synchronized void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
}

@Override
public synchronized void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
}

@Override
public synchronized Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
return new HashMap<>();
@@ -70,6 +70,11 @@ public interface ShareConsumer<K, V> extends Closeable {
*/
void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type);

/**
* @see KafkaShareConsumer#acknowledge(String, int, long, AcknowledgeType)
*/
void acknowledge(String topic, int partition, long offset, AcknowledgeType type);

/**
* @see KafkaShareConsumer#commitSync()
*/
@@ -561,6 +561,7 @@ public void unsubscribe() {
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
Timer timer = time.timer(timeout);

@@ -601,6 +602,9 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
} while (timer.notExpired());

return ConsumerRecords.empty();
} catch (ShareFetchException e) {
    // Keep the partially collected fetch so its records remain acknowledgeable,
    // then rethrow the original exception to the application.
    currentFetch = (ShareFetch<K, V>) e.shareFetch();
    throw e.origin();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
release();
@@ -692,6 +696,19 @@ public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType
}
}

/**
* {@inheritDoc}
*/
@Override
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
acquireAndEnsureOpen();
try {
ensureExplicitAcknowledgement();
currentFetch.acknowledge(topic, partition, offset, type);
} finally {
release();
}
}

/**
* {@inheritDoc}
*/
@@ -19,8 +19,10 @@
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;

import java.time.Duration;
import java.util.Collections;
@@ -110,7 +112,7 @@ public boolean isEmpty() {
* Acknowledge a single record in the current batch.
*
* @param record The record to acknowledge
-     * @param type The acknowledge type which indicates whether it was processed successfully
+     * @param type The acknowledgment type which indicates whether it was processed successfully
*/
public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
@@ -123,6 +125,29 @@ public void acknowledge(final ConsumerRecord<K, V> record, AcknowledgeType type)
throw new IllegalStateException("The record cannot be acknowledged.");
}

/**
* Acknowledge a single record by its topic, partition and offset in the current batch.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record
* @param offset The offset of the record
* @param type The acknowledgment type which indicates whether it was processed successfully
*/
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
TopicIdPartition tip = tipBatch.getKey();
KafkaException shareException = tipBatch.getValue().getException();
if (tip.topic().equals(topic) && (tip.partition() == partition) &&
shareException instanceof RecordDeserializationException &&
((RecordDeserializationException) shareException).offset() == offset) {

tipBatch.getValue().addAcknowledgement(offset, type);

[Review comment (Member)]
The checking here is not tight enough. If the user calls acknowledge(ConsumerRecord, AcknowledgeType), the code makes sure that the in-flight records include the offset (see ShareInFlightBatch.acknowledge(ConsumerRecord, AcknowledgeType)). However, in this case, there is no validation of the offset. The handling of an exceptional batch needs to be a bit more sophisticated, I think. It should not be possible to use ShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) with an exceptional batch. It should only be possible to use ShareConsumer.acknowledge(String, int, long, AcknowledgeType), and only for the specific parameters that were retrieved from the RecordDeserializationException.

[Reply (Contributor Author)]
Good catch! Appreciate the suggestion.

return;
}
}
throw new IllegalStateException("The record cannot be acknowledged.");
}
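
The review exchange above asks for tighter validation: record-based acknowledgement should refuse an exceptional batch, and the offset-based path should accept only the exact coordinates reported by the RecordDeserializationException. A minimal sketch of that second check as a standalone helper (the class and method names here are hypothetical, not part of this PR):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;

final class ExceptionalBatchAckValidator {

    // Permit an offset-based acknowledgement only when it names exactly the
    // record that failed, as reported by the RecordDeserializationException.
    static void validate(RecordDeserializationException rde,
                         String topic, int partition, long offset) {
        TopicPartition tp = rde.topicPartition();
        if (!tp.topic().equals(topic) || tp.partition() != partition || rde.offset() != offset) {
            throw new IllegalStateException("Only the record that failed deserialization ("
                + tp + "@" + rde.offset() + ") can be acknowledged by coordinates.");
        }
    }

    private ExceptionalBatchAckValidator() { }
}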

/**
* Acknowledge all records in the current batch. If any records in the batch already have
* been acknowledged, those acknowledgements are not overwritten.
@@ -112,7 +112,7 @@ public ShareFetch<K, V> collect(final ShareFetchBuffer fetchBuffer) {
fetch.add(tp, batch);

if (batch.getException() != null) {
-                throw batch.getException();
+                throw new ShareFetchException(batch.getException(), fetch);
} else if (batch.hasCachedException()) {
break;
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;

/**
 * Internal exception thrown when collecting a share fetch whose batch carries a
 * delivery error. It couples the original exception with the partially collected
 * fetch so that {@code poll} can retain the fetch and rethrow the cause.
 */
public class ShareFetchException extends SerializationException {

private final KafkaException origin;

private final ShareFetch<?, ?> shareFetch;

public ShareFetchException(KafkaException exception, ShareFetch<?, ?> shareFetch) {
this.origin = exception;
this.shareFetch = shareFetch;
}

public KafkaException origin() {
return origin;
}

public ShareFetch<?, ?> shareFetch() {
return shareFetch;
}
}
core/src/main/scala/kafka/server/KafkaApis.scala (1 addition, 1 deletion)
@@ -3324,7 +3324,7 @@

val interestedTopicPartitions = new util.ArrayList[TopicIdPartition]

-    erroneousAndValidPartitionData.validTopicIdPartitions.forEach { case topicIdPartition =>
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach { topicIdPartition =>
if (!authorizedTopics.contains(topicIdPartition.topicPartition.topic))
erroneous += topicIdPartition -> ShareFetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))