Skip to content

Commit 0fb829b

Browse files
Martin Damartembilan
authored andcommitted
spring-projectsGH-112: Commit offsets even when fetch is paused
Fixes spring-projectsGH-112 (spring-projects#112) Increase timeout on unrelated failing tests * Polishing according PR comments
1 parent 06616f7 commit 0fb829b

File tree

3 files changed

+58
-7
lines changed

3 files changed

+58
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
* @author Gary Russell
7373
* @author Murali Reddy
7474
* @author Marius Bogoevici
75+
* @author Martin Dam
7576
*/
7677
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
7778

@@ -428,7 +429,7 @@ public void run() {
428429
}
429430
}
430431
this.unsent = checkPause(this.unsent);
431-
if (!this.paused && !this.autoCommit) {
432+
if (!this.autoCommit) {
432433
processCommits();
433434
}
434435
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class EnableKafkaIntegrationTests {
113113
public void testSimple() throws Exception {
114114
template.send("annotated1", 0, "foo");
115115
template.flush();
116-
assertThat(this.listener.latch1.await(20, TimeUnit.SECONDS)).isTrue();
116+
assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
117117

118118
template.send("annotated2", 0, 123, "foo");
119119
template.flush();
@@ -182,21 +182,21 @@ public void testAutoStartup() throws Exception {
182182
public void testInterface() throws Exception {
183183
template.send("annotated7", 0, "foo");
184184
template.flush();
185-
assertThat(this.ifaceListener.getLatch1().await(20, TimeUnit.SECONDS)).isTrue();
185+
assertThat(this.ifaceListener.getLatch1().await(60, TimeUnit.SECONDS)).isTrue();
186186
}
187187

188188
@Test
189189
public void testMulti() throws Exception {
190190
template.send("annotated8", 0, "foo");
191191
template.flush();
192-
assertThat(this.multiListener.latch1.await(20, TimeUnit.SECONDS)).isTrue();
192+
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
193193
}
194194

195195
@Test
196196
public void testTx() throws Exception {
197197
template.send("annotated9", 0, "foo");
198198
template.flush();
199-
assertThat(this.ifaceListener.getLatch2().await(20, TimeUnit.SECONDS)).isTrue();
199+
assertThat(this.ifaceListener.getLatch2().await(60, TimeUnit.SECONDS)).isTrue();
200200
}
201201

202202
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
* Tests for the listener container.
6060
*
6161
* @author Gary Russell
62-
*
62+
* @author Martin Dam
6363
*/
6464
public class KafkaMessageListenerContainerTests {
6565

@@ -73,8 +73,10 @@ public class KafkaMessageListenerContainerTests {
7373

7474
private static String topic4 = "testTopic4";
7575

76+
private static String topic5 = "testTopic5";
77+
7678
@ClassRule
77-
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4);
79+
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5);
7880

7981
@Rule
8082
public TestName testName = new TestName();
@@ -192,6 +194,54 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
192194
logger.info("Stop " + this.testName.getMethodName() + ackMode);
193195
}
194196

197+
@Test
198+
public void testSlowConsumerCommitsAreProcessed() throws Exception {
199+
Map<String, Object> props = KafkaTestUtils.consumerProps("slow", "false", embeddedKafka);
200+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
201+
ContainerProperties containerProps = new ContainerProperties(topic5);
202+
containerProps.setAckCount(1);
203+
containerProps.setPauseAfter(100);
204+
containerProps.setAckMode(AckMode.MANUAL);
205+
KafkaMessageListenerContainer<Integer, String> container =
206+
new KafkaMessageListenerContainer<>(cf, containerProps);
207+
final CountDownLatch latch = new CountDownLatch(3);
208+
containerProps.setMessageListener((AcknowledgingMessageListener<Integer, String>) (message, ack) -> {
209+
logger.info("slow: " + message);
210+
try {
211+
Thread.sleep(1000);
212+
}
213+
catch (InterruptedException e) {
214+
Thread.currentThread().interrupt();
215+
}
216+
ack.acknowledge();
217+
latch.countDown();
218+
});
219+
container.setBeanName("testSlow");
220+
container.start();
221+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
222+
Consumer<?, ?> consumer = spyOnConsumer(container);
223+
224+
225+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
226+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
227+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
228+
template.setDefaultTopic(topic5);
229+
template.sendDefault(0, "foo");
230+
template.sendDefault(2, "bar");
231+
template.flush();
232+
Thread.sleep(300);
233+
template.sendDefault(0, "fiz");
234+
template.sendDefault(2, "buz");
235+
template.flush();
236+
237+
// Verify that commitSync is called when paused
238+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
239+
verify(consumer, atLeastOnce()).pause(any(TopicPartition.class), any(TopicPartition.class));
240+
verify(consumer, atLeastOnce()).commitSync(any());
241+
verify(consumer, atLeastOnce()).resume(any(TopicPartition.class), any(TopicPartition.class));
242+
container.stop();
243+
}
244+
195245
@Test
196246
public void testSlowConsumerWithException() throws Exception {
197247
logger.info("Start " + this.testName.getMethodName());

0 commit comments

Comments
 (0)