Skip to content

Commit 3242843

Browse files
cpettitt-confluentguozhangwang
authored andcommitted
KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (apache#7223)
Make offsets immutable to users of RecordCollector.offsets. Fix up an existing case where offsets could be modified in this way. Add a simple test to verify offsets cannot be changed externally. Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>
1 parent 6560203 commit 3242843

File tree

4 files changed

+33
-3
lines changed

4 files changed

+33
-3
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ <K, V> void send(final String topic,
6363
/**
6464
* The last acked offsets from the internal {@link Producer}.
6565
*
66-
* @return the map from TopicPartition to offset
66+
* @return an immutable map from TopicPartition to offset
6767
*/
6868
Map<TopicPartition, Long> offsets();
6969

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.streams.processor.internals;
1818

19+
import java.util.Collections;
1920
import org.apache.kafka.clients.producer.Callback;
2021
import org.apache.kafka.clients.producer.Producer;
2122
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -267,7 +268,7 @@ public void close() {
267268

268269
@Override
269270
public Map<TopicPartition, Long> offsets() {
270-
return offsets;
271+
return Collections.unmodifiableMap(offsets);
271272
}
272273

273274
// for testing only

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,8 @@ void commit(final boolean startNewTransaction) {
479479

480480
@Override
481481
protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
482-
final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
482+
final Map<TopicPartition, Long> checkpointableOffsets =
483+
new HashMap<>(recordCollector.offsets());
483484
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
484485
checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
485486
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@
5050
import java.util.Map;
5151
import java.util.concurrent.Future;
5252

53+
import static org.hamcrest.MatcherAssert.assertThat;
54+
import static org.hamcrest.Matchers.equalTo;
5355
import static org.junit.Assert.assertEquals;
56+
import static org.junit.Assert.assertThrows;
5457
import static org.junit.Assert.assertTrue;
5558
import static org.junit.Assert.fail;
5659

@@ -145,6 +148,31 @@ public void testStreamPartitioner() {
145148
assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
146149
}
147150

151+
@Test
152+
public void shouldNotAllowOffsetsToBeUpdatedExternally() {
153+
final String topic = "topic1";
154+
final TopicPartition topicPartition = new TopicPartition(topic, 0);
155+
156+
final RecordCollectorImpl collector = new RecordCollectorImpl(
157+
"RecordCollectorTest-TestSpecificPartition",
158+
new LogContext("RecordCollectorTest-TestSpecificPartition "),
159+
new DefaultProductionExceptionHandler(),
160+
new Metrics().sensor("skipped-records")
161+
);
162+
collector.init(new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer));
163+
164+
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
165+
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
166+
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
167+
168+
final Map<TopicPartition, Long> offsets = collector.offsets();
169+
170+
assertThat(offsets.get(topicPartition), equalTo(2L));
171+
assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition(topic, 0), 50L));
172+
173+
assertThat(collector.offsets().get(topicPartition), equalTo(2L));
174+
}
175+
148176
@SuppressWarnings("unchecked")
149177
@Test(expected = StreamsException.class)
150178
public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {

0 commit comments

Comments
 (0)