-
Notifications
You must be signed in to change notification settings - Fork 1.6k
@RetryableTopic annotation feature for dynamic container #2352
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Currently, the non-blocking retries mechanism is built around We hope to make it easier for such use cases in future, perhaps in 3.0. |
So in the current state you think it is possible with dedicated configuration ? |
I am pretty sure it is possible, but I don't have the time to document the steps; hence I have put this on the backlog for consideration in 3.0. |
While the annotation way is nice and easy, ability to use the underlying classes to setup consumer (and/or producer) is a bit of geeky fun and provides great flexibility for custom stuff, which I am currently doing at work. Real bummer that retry topic configuration cannot be wired to the custom container factory setup. Great to hear that it is planned for 3.0. Sad thing is that major version upgrade just for this feature sounds a little daunting. |
Ok; I figured out what is needed to programmatically configure retry topics, without using IMPORTANT: Auto creation of topics will only work if the configuration is performed before the application is refreshed (as is the case in this example). We need to make a small change to the This uses Spring for Apache Kafka 2.9.2. Hopefully, it's self-explanatory. @SpringBootApplication
public class Kgh2352Application extends RetryTopicConfigurationSupport {
public static void main(String[] args) {
SpringApplication.run(Kgh2352Application.class, args);
}
@Bean
RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(4)
.autoCreateTopicsWith(2, (short) 1)
.doNotRetryOnDltFailure()
.create(template);
}
@Bean
TaskScheduler scheduler() {
return new ThreadPoolTaskScheduler();
}
@Bean
@Order(0)
SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
Listener listener, KafkaListenerEndpointRegistry registry) {
return () -> {
// We need to add a getter to the bpp for the registrar.
KafkaListenerEndpointRegistrar registrar = (KafkaListenerEndpointRegistrar) new DirectFieldAccessor(bpp)
.getPropertyValue("registrar");
MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
EndpointProcessor endpointProcessor = endpoint -> {
// customize as needed (e.g. apply attributes to retry endpoints).
if (!endpoint.equals(mainEndpoint)) {
endpoint.setConcurrency(1);
}
endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
endpoint.setTopics("kgh2352");
endpoint.setId("kgh2352");
endpoint.setGroupId("kgh2352");
};
mainEndpoint.setBean(listener);
try {
mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
}
catch (NoSuchMethodException | SecurityException ex) {
throw new IllegalStateException(ex);
}
mainEndpoint.setConcurrency(2);
mainEndpoint.setTopics("kgh2352");
mainEndpoint.setId("kgh2352");
mainEndpoint.setGroupId("kgh2352");
configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
"kafkaListenerContainerFactory");
};
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("kgh2352", "test");
};
}
}
@Component
class Listener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(KafkaUtils.format(record));
throw new RuntimeException("test");
}
} Result:
|
Resolves spring-projects#2352 **cherry-pick to 2.9.x**
Resolves #2352 **cherry-pick to 2.9.x**
Resolves #2352 **cherry-pick to 2.9.x**
I would like to use the functionalities offered by the new @RetryableTopic annotation but I do not have any @KafkaListener annotation.
I use the method createContainer(final String... topics) from ConcurrentKafkaListenerContainerFactory<String, Serializable> and setup a listener using setupMessageListener from the obtained container.
Thus I would like to have the same functionalities in the case of a dynamic container.
The text was updated successfully, but these errors were encountered: