Skip to content

Commit 7da6110

Browse files
Merge pull request ReactiveX#2237 from benjchristensen/publish-ring-buffer-usage
Make Publish Operator Release RingBuffer
2 parents 4a08644 + e39599c commit 7da6110

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

src/main/java/rx/internal/operators/OperatorPublish.java

+21-9
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void call() {
9999
public void connect(Action1<? super Subscription> connection) {
100100
// each time we connect we create a new Subscription
101101
boolean shouldSubscribe = false;
102-
102+
103103
// subscription is the state of whether we are connected or not
104104
OriginSubscriber<T> origin = requestHandler.state.getOrigin();
105105
if (origin == null) {
@@ -113,7 +113,7 @@ public void connect(Action1<? super Subscription> connection) {
113113
connection.call(Subscriptions.create(new Action0() {
114114
@Override
115115
public void call() {
116-
Subscription s = requestHandler.state.getOrigin();
116+
OriginSubscriber<T> s = requestHandler.state.getOrigin();
117117
requestHandler.state.setOrigin(null);
118118
if (s != null) {
119119
s.unsubscribe();
@@ -135,9 +135,11 @@ private static class OriginSubscriber<T> extends Subscriber<T> {
135135
private final RequestHandler<T> requestHandler;
136136
private final AtomicLong originOutstanding = new AtomicLong();
137137
private final long THRESHOLD = RxRingBuffer.SIZE / 4;
138+
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();
138139

139140
OriginSubscriber(RequestHandler<T> requestHandler) {
140141
this.requestHandler = requestHandler;
142+
add(buffer);
141143
}
142144

143145
@Override
@@ -199,6 +201,8 @@ public void onNext(T t) {
199201
* with a complicated state machine so I'm sticking with mutex locks and just trying to make sure the work done while holding the
200202
* lock is small (such as never emitting data).
201203
*
204+
* This does however mean we can't rely on a reference to State being consistent. For example, it can end up with a null OriginSubscriber.
205+
*
202206
* @param <T>
203207
*/
204208
private static class State<T> {
@@ -288,7 +292,7 @@ private long resetAfterSubscriberUpdate() {
288292

289293
private static class RequestHandler<T> {
290294
private final NotificationLite<T> notifier = NotificationLite.instance();
291-
private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();
295+
292296
private final State<T> state = new State<T>();
293297
@SuppressWarnings("unused")
294298
volatile long wip;
@@ -297,16 +301,24 @@ private static class RequestHandler<T> {
297301

298302
public void requestFromChildSubscriber(Subscriber<? super T> subscriber, Long request) {
299303
state.requestFromSubscriber(subscriber, request);
300-
drainQueue();
304+
OriginSubscriber<T> originSubscriber = state.getOrigin();
305+
if(originSubscriber != null) {
306+
drainQueue(originSubscriber);
307+
}
301308
}
302309

303310
public void emit(Object t) throws MissingBackpressureException {
311+
OriginSubscriber<T> originSubscriber = state.getOrigin();
312+
if(originSubscriber == null) {
313+
// unsubscribed so break ... we are done
314+
return;
315+
}
304316
if (notifier.isCompleted(t)) {
305-
buffer.onCompleted();
317+
originSubscriber.buffer.onCompleted();
306318
} else {
307-
buffer.onNext(notifier.getValue(t));
319+
originSubscriber.buffer.onNext(notifier.getValue(t));
308320
}
309-
drainQueue();
321+
drainQueue(originSubscriber);
310322
}
311323

312324
private void requestMoreAfterEmission(int emitted) {
@@ -319,7 +331,7 @@ private void requestMoreAfterEmission(int emitted) {
319331
}
320332
}
321333

322-
public void drainQueue() {
334+
public void drainQueue(OriginSubscriber<T> originSubscriber) {
323335
if (WIP.getAndIncrement(this) == 0) {
324336
int emitted = 0;
325337
do {
@@ -338,7 +350,7 @@ public void drainQueue() {
338350
if (!shouldEmit) {
339351
break;
340352
}
341-
Object o = buffer.poll();
353+
Object o = originSubscriber.buffer.poll();
342354
if (o == null) {
343355
// nothing in buffer so increment outstanding back again
344356
state.incrementOutstandingAfterFailedEmit();

src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void call(Integer l) {
154154
s2.unsubscribe(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
155155
s1.unsubscribe();
156156

157-
System.out.println("onNext: " + nextCount.get());
157+
System.out.println("onNext Count: " + nextCount.get());
158158

159159
// it will emit twice because it is synchronous
160160
assertEquals(nextCount.get(), receivedCount.get() * 2);

0 commit comments

Comments
 (0)