Skip to content

Commit cd4f112

Browse files
committed
Avoiding OperatorObserveOn from calling subscriber.onNext(..) after unsubscribe().
The OperatorObserveOn operator uses a scheduler to cancel subscriptions as well as to deliver the objects passing through it's onNext(..) in the right context. Calling unsubscribe will schedule the actual unsubscription while not making sure that the child subscriber will no longer receive calls to onNext(..) after unsubscribe() returns. This fix makes sure that after unsubscribe() returns no more onNext(..) calls will be made on the child subscribers. Signed-off-by: David Marques <[email protected]>
1 parent ad35778 commit cd4f112

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7272
= AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
7373

7474
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber) {
75-
super(subscriber);
7675
this.observer = subscriber;
7776
this.recursiveScheduler = scheduler.createWorker();
7877
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
79-
subscriber.add(scheduledUnsubscribe);
78+
add(scheduledUnsubscribe);
79+
80+
subscriber.add(recursiveScheduler);
81+
subscriber.add(this);
8082
}
8183

8284
@Override

rxjava-core/src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import rx.Observable;
4141
import rx.Observer;
4242
import rx.Scheduler;
43+
import rx.Subscription;
4344
import rx.exceptions.TestException;
4445
import rx.functions.Action0;
4546
import rx.functions.Action1;
@@ -389,4 +390,21 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
389390
inOrder.verify(o, never()).onNext(anyInt());
390391
inOrder.verify(o, never()).onCompleted();
391392
}
393+
394+
@Test
395+
public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() {
396+
final TestScheduler testScheduler = new TestScheduler();
397+
final Observer<Integer> observer = mock(Observer.class);
398+
final Subscription subscription = Observable.from(1, 2, 3)
399+
.observeOn(testScheduler)
400+
.subscribe(observer);
401+
subscription.unsubscribe();
402+
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
403+
404+
final InOrder inOrder = inOrder(observer);
405+
406+
inOrder.verify(observer, never()).onNext(anyInt());
407+
inOrder.verify(observer, never()).onError(any(Exception.class));
408+
inOrder.verify(observer, never()).onCompleted();
409+
}
392410
}

0 commit comments

Comments
 (0)