Skip to content

Commit 20e2bfc

Browse files
authored
Merge pull request #39 from rabbitmq/rabbitmq-stream-java-client-38-subscription-hook
Add SubscriptionListener
2 parents ac7d418 + 5862193 commit 20e2bfc

File tree

11 files changed

+476
-20
lines changed

11 files changed

+476
-20
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,3 +881,43 @@ entry, which has its own offset.
881881
This means one must be careful when basing some decision on offset values, like
882882
a modulo to perform an operation every X messages. As the message offsets have
883883
no guarantee to be contiguous, the operation may not happen exactly every X messages.
884+
885+
====== Subscription Listener
886+
887+
The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
888+
This callback can be used to customize the offset the client computed for the subscription.
889+
The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change).
890+
891+
It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides.
892+
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
893+
894+
.Using an external store for offset tracking with a subscription listener
895+
[source,java,indent=0]
896+
--------
897+
include::{test-examples}/ConsumerUsage.java[tag=subscription-listener]
898+
--------
899+
<1> Set subscription listener
900+
<2> Get offset from external store
901+
<3> Set offset to use for the subscription
902+
<4> Store the offset in the external store after processing
903+
904+
When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use.
905+
906+
Using a subscription listener can also be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex.
907+
This requires a good understanding on how and when subscription occurs in the client, and so when the subscription listener is called:
908+
909+
* for a consumer with no name (server-side offset tracking _disabled_)
910+
** on the first subscription (when the consumer is created): the offset specification is the one specified with `ConsumerBuilder#offset(OffsetSpecification)`, the default being `OffsetSpecification#next()`
911+
** on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message
912+
* for a consumer with a name (server-side offset tracking _enabled_)
913+
** on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with `ConsumerBuilder#offset(OffsetSpecification)`
914+
** on re-subscription (after a disconnection or topology change): the server-side stored offset is used
915+
916+
The subscription listener comes in handy on re-subscription.
917+
The application can track the last processed offset in-memory, with an `AtomicLong` for example.
918+
The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the client may not be perfectly appropriate on re-subscription.
919+
920+
Let's take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval.
921+
When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed.
922+
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
923+
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ public interface ConsumerBuilder {
6464
*/
6565
ConsumerBuilder name(String name);
6666

67+
/**
68+
* Callback on subscription.
69+
*
70+
* <p>Can be used to set the offset specification before subscribing to the stream.
71+
*
72+
* @see SubscriptionListener
73+
* @param subscriptionListener the listener
74+
* @return this builder instance
75+
*/
76+
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
77+
6778
/**
6879
* Enable {@link ManualTrackingStrategy}.
6980
*
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
/**
17+
* Callback interface to customize a subscription.
18+
*
19+
* <p>It is possible to change the computed {@link OffsetSpecification} in {@link
20+
* #preSubscribe(SubscriptionContext)} by using a custom offset tracking strategy.
21+
*/
22+
public interface SubscriptionListener {
23+
24+
/**
25+
* Callback called before the subscription is created.
26+
*
27+
* <p>The method is called when a {@link Consumer} is created and it registers to broker, and also
28+
* when the subscription must be re-created (after a disconnection or when the subscription must
29+
* moved because the stream member it was connection becomes unavailable).
30+
*
31+
* <p>Application code can set the {@link OffsetSpecification} that will be used with the {@link
32+
* SubscriptionContext#offsetSpecification(OffsetSpecification)} method.
33+
*
34+
* @param subscriptionContext
35+
*/
36+
void preSubscribe(SubscriptionContext subscriptionContext);
37+
38+
/** Context object for the subscription. */
39+
interface SubscriptionContext {
40+
41+
/**
42+
* The offset specification computed by the library.
43+
*
44+
* <p>If the consumer has no name, the value is the value set with {@link
45+
* ConsumerBuilder#offset(OffsetSpecification)} on the first subscription and the offset of the
46+
* last dispatched message on subsequent calls (e.g. when the client re-subscribes after a
47+
* disconnection).
48+
*
49+
* <p>If the consumer has a name, the value is the last stored if any.
50+
*
51+
* @see ConsumerBuilder#name(String)
52+
* @return the computed offset specification
53+
*/
54+
OffsetSpecification offsetSpecification();
55+
56+
/**
57+
* Set the offset specification to use for the subscription.
58+
*
59+
* <p>It overrides the value computed by the client.
60+
*
61+
* @param offsetSpecification the offset specification to use
62+
*/
63+
void offsetSpecification(OffsetSpecification offsetSpecification);
64+
}
65+
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.rabbitmq.stream.OffsetSpecification;
2424
import com.rabbitmq.stream.StreamDoesNotExistException;
2525
import com.rabbitmq.stream.StreamException;
26+
import com.rabbitmq.stream.SubscriptionListener;
27+
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
2628
import com.rabbitmq.stream.impl.Client.ChunkListener;
2729
import com.rabbitmq.stream.impl.Client.CreditNotification;
2830
import com.rabbitmq.stream.impl.Client.MessageListener;
@@ -83,6 +85,7 @@ Runnable subscribe(
8385
String stream,
8486
OffsetSpecification offsetSpecification,
8587
String trackingReference,
88+
SubscriptionListener subscriptionListener,
8689
MessageHandler messageHandler) {
8790
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
8891
List<Client.Broker> candidates = findBrokersForStream(stream);
@@ -95,7 +98,12 @@ Runnable subscribe(
9598
// we keep this instance when we move the subscription from a client to another one
9699
SubscriptionTracker subscriptionTracker =
97100
new SubscriptionTracker(
98-
consumer, stream, offsetSpecification, trackingReference, messageHandler);
101+
consumer,
102+
stream,
103+
offsetSpecification,
104+
trackingReference,
105+
subscriptionListener,
106+
messageHandler);
99107

100108
String key = keyForClientSubscription(newNode);
101109

@@ -212,6 +220,7 @@ private static class SubscriptionTracker {
212220
private final String offsetTrackingReference;
213221
private final MessageHandler messageHandler;
214222
private final StreamConsumer consumer;
223+
private final SubscriptionListener subscriptionListener;
215224
private volatile long offset;
216225
private volatile boolean hasReceivedSomething = false;
217226
private volatile byte subscriptionIdInClient;
@@ -223,11 +232,13 @@ private SubscriptionTracker(
223232
String stream,
224233
OffsetSpecification initialOffsetSpecification,
225234
String offsetTrackingReference,
235+
SubscriptionListener subscriptionListener,
226236
MessageHandler messageHandler) {
227237
this.consumer = consumer;
228238
this.stream = stream;
229239
this.initialOffsetSpecification = initialOffsetSpecification;
230240
this.offsetTrackingReference = offsetTrackingReference;
241+
this.subscriptionListener = subscriptionListener;
231242
this.messageHandler = messageHandler;
232243
}
233244

@@ -635,7 +646,7 @@ synchronized void add(
635646
update(previousSubscriptions, subscriptionId, subscriptionTracker);
636647

637648
String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
638-
if (subscriptionTracker.offsetTrackingReference != null) {
649+
if (offsetTrackingReference != null) {
639650
long trackedOffset =
640651
client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
641652
if (trackedOffset != 0) {
@@ -666,12 +677,20 @@ synchronized void add(
666677
subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference);
667678
}
668679

680+
SubscriptionContext subscriptionContext =
681+
new DefaultSubscriptionContext(offsetSpecification);
682+
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
683+
LOGGER.info(
684+
"Computed offset specification {}, offset specification used after subscription listener {}",
685+
offsetSpecification,
686+
subscriptionContext.offsetSpecification());
687+
669688
// FIXME consider using fewer initial credits
670689
Client.Response subscribeResponse =
671690
client.subscribe(
672691
subscriptionId,
673692
subscriptionTracker.stream,
674-
offsetSpecification,
693+
subscriptionContext.offsetSpecification(),
675694
10,
676695
subscriptionProperties);
677696
if (!subscribeResponse.isOk()) {
@@ -767,4 +786,28 @@ synchronized void close() {
767786
}
768787
}
769788
}
789+
790+
private static final class DefaultSubscriptionContext implements SubscriptionContext {
791+
792+
private volatile OffsetSpecification offsetSpecification;
793+
794+
private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification) {
795+
this.offsetSpecification = computedOffsetSpecification;
796+
}
797+
798+
@Override
799+
public OffsetSpecification offsetSpecification() {
800+
return this.offsetSpecification;
801+
}
802+
803+
@Override
804+
public void offsetSpecification(OffsetSpecification offsetSpecification) {
805+
this.offsetSpecification = offsetSpecification;
806+
}
807+
808+
@Override
809+
public String toString() {
810+
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
811+
}
812+
}
770813
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.rabbitmq.stream.MessageHandler;
1818
import com.rabbitmq.stream.MessageHandler.Context;
1919
import com.rabbitmq.stream.OffsetSpecification;
20+
import com.rabbitmq.stream.SubscriptionListener;
2021
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2122
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
2223
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,7 +58,8 @@ class StreamConsumer implements Consumer {
5758
String name,
5859
StreamEnvironment environment,
5960
TrackingConfiguration trackingConfiguration,
60-
boolean lazyInit) {
61+
boolean lazyInit,
62+
SubscriptionListener subscriptionListener) {
6163

6264
try {
6365
this.name = name;
@@ -100,6 +102,7 @@ class StreamConsumer implements Consumer {
100102
stream,
101103
offsetSpecification,
102104
this.name,
105+
subscriptionListener,
103106
messageHandlerWithOrWithoutTracking);
104107

105108
this.status = Status.RUNNING;

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.stream.MessageHandler;
1919
import com.rabbitmq.stream.OffsetSpecification;
2020
import com.rabbitmq.stream.StreamException;
21+
import com.rabbitmq.stream.SubscriptionListener;
2122
import java.lang.reflect.Field;
2223
import java.lang.reflect.Modifier;
2324
import java.time.Duration;
@@ -34,6 +35,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
3435
private DefaultAutoTrackingStrategy autoTrackingStrategy;
3536
private DefaultManualTrackingStrategy manualTrackingStrategy;
3637
private boolean lazyInit = false;
38+
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
3739

3840
public StreamConsumerBuilder(StreamEnvironment environment) {
3941
this.environment = environment;
@@ -77,6 +79,15 @@ public ConsumerBuilder name(String name) {
7779
return this;
7880
}
7981

82+
@Override
83+
public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
84+
if (subscriptionListener == null) {
85+
throw new IllegalArgumentException("The subscription listener cannot be null");
86+
}
87+
this.subscriptionListener = subscriptionListener;
88+
return this;
89+
}
90+
8091
@Override
8192
public ManualTrackingStrategy manualTrackingStrategy() {
8293
this.manualTrackingStrategy = new DefaultManualTrackingStrategy(this);
@@ -142,7 +153,8 @@ public Consumer build() {
142153
this.name,
143154
this.environment,
144155
trackingConfiguration,
145-
this.lazyInit);
156+
this.lazyInit,
157+
this.subscriptionListener);
146158
environment.addConsumer((StreamConsumer) consumer);
147159
} else {
148160
consumer =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.stream.ProducerBuilder;
2929
import com.rabbitmq.stream.StreamCreator;
3030
import com.rabbitmq.stream.StreamException;
31+
import com.rabbitmq.stream.SubscriptionListener;
3132
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3233
import com.rabbitmq.stream.impl.Client.ClientParameters;
3334
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
@@ -467,10 +468,16 @@ Runnable registerConsumer(
467468
String stream,
468469
OffsetSpecification offsetSpecification,
469470
String trackingReference,
471+
SubscriptionListener subscriptionListener,
470472
MessageHandler messageHandler) {
471473
Runnable closingCallback =
472474
this.consumersCoordinator.subscribe(
473-
consumer, stream, offsetSpecification, trackingReference, messageHandler);
475+
consumer,
476+
stream,
477+
offsetSpecification,
478+
trackingReference,
479+
subscriptionListener,
480+
messageHandler);
474481
return closingCallback;
475482
}
476483

src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,30 @@ void manualTrackingWithSettings() {
127127
// end::manual-tracking-with-settings[]
128128
}
129129

130+
void subscriptionListener() {
131+
Environment environment = Environment.builder().build();
132+
// tag::subscription-listener[]
133+
Consumer consumer = environment.consumerBuilder()
134+
.stream("my-stream")
135+
.subscriptionListener(subscriptionContext -> { // <1>
136+
long offset = getOffsetFromExternalStore(); // <2>
137+
subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset)); // <3>
138+
})
139+
.messageHandler((context, message) -> {
140+
// message handling code...
141+
142+
storeOffsetInExternalStore(context.offset()); // <4>
143+
})
144+
.build();
145+
// end::subscription-listener[]
146+
}
147+
148+
void storeOffsetInExternalStore(long offset) {
149+
150+
}
151+
152+
long getOffsetFromExternalStore() {
153+
return 0L;
154+
}
155+
130156
}

0 commit comments

Comments
 (0)