Open
Description
Is there support to acknowledge outside of the consumeOne and consumeMany of AdaptedReactiveMessageConsumer?
My usecase is I want to consume the messages but don't want the acknowledgement to happen within the consumeOne, consumeMany. I want to separately acknowledge the pulsar message later on in the reactive pipeline after consumeOne, consumeMany.
e.g.
ReactiveMessageConsumer<String> messageConsumer=
reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub")
.build();
messageConsumer.consumeMany(messageFlux ->
messageFlux.map(message ->
doSomeProcessing(message)))
.flatMap(msgResult -> {
MessageId msgId = msgResult.getMessageId();
// Acknowledge here outside of consumeMany or consumeOne
handleAcknowledgement(msgId);
})
// for demonstration
.subscribe(System.out::println);
Metadata
Metadata
Assignees
Labels
No labels