Skip to content

Commit 26f5a66

Browse files
Merge pull request ReactiveX#409 from zsxwing/synchronize-lock
Implemented 'Synchronize' with 'lock'
2 parents fa72a27 + 8bd705d commit 26f5a66

File tree

3 files changed

+274
-8
lines changed

3 files changed

+274
-8
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1822,6 +1822,27 @@ public Observable<T> synchronize() {
18221822
return create(OperationSynchronize.synchronize(this));
18231823
}
18241824

1825+
/**
1826+
* Accepts an Observable and wraps it in another Observable that ensures that the resulting
1827+
* Observable is chronologically well-behaved. This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter.
1828+
* <p>
1829+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/synchronize.png">
1830+
* <p>
1831+
* A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of
1832+
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
1833+
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
1834+
*
1835+
* @param lock
1836+
* The lock object to synchronize each observer call on
1837+
* @param <T>
1838+
* the type of item emitted by the source Observable
1839+
* @return an Observable that is a chronologically well-behaved version of the source
1840+
* Observable, and that synchronously notifies its {@link Observer}s
1841+
*/
1842+
public Observable<T> synchronize(Object lock) {
1843+
return create(OperationSynchronize.synchronize(this, lock));
1844+
}
1845+
18251846
/**
18261847
* @deprecated Replaced with instance method.
18271848
*/

