@RetryableTopic annotation feature for dynamic container #2352


Closed
loogies06 opened this issue Jul 13, 2022 · 5 comments · Fixed by #2447

Comments

@loogies06

I would like to use the functionality offered by the new @RetryableTopic annotation, but I do not have any @KafkaListener annotations.

I use the createContainer(final String... topics) method of ConcurrentKafkaListenerContainerFactory<String, Serializable> and set up a listener with setupMessageListener on the container it returns.

I would therefore like to have the same functionality for such a dynamically created container.
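
A minimal sketch of the kind of setup I mean (the topic name, group id, and processing logic are placeholders):

@Configuration
class DynamicContainerConfig {

	// Sketch only: createContainer builds a container outside the @KafkaListener
	// infrastructure, and setupMessageListener attaches the listener to it.
	@Bean
	ConcurrentMessageListenerContainer<String, Serializable> dynamicContainer(
			ConcurrentKafkaListenerContainerFactory<String, Serializable> factory) {

		ConcurrentMessageListenerContainer<String, Serializable> container =
				factory.createContainer("my-topic");
		container.getContainerProperties().setGroupId("my-group");
		container.setupMessageListener((MessageListener<String, Serializable>) record -> {
			// process the record; this is where I would like the
			// non-blocking retry behavior to apply
		});
		return container; // started by the context as a SmartLifecycle bean
	}

}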

@garyrussell
Contributor

Currently, the non-blocking retries mechanism is built around @KafkaListener; you would have to do a lot of manual wiring to set it up with manually created container(s).

We hope to make it easier for such use cases in future, perhaps in 3.0.

@garyrussell garyrussell added this to the 3.0 Backlog milestone Jul 13, 2022
@loogies06
Author

So, in the current state, do you think it is possible with dedicated configuration?

@garyrussell
Contributor

I am pretty sure it is possible, but I don't have the time to document the steps; hence I have put this on the backlog for consideration in 3.0.

@assayire

assayire commented Aug 5, 2022

While the annotation approach is nice and easy, the ability to use the underlying classes to set up a consumer (and/or producer) is a bit of geeky fun and provides great flexibility for custom setups, which is what I am currently doing at work. It is a real bummer that the retry topic configuration cannot be wired into a custom container factory setup.

Great to hear that it is planned for 3.0. The sad thing is that a major version upgrade just for this feature sounds a little daunting.

@garyrussell
Contributor

OK; I figured out what is needed to programmatically configure retry topics without using @KafkaListener.

IMPORTANT: Auto creation of topics will only work if the configuration is performed before the application is refreshed (as is the case in this example).

We need to make a small change to the BeanPostProcessor, to avoid the reflection in this example.

This uses Spring for Apache Kafka 2.9.2.

Hopefully, it's self-explanatory.

@SpringBootApplication
public class Kgh2352Application extends RetryTopicConfigurationSupport {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2352Application.class, args);
	}

	@Bean
	RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
		return RetryTopicConfigurationBuilder.newInstance()
				.maxAttempts(4)
				.autoCreateTopicsWith(2, (short) 1)
				.doNotRetryOnDltFailure()
				.create(template);
	}

	@Bean
	TaskScheduler scheduler() {
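		// The retry topic infrastructure needs a TaskScheduler; it is used to resume
		// consumption of partitions that are paused while a record is backing off.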
		return new ThreadPoolTaskScheduler();
	}

	@Bean
	@Order(0)
	SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
			KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
			Listener listener, KafkaListenerEndpointRegistry registry) {

		return () -> {
			// We need to add a getter to the bpp for the registrar.
			KafkaListenerEndpointRegistrar registrar = (KafkaListenerEndpointRegistrar) new DirectFieldAccessor(bpp)
					.getPropertyValue("registrar");

			MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
			EndpointProcessor endpointProcessor = endpoint -> {
				// customize as needed (e.g. apply attributes to retry endpoints).
				if (!endpoint.equals(mainEndpoint)) {
					endpoint.setConcurrency(1);
				}
				endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
				endpoint.setTopics("kgh2352");
				endpoint.setId("kgh2352");
				endpoint.setGroupId("kgh2352");
			};
			mainEndpoint.setBean(listener);
			try {
				mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
			}
			catch (NoSuchMethodException | SecurityException ex) {
				throw new IllegalStateException(ex);
			}
			mainEndpoint.setConcurrency(2);
			mainEndpoint.setTopics("kgh2352");
			mainEndpoint.setId("kgh2352");
			mainEndpoint.setGroupId("kgh2352");
			configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
					"kafkaListenerContainerFactory");
		};
	}


	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			template.send("kgh2352", "test");
		};
	}

}

@Component
class Listener implements MessageListener<String, String> {

	@Override
	public void onMessage(ConsumerRecord<String, String> record) {
		System.out.println(KafkaUtils.format(record));
		throw new RuntimeException("test");
	}

}

Result:

kgh2352-1@0
kgh2352-retry-0-1@0
kgh2352-retry-1-1@0
kgh2352-retry-2-1@0
Record: topic = kgh2352-retry-2, partition = 1, offset = 0, main topic = kgh2352 threw an error at topic kgh2352-retry-2 and won't be retried. Sending to DLT with name kgh2352-dlt.
Received message in dlt listener: kgh2352-dlt-1@0
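
(The lines above are in the topic-partition@offset form produced by KafkaUtils.format: the record fails on the main topic, is retried on the three retry topics, and finally lands on kgh2352-dlt. The last line presumably comes from the framework's default DLT handler, since this example does not register a DLT listener method of its own.)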

@garyrussell garyrussell modified the milestones: 3.0 Backlog, 3.0.0-RC2 Oct 19, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Oct 19, 2022
artembilan pushed a commit that referenced this issue Oct 19, 2022
Resolves #2352

**cherry-pick to 2.9.x**
artembilan pushed a commit that referenced this issue Oct 19, 2022
Resolves #2352

**cherry-pick to 2.9.x**
garyrussell added a commit that referenced this issue Oct 19, 2022
garyrussell added a commit that referenced this issue Oct 19, 2022