Skip to content

Acknowledge outside of consumeMany and consumeOne #220

Open
@tripletk

Description

@tripletk

Is there support to acknowledge outside of the consumeOne and consumeMany of AdaptedReactiveMessageConsumer?

My usecase is I want to consume the messages but don't want the acknowledgement to happen within the consumeOne, consumeMany. I want to separately acknowledge the pulsar message later on in the reactive pipeline after consumeOne, consumeMany.

e.g.

ReactiveMessageConsumer<String> messageConsumer=
		reactivePulsarClient.messageConsumer(Schema.STRING)
				.topic(topicName)
				.subscriptionName("sub")
				.build();
messageConsumer.consumeMany(messageFlux ->
			messageFlux.map(message ->
			 doSomeProcessing(message)))
		.flatMap(msgResult -> {
			MessageId msgId = msgResult.getMessageId();
			// Acknowledge here outside of consumeMany or consumeOne
			handleAcknowledgement(msgId);
		})
		// for demonstration
		.subscribe(System.out::println);

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions