Skip to content

Commit b743f03

Browse files
garyrussellartembilan
authored andcommitted
spring-projectsGH-2352: Expose KLERegistrar Getter on BPP
Resolves spring-projects#2352 **cherry-pick to 2.9.x**
1 parent 6003346 commit b743f03

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,96 @@ public class Config extends RetryTopicConfigurationSupport {
227227
The parameters to the function are the consumer record and the name of the next topic.
228228
You can return a specific partition number, or `null` to indicate that the `KafkaProducer` should determine the partition.
229229

230+
==== Programmatic Construction
231+
232+
The feature is designed to be used with `@KafkaListener`; however, several users have requested information on how to configure non-blocking retries programmatically.
233+
The following Spring Boot application provides an example of how to do so.
234+
235+
====
236+
[source, java]
237+
----
238+
@SpringBootApplication
239+
public class Application extends RetryTopicConfigurationSupport {
240+
241+
public static void main(String[] args) {
242+
SpringApplication.run(2Application.class, args);
243+
}
244+
245+
@Bean
246+
RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
247+
return RetryTopicConfigurationBuilder.newInstance()
248+
.maxAttempts(4)
249+
.autoCreateTopicsWith(2, (short) 1)
250+
.create(template);
251+
}
252+
253+
@Bean
254+
TaskScheduler scheduler() {
255+
return new ThreadPoolTaskScheduler();
256+
}
257+
258+
@Bean
259+
@Order(0)
260+
SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
261+
KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
262+
Listener listener, KafkaListenerEndpointRegistry registry) {
263+
264+
return () -> {
265+
KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
266+
MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
267+
EndpointProcessor endpointProcessor = endpoint -> {
268+
// customize as needed (e.g. apply attributes to retry endpoints).
269+
if (!endpoint.equals(mainEndpoint)) {
270+
endpoint.setConcurrency(1);
271+
}
272+
// these are required
273+
endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
274+
endpoint.setTopics("topic");
275+
endpoint.setId("id");
276+
endpoint.setGroupId("group");
277+
};
278+
mainEndpoint.setBean(listener);
279+
try {
280+
mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
281+
}
282+
catch (NoSuchMethodException | SecurityException ex) {
283+
throw new IllegalStateException(ex);
284+
}
285+
mainEndpoint.setConcurrency(2);
286+
mainEndpoint.setTopics("topic");
287+
mainEndpoint.setId("id");
288+
mainEndpoint.setGroupId("group");
289+
configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
290+
"kafkaListenerContainerFactory");
291+
};
292+
}
293+
294+
295+
@Bean
296+
ApplicationRunner runner(KafkaTemplate<String, String> template) {
297+
return args -> {
298+
template.send("topic", "test");
299+
};
300+
}
301+
302+
}
303+
304+
@Component
305+
class Listener implements MessageListener<String, String> {
306+
307+
@Override
308+
public void onMessage(ConsumerRecord<String, String> record) {
309+
System.out.println(KafkaUtils.format(record));
310+
throw new RuntimeException("test");
311+
}
312+
313+
}
314+
----
315+
====
316+
317+
IMPORTANT: Auto creation of topics will only occur if the configuration is processed before the application context is refereshed, as in the above example.
318+
To configure containers at runtime, the topics will need to be created using some other technique.
319+
230320
==== Features
231321

232322
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
240240
return this.messageHandlerMethodFactory;
241241
}
242242

243+
/**
244+
* Return the {@link KafkaListenerEndpointRegistrar}.
245+
* @return the registrar.
246+
* @since 2.9.3
247+
*/
248+
public KafkaListenerEndpointRegistrar getEndpointRegistrar() {
249+
return this.registrar;
250+
}
251+
243252
@Override
244253
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
245254
this.applicationContext = applicationContext;

0 commit comments

Comments
 (0)