rxjava-core/src/main/java/rx/operators/OperationSynchronize.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,46 @@ public final class OperationSynchronize<T> {
5858
* @return the wrapped synchronized observable sequence
5959
*/
6060
public static <T> OnSubscribeFunc<T> synchronize(Observable<? extends T> observable) {
61-
return new Synchronize<T>(observable);
61+
return new Synchronize<T>(observable, null);
62+
}
63+
64+
/**
65+
* Accepts an observable and wraps it in another observable which ensures that the resulting observable is well-behaved.
66+
* This is accomplished by acquiring a mutual-exclusion lock for the object provided as the lock parameter.
67+
*
68+
* A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are
69+
* not interleaved, onCompleted and onError are only called once respectively, and no
70+
* onNext calls follow onCompleted and onError calls.
71+
*
72+
* @param observable
73+
* @param lock
74+
* The lock object to synchronize each observer call on
75+
* @param <T>
76+
* @return the wrapped synchronized observable sequence
77+
*/
78+
public static <T> OnSubscribeFunc<T> synchronize(Observable<? extends T> observable, Object lock) {
79+
return new Synchronize<T>(observable, lock);
6280
}
6381

6482
private static class Synchronize<T> implements OnSubscribeFunc<T> {
6583

66-
public Synchronize(Observable<? extends T> innerObservable) {
84+
public Synchronize(Observable<? extends T> innerObservable, Object lock) {
6785
this.innerObservable = innerObservable;
86+
this.lock = lock;
6887
}
6988

7089
private Observable<? extends T> innerObservable;
7190
private SynchronizedObserver<T> atomicObserver;
91+
private Object lock;
7292

7393
public Subscription onSubscribe(Observer<? super T> observer) {
7494
SafeObservableSubscription subscription = new SafeObservableSubscription();
75-
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
95+
if(lock == null) {
96+
atomicObserver = new SynchronizedObserver<T>(observer, subscription);
97+
}
98+
else {
99+
atomicObserver = new SynchronizedObserver<T>(observer, subscription, lock);
100+
}
76101
return subscription.wrap(innerObservable.subscribe(atomicObserver));
77102
}
78103

rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java

Lines changed: 225 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.util.Random;
2223
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.Executors;
2425
import java.util.concurrent.Future;
@@ -68,10 +69,18 @@ public final class SynchronizedObserver<T> implements Observer<T> {
6869
private final SafeObservableSubscription subscription;
6970
private volatile boolean finishRequested = false;
7071
private volatile boolean finished = false;
72+
private volatile Object lock;
7173

7274
public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscription subscription) {
7375
this.observer = Observer;
7476
this.subscription = subscription;
77+
this.lock = this;
78+
}
79+
80+
public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscription subscription, Object lock) {
81+
this.observer = Observer;
82+
this.subscription = subscription;
83+
this.lock = lock;
7584
}
7685

7786
/**
@@ -80,16 +89,15 @@ public SynchronizedObserver(Observer<? super T> Observer, SafeObservableSubscrip
8089
* @param Observer
8190
*/
8291
public SynchronizedObserver(Observer<? super T> Observer) {
83-
this.observer = Observer;
84-
this.subscription = new SafeObservableSubscription();
92+
this(Observer, new SafeObservableSubscription());
8593
}
8694

8795
public void onNext(T arg) {
8896
if (finished || finishRequested || subscription.isUnsubscribed()) {
8997
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
9098
return;
9199
}
92-
synchronized (this) {
100+
synchronized (lock) {
93101
// check again since this could have changed while waiting
94102
if (finished || finishRequested || subscription.isUnsubscribed()) {
95103
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
@@ -105,7 +113,7 @@ public void onError(Throwable e) {
105113
return;
106114
}
107115
finishRequested = true;
108-
synchronized (this) {
116+
synchronized (lock) {
109117
// check again since this could have changed while waiting
110118
if (finished || subscription.isUnsubscribed()) {
111119
return;
@@ -121,7 +129,7 @@ public void onCompleted() {
121129
return;
122130
}
123131
finishRequested = true;
124-
synchronized (this) {
132+
synchronized (lock) {
125133
// check again since this could have changed while waiting
126134
if (finished || subscription.isUnsubscribed()) {
127135
return;
@@ -188,6 +196,46 @@ public void testMultiThreadedBasic() {
188196
assertEquals(1, busyObserver.maxConcurrentThreads.get());
189197
}
190198

199+
@Test
200+
public void testMultiThreadedBasicWithLock() {
201+
Subscription s = mock(Subscription.class);
202+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three");
203+
Observable<String> w = Observable.create(onSubscribe);
204+
205+
SafeObservableSubscription as = new SafeObservableSubscription(s);
206+
BusyObserver busyObserver = new BusyObserver();
207+
208+
Object lock = new Object();
209+
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);
210+
211+
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
212+
213+
externalBusyThread.start();
214+
215+
w.subscribe(aw);
216+
onSubscribe.waitToFinish();
217+
218+
try {
219+
externalBusyThread.join(10000);
220+
assertFalse(externalBusyThread.isAlive());
221+
assertFalse(externalBusyThread.fail);
222+
} catch (InterruptedException e) {
223+
// ignore
224+
}
225+
226+
assertEquals(3, busyObserver.onNextCount.get());
227+
assertFalse(busyObserver.onError);
228+
assertTrue(busyObserver.onCompleted);
229+
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
230+
// so commenting out for now as this is not a critical thing to test here
231+
// verify(s, times(1)).unsubscribe();
232+
233+
// we can have concurrency ...
234+
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
235+
// ... but the onNext execution should be single threaded
236+
assertEquals(1, busyObserver.maxConcurrentThreads.get());
237+
}
238+
191239
@Test
192240
public void testMultiThreadedWithNPE() {
193241
Subscription s = mock(Subscription.class);
@@ -220,6 +268,52 @@ public void testMultiThreadedWithNPE() {
220268
assertEquals(1, busyObserver.maxConcurrentThreads.get());
221269
}
222270

271+
@Test
272+
public void testMultiThreadedWithNPEAndLock() {
273+
Subscription s = mock(Subscription.class);
274+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
275+
Observable<String> w = Observable.create(onSubscribe);
276+
277+
SafeObservableSubscription as = new SafeObservableSubscription(s);
278+
BusyObserver busyObserver = new BusyObserver();
279+
280+
Object lock = new Object();
281+
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);
282+
283+
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
284+
285+
externalBusyThread.start();
286+
287+
w.subscribe(aw);
288+
onSubscribe.waitToFinish();
289+
290+
try {
291+
externalBusyThread.join(10000);
292+
assertFalse(externalBusyThread.isAlive());
293+
assertFalse(externalBusyThread.fail);
294+
} catch (InterruptedException e) {
295+
// ignore
296+
}
297+
298+
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
299+
300+
// we can't know how many onNext calls will occur since they each run on a separate thread
301+
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
302+
// assertEquals(3, busyObserver.onNextCount.get());
303+
assertTrue(busyObserver.onNextCount.get() < 4);
304+
assertTrue(busyObserver.onError);
305+
// no onCompleted because onError was invoked
306+
assertFalse(busyObserver.onCompleted);
307+
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
308+
// so commenting out for now as this is not a critical thing to test here
309+
//verify(s, times(1)).unsubscribe();
310+
311+
// we can have concurrency ...
312+
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
313+
// ... but the onNext execution should be single threaded
314+
assertEquals(1, busyObserver.maxConcurrentThreads.get());
315+
}
316+
223317
@Test
224318
public void testMultiThreadedWithNPEinMiddle() {
225319
Subscription s = mock(Subscription.class);
@@ -250,6 +344,50 @@ public void testMultiThreadedWithNPEinMiddle() {
250344
assertEquals(1, busyObserver.maxConcurrentThreads.get());
251345
}
252346

347+
@Test
348+
public void testMultiThreadedWithNPEinMiddleAndLock() {
349+
Subscription s = mock(Subscription.class);
350+
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
351+
Observable<String> w = Observable.create(onSubscribe);
352+
353+
SafeObservableSubscription as = new SafeObservableSubscription(s);
354+
BusyObserver busyObserver = new BusyObserver();
355+
356+
Object lock = new Object();
357+
ExternalBusyThread externalBusyThread = new ExternalBusyThread(busyObserver, lock, 10, 100);
358+
359+
SynchronizedObserver<String> aw = new SynchronizedObserver<String>(busyObserver, as, lock);
360+
361+
externalBusyThread.start();
362+
363+
w.subscribe(aw);
364+
onSubscribe.waitToFinish();
365+
366+
try {
367+
externalBusyThread.join(10000);
368+
assertFalse(externalBusyThread.isAlive());
369+
assertFalse(externalBusyThread.fail);
370+
} catch (InterruptedException e) {
371+
// ignore
372+
}
373+
374+
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
375+
// this should not be the full number of items since the error should stop it before it completes all 9
376+
System.out.println("onNext count: " + busyObserver.onNextCount.get());
377+
assertTrue(busyObserver.onNextCount.get() < 9);
378+
assertTrue(busyObserver.onError);
379+
// no onCompleted because onError was invoked
380+
assertFalse(busyObserver.onCompleted);
381+
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
382+
// so commenting out for now as this is not a critical thing to test here
383+
// verify(s, times(1)).unsubscribe();
384+
385+
// we can have concurrency ...
386+
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
387+
// ... but the onNext execution should be single threaded
388+
assertEquals(1, busyObserver.maxConcurrentThreads.get());
389+
}
390+
253391
/**
254392
* A non-realistic use case that tries to expose thread-safety issues by throwing lots of out-of-order
255393
* events on many threads.
@@ -617,14 +755,32 @@ private static class BusyObserver implements Observer<String> {
617755

618756
@Override
619757
public void onCompleted() {
758+
threadsRunning.incrementAndGet();
759+
620760
System.out.println(">>> BusyObserver received onCompleted");
621761
onCompleted = true;
762+
763+
int concurrentThreads = threadsRunning.get();
764+
int maxThreads = maxConcurrentThreads.get();
765+
if (concurrentThreads > maxThreads) {
766+
maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
767+
}
768+
threadsRunning.decrementAndGet();
622769
}
623770

624771
@Override
625772
public void onError(Throwable e) {
773+
threadsRunning.incrementAndGet();
774+
626775
System.out.println(">>> BusyObserver received onError: " + e.getMessage());
627776
onError = true;
777+
778+
int concurrentThreads = threadsRunning.get();
779+
int maxThreads = maxConcurrentThreads.get();
780+
if (concurrentThreads > maxThreads) {
781+
maxConcurrentThreads.compareAndSet(maxThreads, concurrentThreads);
782+
}
783+
threadsRunning.decrementAndGet();
628784
}
629785

630786
@Override
@@ -652,6 +808,70 @@ public void onNext(String args) {
652808

653809
}
654810

811+
private static class ExternalBusyThread extends Thread {
812+
813+
private BusyObserver observer;
814+
private Object lock;
815+
private int lockTimes;
816+
private int waitTime;
817+
public volatile boolean fail;
818+
819+
public ExternalBusyThread(BusyObserver observer, Object lock, int lockTimes, int waitTime) {
820+
this.observer = observer;
821+
this.lock = lock;
822+
this.lockTimes = lockTimes;
823+
this.waitTime = waitTime;
824+
this.fail = false;
825+
}
826+
827+
@Override
828+
public void run() {
829+
Random r = new Random();
830+
for (int i = 0; i < lockTimes; i++) {
831+
synchronized (lock) {
832+
int oldOnNextCount = observer.onNextCount.get();
833+
boolean oldOnCompleted = observer.onCompleted;
834+
boolean oldOnError = observer.onError;
835+
try {
836+
Thread.sleep(r.nextInt(waitTime));
837+
} catch (InterruptedException e) {
838+
// ignore
839+
}
840+
// Since we own the lock, onNextCount, onCompleted and
841+
// onError must not be changed.
842+
int newOnNextCount = observer.onNextCount.get();
843+
boolean newOnCompleted = observer.onCompleted;
844+
boolean newOnError = observer.onError;
845+
if (oldOnNextCount != newOnNextCount) {
846+
System.out.println(">>> ExternalBusyThread received different onNextCount: "
847+
+ oldOnNextCount
848+
+ " -> "
849+
+ newOnNextCount);
850+
fail = true;
851+
break;
852+
}
853+
if (oldOnCompleted != newOnCompleted) {
854+
System.out.println(">>> ExternalBusyThread received different onCompleted: "
855+
+ oldOnCompleted
856+
+ " -> "
857+
+ newOnCompleted);
858+
fail = true;
859+
break;
860+
}
861+
if (oldOnError != newOnError) {
862+
System.out.println(">>> ExternalBusyThread received different onError: "
863+
+ oldOnError
864+
+ " -> "
865+
+ newOnError);
866+
fail = true;
867+
break;
868+
}
869+
}
870+
}
871+
}
872+
873+
}
874+
655875
}
656876

657877
}

0 commit comments

Comments
 (0)