Skip to content

Commit 9ff3624

Browse files
Refactored ObserveOn without ScheduledObserver
1 parent b8b8334 commit 9ff3624

File tree

4 files changed

+75
-144
lines changed

4 files changed

+75
-144
lines changed

rxjava-core/src/main/java/rx/Notification.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,16 @@ public boolean isOnNext() {
116116
return getKind() == Kind.OnNext;
117117
}
118118

119+
public void accept(Observer<? super T> observer) {
120+
if (isOnNext()) {
121+
observer.onNext(getValue());
122+
} else if (isOnCompleted()) {
123+
observer.onCompleted();
124+
} else if (isOnError()) {
125+
observer.onError(getThrowable());
126+
}
127+
}
128+
119129
public static enum Kind {
120130
OnNext, OnError, OnCompleted
121131
}

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,20 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import rx.Notification;
1822
import rx.Observable;
1923
import rx.Observable.OnSubscribeFunc;
2024
import rx.Observer;
2125
import rx.Scheduler;
2226
import rx.Subscription;
27+
import rx.concurrency.CurrentThreadScheduler;
2328
import rx.concurrency.ImmediateScheduler;
2429
import rx.subscriptions.CompositeSubscription;
30+
import rx.util.functions.Action0;
31+
import rx.util.functions.Action1;
2532

2633
/**
2734
* Asynchronously notify Observers on the specified Scheduler.
@@ -38,6 +45,9 @@ private static class ObserveOn<T> implements OnSubscribeFunc<T> {
3845
private final Observable<? extends T> source;
3946
private final Scheduler scheduler;
4047

48+
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
49+
final AtomicInteger counter = new AtomicInteger(0);
50+
4151
public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
4252
this.source = source;
4353
this.scheduler = scheduler;
@@ -48,11 +58,55 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
4858
if (scheduler instanceof ImmediateScheduler) {
4959
// do nothing if we request ImmediateScheduler so we don't invoke overhead
5060
return source.subscribe(observer);
61+
} else if (scheduler instanceof CurrentThreadScheduler) {
62+
// do nothing if we request CurrentThreadScheduler so we don't invoke overhead
63+
return source.subscribe(observer);
5164
} else {
52-
CompositeSubscription s = new CompositeSubscription();
53-
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
54-
return s;
65+
return observeOn(observer, scheduler);
5566
}
5667
}
68+
69+
public Subscription observeOn(final Observer<? super T> observer, Scheduler scheduler) {
70+
final CompositeSubscription s = new CompositeSubscription();
71+
72+
s.add(source.materialize().subscribe(new Action1<Notification<? extends T>>() {
73+
74+
@Override
75+
public void call(Notification<? extends T> e) {
76+
// this must happen before 'counter' is used to provide synchronization between threads
77+
queue.offer(e);
78+
79+
// we now use counter to atomically determine if we need to start processing or not
80+
// it will be 0 if it's the first notification or the scheduler has finished processing work
81+
// and we need to start doing it again
82+
if (counter.getAndIncrement() == 0) {
83+
processQueue(s, observer);
84+
}
85+
86+
}
87+
}));
88+
89+
return s;
90+
}
91+
92+
private void processQueue(CompositeSubscription s, final Observer<? super T> observer) {
93+
s.add(scheduler.schedule(new Action1<Action0>() {
94+
@Override
95+
public void call(Action0 self) {
96+
Notification<? extends T> not = queue.poll();
97+
if (not != null) {
98+
not.accept(observer);
99+
}
100+
101+
// decrement count and if we still have work to do
102+
// recursively schedule ourselves to process again
103+
if (counter.decrementAndGet() > 0) {
104+
self.call();
105+
}
106+
107+
}
108+
}));
109+
}
57110
}
111+
58112
}

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 0 additions & 138 deletions
This file was deleted.

rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,17 @@ public void testUnsubscribeOfNewThread() throws InterruptedException {
2828
public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException {
2929
testUnSubscribeForScheduler(Schedulers.threadPoolForIO());
3030
}
31-
31+
3232
@Test
3333
public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException {
3434
testUnSubscribeForScheduler(Schedulers.threadPoolForComputation());
3535
}
36-
36+
37+
@Test
38+
public void testUnsubscribeOfImmediateThread() throws InterruptedException {
39+
testUnSubscribeForScheduler(Schedulers.immediate());
40+
}
41+
3742
@Test
3843
public void testUnsubscribeOfCurrentThread() throws InterruptedException {
3944
testUnSubscribeForScheduler(Schedulers.currentThread());
@@ -56,7 +61,7 @@ public Long call(Long aLong) {
5661
}
5762
})
5863
.subscribeOn(scheduler)
59-
.observeOn(Schedulers.currentThread())
64+
.observeOn(scheduler)
6065
.subscribe(new Observer<Long>() {
6166
@Override
6267
public void onCompleted() {

0 commit comments

Comments
 (0)