
Commit 2bc2af6

KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (apache#6283)

pgwhalen authored and rhauch committed on Aug 13, 2019

When calling readToLogEnd(), the KafkaBasedLog worker thread should catch the TimeoutException that can occur when brokers are unavailable and log a warning; otherwise the worker thread terminates. Also includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.

Author: Paul Whalen <pgwhalen@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>

1 parent e20bc24; commit 2bc2af6
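
For context, the shape of the fix is a work loop that treats TimeoutException as retriable. The sketch below is illustrative only, not the actual KafkaBasedLog source: the class name WorkLoopSketch and the stubbed readToLogEnd() are stand-ins, and only the catch-and-continue structure mirrors this commit.

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.errors.WakeupException;

    // Illustrative sketch of the retry pattern; not the real KafkaBasedLog.WorkThread.
    public class WorkLoopSketch extends Thread {
        private volatile boolean stopRequested = false;

        @Override
        public void run() {
            while (!stopRequested) {
                try {
                    readToLogEnd(); // may throw TimeoutException when brokers are unavailable
                } catch (TimeoutException e) {
                    // Log a warning and retry instead of letting the thread die.
                    continue;
                } catch (WakeupException e) {
                    // Another read request arrived or stop() was called; restart the loop.
                    continue;
                }
                // ... deliver records and service any pending readToEnd() callbacks ...
            }
        }

        public void shutdown() {
            stopRequested = true;
            interrupt();
        }

        // Stand-in for the real method, which queries end offsets (the call that can
        // time out) and then polls the consumer until it reaches them.
        private void readToLogEnd() {
        }
    }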

File tree: 7 files changed (+112, -12 lines)
 

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 29 additions & 6 deletions

@@ -58,7 +58,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Set<TopicPartition> paused;

     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private KafkaException exception;
+    private KafkaException pollException;
+    private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;

@@ -71,7 +72,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
-        this.exception = null;
+        this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
     }
@@ -170,9 +171,9 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
             throw new WakeupException();
         }

-        if (exception != null) {
-            RuntimeException exception = this.exception;
-            this.exception = null;
+        if (pollException != null) {
+            RuntimeException exception = this.pollException;
+            this.pollException = null;
             throw exception;
         }

@@ -213,8 +214,20 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
         recs.add(record);
     }

+    /**
+     * @deprecated Use {@link #setPollException(KafkaException)} instead
+     */
+    @Deprecated
     public synchronized void setException(KafkaException exception) {
-        this.exception = exception;
+        setPollException(exception);
+    }
+
+    public synchronized void setPollException(KafkaException exception) {
+        this.pollException = exception;
+    }
+
+    public synchronized void setOffsetsException(KafkaException exception) {
+        this.offsetsException = exception;
     }

     @Override
@@ -382,6 +395,11 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<

     @Override
     public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
@@ -394,6 +412,11 @@ public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicP

     @Override
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long endOffset = getEndOffset(endOffsets.get(tp));
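
A quick usage sketch of the new MockConsumer hooks (hypothetical test scaffolding, not taken from this commit): each setter arms a one-shot exception that the mock throws from the corresponding call and then clears, so a subsequent call succeeds.

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class MockConsumerOffsetsExceptionSketch {
        public static void main(String[] args) {
            MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            TopicPartition tp = new TopicPartition("topic", 0);

            // Arm a one-shot exception for offset queries; the next call to
            // beginningOffsets() or endOffsets() throws it, then the mock clears it.
            consumer.setOffsetsException(new TimeoutException("simulated offset lookup timeout"));
            try {
                consumer.endOffsets(Collections.singleton(tp));
            } catch (TimeoutException e) {
                System.out.println("caught: " + e.getMessage());
            }

            // Once end offsets are registered, the retried call succeeds.
            consumer.updateEndOffsets(Collections.singletonMap(tp, 0L));
            System.out.println(consumer.endOffsets(Collections.singleton(tp)));

            // poll() failures can be simulated the same way via setPollException(...).
        }
    }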

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

Lines changed: 5 additions & 0 deletions

@@ -28,6 +28,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -312,6 +313,10 @@ public void run() {
                 try {
                     readToLogEnd();
                     log.trace("Finished read to end log for topic {}", topic);
+                } catch (TimeoutException e) {
+                    log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
+                             "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                    continue;
                 } catch (WakeupException e) {
                     // Either received another get() call and need to retry reading to end of log or stop() was
                     // called. Both are handled by restarting this loop.

connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java

Lines changed: 74 additions & 2 deletions

@@ -30,6 +30,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -369,7 +370,7 @@ public void run() {
     }

     @Test
-    public void testConsumerError() throws Exception {
+    public void testPollConsumerError() throws Exception {
         expectStart();
         expectStop();

@@ -387,7 +388,7 @@ public void run() {
         consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
-                consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+                consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
             }
         });

@@ -422,6 +423,77 @@ public void run() {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
+        expectStart();
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        final AtomicBoolean getInvoked = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                getInvoked.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
+                // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 1L);
+                newEndOffsets.put(TP1, 1L);
+                consumer.updateEndOffsets(newEndOffsets);
+                // Set exception to occur when getting offsets to read log to end. It'll be caught in the work thread,
+                // which will retry and eventually get the correct offsets and read log to end.
+                consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
+                store.readToEnd(readEndFutureCallback);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvoked.get());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testProducerError() throws Exception {
         expectStart();

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

Lines changed: 1 addition & 1 deletion

@@ -243,7 +243,7 @@ public void shouldRestoreRecordsUpToHighwatermark() {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
         initializeConsumer(2, 1, t1);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(t1);
             }

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

Lines changed: 1 addition & 1 deletion

@@ -242,7 +242,7 @@ public boolean conditionMet() {
             }
         }, 10 * 1000, "Input record never consumed");

-        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);

streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

Lines changed: 1 addition & 1 deletion

@@ -134,7 +134,7 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
     public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 1 addition & 1 deletion

@@ -1407,7 +1407,7 @@ public boolean conditionMet() {
             }
         }, "Never restore first record");

-        mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return changelogPartitionSet;
