From c968effdbba6e9d8e885f8316c288ae36cb9e86b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 4 Sep 2013 16:14:08 -0700 Subject: [PATCH] Remove unnecessary Observable constructor - fixed unit tests that relied on it --- rxjava-core/src/main/java/rx/Observable.java | 18 ++-- .../rx/observables/BlockingObservable.java | 8 -- .../rx/operators/OperationCombineLatest.java | 12 +-- .../java/rx/operators/OperationConcat.java | 28 +++--- .../rx/operators/OperationMaterialize.java | 10 +- .../java/rx/operators/OperationMerge.java | 46 ++++----- .../operators/OperationMergeDelayError.java | 74 +++++++-------- .../rx/operators/OperationMostRecent.java | 47 ++-------- .../java/rx/operators/OperationMulticast.java | 94 ++++--------------- .../main/java/rx/operators/OperationNext.java | 57 +++-------- ...OperationOnErrorResumeNextViaFunction.java | 8 +- ...erationOnErrorResumeNextViaObservable.java | 14 +-- .../rx/operators/OperationOnErrorReturn.java | 14 +-- ...ionOnExceptionResumeNextViaObservable.java | 31 +++--- .../rx/operators/OperationSynchronize.java | 18 ++-- .../main/java/rx/operators/OperationTake.java | 13 +-- .../java/rx/operators/OperationTakeUntil.java | 14 +-- .../java/rx/operators/OperationTakeWhile.java | 8 +- .../main/java/rx/operators/OperationZip.java | 8 +- 19 files changed, 197 insertions(+), 325 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 724552c8f0..dad2ca7286 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -102,17 +102,15 @@ *

* *

- * For more information see the RxJava - * Wiki + * For more information see the RxJava Wiki * * @param */ public class Observable { - //TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable") - - private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); - + /** + * Executed when 'subscribe' is invoked. + */ private final OnSubscribeFunc onSubscribe; /** @@ -125,7 +123,7 @@ public static interface OnSubscribeFunc extends Function { public Subscription onSubscribe(Observer t1); } - + /** * Observable with Function to execute when subscribed to. *

@@ -139,10 +137,8 @@ protected Observable(OnSubscribeFunc onSubscribe) { this.onSubscribe = onSubscribe; } - protected Observable() { - this(null); - //TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today - } + private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 679be75a2b..4732d5fcad 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -64,14 +64,6 @@ protected BlockingObservable(OnSubscribeFunc onSubscribe) { super(onSubscribe); } - /** - * Used to prevent public instantiation - */ - @SuppressWarnings("unused") - private BlockingObservable() { - // prevent public instantiation - } - /** * Convert an Observable into a BlockingObservable. */ diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 8d7325a736..020254d8a0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -362,7 +362,7 @@ public void testCombineLatestWithFunctionThatThrowsAnException() { TestObservable w1 = new TestObservable(); TestObservable w2 = new TestObservable(); - Observable combined = Observable.create(combineLatest(w1, w2, new Func2() { + Observable combined = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), new Func2() { @Override public String call(String v1, String v2) { throw new RuntimeException("I don't work."); @@ -387,7 +387,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction())); + Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction())); combineLatestW.subscribe(w); /* simulate sending data */ @@ -425,7 +425,7 @@ public void testCombineLatestDifferentLengthObservableSequences2() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction())); + Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction())); combineLatestW.subscribe(w); /* simulate sending data */ @@ -461,7 +461,7 @@ public void testCombineLatestWithInterleavingSequences() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction())); + Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction())); combineLatestW.subscribe(w); /* simulate sending data */ @@ -936,12 +936,12 @@ private static String getStringValue(Object o) { } } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer; @Override - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { // just store the variable where it can be accessed so we can manually trigger it this.observer = observer; return Subscriptions.empty(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 873ba50a01..3a2d480608 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -255,7 +255,7 @@ public void testSimpleAsyncConcat() { TestObservable o1 = new TestObservable("one", "two", "three"); TestObservable o2 = new TestObservable("four", "five", "six"); - Observable.concat(o1, o2).subscribe(observer); + Observable.concat(Observable.create(o1), Observable.create(o2)).subscribe(observer); try { // wait for async observables to complete @@ -301,12 +301,12 @@ public void run() { // emit first if (!s.isUnsubscribed()) { System.out.println("Emit o1"); - observer.onNext(o1); + observer.onNext(Observable.create(o1)); } // emit second if (!s.isUnsubscribed()) { System.out.println("Emit o2"); - observer.onNext(o2); + observer.onNext(Observable.create(o2)); } // wait until sometime later and emit third @@ -317,7 +317,7 @@ public void run() { } if (!s.isUnsubscribed()) { System.out.println("Emit o3"); - observer.onNext(o3); + observer.onNext(Observable.create(o3)); } } catch (Throwable e) { @@ -404,7 +404,7 @@ public void testBlockedObservableOfObservables() { final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even); - OnSubscribeFunc concatF = concat(observableOfObservables); + OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); Observable concat = Observable.create(concatF); concat.subscribe(observer); try { @@ -443,8 +443,8 @@ public void testConcatConcurrentWithInfinity() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") - TestObservable> observableOfObservables = new TestObservable>(w1, w2); - OnSubscribeFunc concatF = concat(observableOfObservables); + TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2)); + OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); Observable concat = Observable.create(concatF); @@ -485,8 +485,8 @@ public void testConcatNonBlockingObservables() { @Override public Subscription onSubscribe(Observer> observer) { // simulate what would happen in an observable - observer.onNext(w1); - observer.onNext(w2); + observer.onNext(Observable.create(w1)); + observer.onNext(Observable.create(w2)); observer.onCompleted(); return new Subscription() { @@ -540,7 +540,7 @@ public void testConcatUnsubscribe() { @SuppressWarnings("unchecked") final Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") - final Observable concat = Observable.create(concat(w1, w2)); + final Observable concat = Observable.create(concat(Observable.create(w1), Observable.create(w2))); final SafeObservableSubscription s1 = new SafeObservableSubscription(); try { @@ -583,8 +583,8 @@ public void testConcatUnsubscribeConcurrent() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") - TestObservable> observableOfObservables = new TestObservable>(w1, w2); - OnSubscribeFunc concatF = concat(observableOfObservables); + TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2)); + OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); Observable concat = Observable.create(concatF); @@ -616,7 +616,7 @@ public void testConcatUnsubscribeConcurrent() { verify(aObserver, never()).onError(any(Throwable.class)); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { private final Subscription s = new Subscription() { @@ -656,7 +656,7 @@ public TestObservable(T seed, int size) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { t = new Thread(new Runnable() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java index c88bbab972..7d906720fa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java @@ -93,7 +93,7 @@ public void testMaterialize1() { final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three"); TestObserver Observer = new TestObserver(); - Observable> m = Observable.create(materialize(o1)); + Observable> m = Observable.create(materialize(Observable.create(o1))); m.subscribe(Observer); try { @@ -118,7 +118,7 @@ public void testMaterialize2() { final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three"); TestObserver Observer = new TestObserver(); - Observable> m = Observable.create(materialize(o1)); + Observable> m = Observable.create(materialize(Observable.create(o1))); m.subscribe(Observer); try { @@ -143,7 +143,7 @@ public void testMaterialize2() { public void testMultipleSubscribes() throws InterruptedException, ExecutionException { final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three"); - Observable> m = Observable.create(materialize(o)); + Observable> m = Observable.create(materialize(Observable.create(o))); assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); @@ -174,7 +174,7 @@ public void onNext(Notification value) { } - private static class TestAsyncErrorObservable extends Observable { + private static class TestAsyncErrorObservable implements OnSubscribeFunc { String[] valuesToReturn; @@ -185,7 +185,7 @@ private static class TestAsyncErrorObservable extends Observable { volatile Thread t; @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { t = new Thread(new Runnable() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 059701aae4..34b7330fd2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -292,8 +292,8 @@ public void before() { @Test public void testMergeObservableOfObservables() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); Observable> observableOfObservables = Observable.create(new OnSubscribeFunc>() { @@ -325,8 +325,8 @@ public void unsubscribe() { @Test public void testMergeArray() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); @SuppressWarnings("unchecked") Observable m = Observable.create(merge(o1, o2)); @@ -339,8 +339,8 @@ public void testMergeArray() { @Test public void testMergeList() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); List> listOfObservables = new ArrayList>(); listOfObservables.add(o1); listOfObservables.add(o2); @@ -359,7 +359,7 @@ public void testUnSubscribe() { TestObservable tB = new TestObservable(); @SuppressWarnings("unchecked") - Observable m = Observable.create(merge(tA, tB)); + Observable m = Observable.create(merge(Observable.create(tA), Observable.create(tB))); Subscription s = m.subscribe(stringObserver); tA.sendOnNext("Aone"); @@ -386,7 +386,7 @@ public void testMergeArrayWithThreading() { final TestASynchronousObservable o2 = new TestASynchronousObservable(); @SuppressWarnings("unchecked") - Observable m = Observable.create(merge(o1, o2)); + Observable m = Observable.create(merge(Observable.create(o1), Observable.create(o2))); m.subscribe(stringObserver); try { @@ -413,7 +413,7 @@ public void testSynchronizationOfMultipleSequences() throws Throwable { final AtomicInteger totalCounter = new AtomicInteger(); @SuppressWarnings("unchecked") - Observable m = Observable.create(merge(o1, o2)); + Observable m = Observable.create(merge(Observable.create(o1), Observable.create(o2))); m.subscribe(new Observer() { @Override @@ -480,8 +480,8 @@ public void onNext(String v) { @Test public void testError1() { // we are using synchronous execution to test this exactly rather than non-deterministic concurrent behavior - final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" - final Observable o2 = new TestErrorObservable("one", "two", "three"); // we expect to lose all of these since o1 is done first and fails + final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" + final Observable o2 = Observable.create(new TestErrorObservable("one", "two", "three")); // we expect to lose all of these since o1 is done first and fails @SuppressWarnings("unchecked") Observable m = Observable.create(merge(o1, o2)); @@ -503,10 +503,10 @@ public void testError1() { @Test public void testError2() { // we are using synchronous execution to test this exactly rather than non-deterministic concurrent behavior - final Observable o1 = new TestErrorObservable("one", "two", "three"); - final Observable o2 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" - final Observable o3 = new TestErrorObservable("seven", "eight", null);// we expect to lose all of these since o2 is done first and fails - final Observable o4 = new TestErrorObservable("nine");// we expect to lose all of these since o2 is done first and fails + final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three")); + final Observable o2 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" + final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null));// we expect to lose all of these since o2 is done first and fails + final Observable o4 = Observable.create(new TestErrorObservable("nine"));// we expect to lose all of these since o2 is done first and fails @SuppressWarnings("unchecked") Observable m = Observable.create(merge(o1, o2, o3, o4)); @@ -525,10 +525,10 @@ public void testError2() { verify(stringObserver, times(0)).onNext("nine"); } - private static class TestSynchronousObservable extends Observable { + private static class TestSynchronousObservable implements OnSubscribeFunc { @Override - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { observer.onNext("hello"); observer.onCompleted(); @@ -544,12 +544,12 @@ public void unsubscribe() { } } - private static class TestASynchronousObservable extends Observable { + private static class TestASynchronousObservable implements OnSubscribeFunc { Thread t; final CountDownLatch onNextBeingSent = new CountDownLatch(1); @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { t = new Thread(new Runnable() { @Override @@ -578,7 +578,7 @@ public void unsubscribe() { /** * A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens. */ - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer = null; volatile boolean unsubscribed = false; @@ -609,13 +609,13 @@ public void sendOnError(Throwable e) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { this.observer = observer; return s; } } - private static class TestErrorObservable extends Observable { + private static class TestErrorObservable implements OnSubscribeFunc { String[] valuesToReturn; @@ -624,7 +624,7 @@ private static class TestErrorObservable extends Observable { } @Override - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { for (String s : valuesToReturn) { if (s == null) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java index 78ed7dc18d..abb450dd4d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java @@ -354,8 +354,8 @@ public void before() { @Test public void testErrorDelayed1() { - final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called - final Observable o2 = new TestErrorObservable("one", "two", "three"); + final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called + final Observable o2 = Observable.create(new TestErrorObservable("one", "two", "three")); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2)); @@ -373,10 +373,10 @@ public void testErrorDelayed1() { @Test public void testErrorDelayed2() { - final Observable o1 = new TestErrorObservable("one", "two", "three"); - final Observable o2 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called - final Observable o3 = new TestErrorObservable("seven", "eight", null); - final Observable o4 = new TestErrorObservable("nine"); + final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three")); + final Observable o2 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called + final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null)); + final Observable o4 = Observable.create(new TestErrorObservable("nine")); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); @@ -397,10 +397,10 @@ public void testErrorDelayed2() { @Test public void testErrorDelayed3() { - final Observable o1 = new TestErrorObservable("one", "two", "three"); - final Observable o2 = new TestErrorObservable("four", "five", "six"); - final Observable o3 = new TestErrorObservable("seven", "eight", null); - final Observable o4 = new TestErrorObservable("nine"); + final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three")); + final Observable o2 = Observable.create(new TestErrorObservable("four", "five", "six")); + final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null)); + final Observable o4 = Observable.create(new TestErrorObservable("nine")); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); @@ -421,10 +421,10 @@ public void testErrorDelayed3() { @Test public void testErrorDelayed4() { - final Observable o1 = new TestErrorObservable("one", "two", "three"); - final Observable o2 = new TestErrorObservable("four", "five", "six"); - final Observable o3 = new TestErrorObservable("seven", "eight"); - final Observable o4 = new TestErrorObservable("nine", null); + final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three")); + final Observable o2 = Observable.create(new TestErrorObservable("four", "five", "six")); + final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight")); + final Observable o4 = Observable.create(new TestErrorObservable("nine", null)); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); @@ -452,7 +452,7 @@ public void testErrorDelayed4WithThreading() { final TestAsyncErrorObservable o4 = new TestAsyncErrorObservable("nine", null); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4)); + Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2), Observable.create(o3), Observable.create(o4))); m.subscribe(stringObserver); try { @@ -479,8 +479,8 @@ public void testErrorDelayed4WithThreading() { @Test public void testCompositeErrorDelayed1() { - final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called - final Observable o2 = new TestErrorObservable("one", "two", null); + final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called + final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null)); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2)); @@ -498,8 +498,8 @@ public void testCompositeErrorDelayed1() { @Test public void testCompositeErrorDelayed2() { - final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called - final Observable o2 = new TestErrorObservable("one", "two", null); + final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called + final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null)); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2)); @@ -522,8 +522,8 @@ public void testCompositeErrorDelayed2() { @Test public void testMergeObservableOfObservables() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); Observable> observableOfObservables = Observable.create(new OnSubscribeFunc>() { @@ -555,8 +555,8 @@ public void unsubscribe() { @Test public void testMergeArray() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); @SuppressWarnings("unchecked") Observable m = Observable.create(mergeDelayError(o1, o2)); @@ -569,8 +569,8 @@ public void testMergeArray() { @Test public void testMergeList() { - final Observable o1 = new TestSynchronousObservable(); - final Observable o2 = new TestSynchronousObservable(); + final Observable o1 = Observable.create(new TestSynchronousObservable()); + final Observable o2 = Observable.create(new TestSynchronousObservable()); List> listOfObservables = new ArrayList>(); listOfObservables.add(o1); listOfObservables.add(o2); @@ -589,7 +589,7 @@ public void testUnSubscribe() { TestObservable tB = new TestObservable(); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(tA, tB)); + Observable m = Observable.create(mergeDelayError(Observable.create(tA), Observable.create(tB))); Subscription s = m.subscribe(stringObserver); tA.sendOnNext("Aone"); @@ -616,7 +616,7 @@ public void testMergeArrayWithThreading() { final TestASynchronousObservable o2 = new TestASynchronousObservable(); @SuppressWarnings("unchecked") - Observable m = Observable.create(mergeDelayError(o1, o2)); + Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2))); m.subscribe(stringObserver); try { @@ -631,10 +631,10 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } - private static class TestSynchronousObservable extends Observable { + private static class TestSynchronousObservable implements OnSubscribeFunc { @Override - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { observer.onNext("hello"); observer.onCompleted(); @@ -650,11 +650,11 @@ public void unsubscribe() { } } - private static class TestASynchronousObservable extends Observable { + private static class TestASynchronousObservable implements OnSubscribeFunc { Thread t; @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { t = new Thread(new Runnable() { @Override @@ -680,7 +680,7 @@ public void unsubscribe() { /** * A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens. */ - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer = null; volatile boolean unsubscribed = false; @@ -711,13 +711,13 @@ public void sendOnError(Throwable e) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { this.observer = observer; return s; } } - private static class TestErrorObservable extends Observable { + private static class TestErrorObservable implements OnSubscribeFunc { String[] valuesToReturn; @@ -726,7 +726,7 @@ private static class TestErrorObservable extends Observable { } @Override - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { boolean errorThrown = false; for (String s : valuesToReturn) { if (s == null) { @@ -754,7 +754,7 @@ public void unsubscribe() { } } - private static class TestAsyncErrorObservable extends Observable { + private static class TestAsyncErrorObservable implements OnSubscribeFunc { String[] valuesToReturn; @@ -765,7 +765,7 @@ private static class TestAsyncErrorObservable extends Observable { Thread t; @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { t = new Thread(new Runnable() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java index 0522234e0a..427d15b816 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java @@ -27,6 +27,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; import rx.util.Exceptions; /** @@ -121,8 +123,7 @@ private T getRecentValue() { public static class UnitTest { @Test public void testMostRecent() { - Subscription s = mock(Subscription.class); - TestObservable observable = new TestObservable(s); + Subject observable = PublishSubject.create(); Iterator it = mostRecent(observable, "default").iterator(); @@ -130,25 +131,24 @@ public void testMostRecent() { assertEquals("default", it.next()); assertEquals("default", it.next()); - observable.sendOnNext("one"); + observable.onNext("one"); assertTrue(it.hasNext()); assertEquals("one", it.next()); assertEquals("one", it.next()); - observable.sendOnNext("two"); + observable.onNext("two"); assertTrue(it.hasNext()); assertEquals("two", it.next()); assertEquals("two", it.next()); - observable.sendOnCompleted(); + observable.onCompleted(); assertFalse(it.hasNext()); } @Test(expected = TestException.class) public void testMostRecentWithException() { - Subscription s = mock(Subscription.class); - TestObservable observable = new TestObservable(s); + Subject observable = PublishSubject.create(); Iterator it = mostRecent(observable, "default").iterator(); @@ -156,43 +156,12 @@ public void testMostRecentWithException() { assertEquals("default", it.next()); assertEquals("default", it.next()); - observable.sendOnError(new TestException()); + observable.onError(new TestException()); assertTrue(it.hasNext()); it.next(); } - private static class TestObservable extends Observable { - - Observer observer = null; - Subscription s; - - public TestObservable(Subscription s) { - this.s = s; - } - - /* used to simulate subscription */ - public void sendOnCompleted() { - observer.onCompleted(); - } - - /* used to simulate subscription */ - public void sendOnNext(String value) { - observer.onNext(value); - } - - /* used to simulate subscription */ - public void sendOnError(Throwable e) { - observer.onError(e); - } - - @Override - public Subscription subscribe(final Observer observer) { - this.observer = observer; - return s; - } - } - private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index 1550878432..e83cfdaa03 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -93,7 +93,7 @@ public static class UnitTest { @Test public void testMulticast() { - TestObservable source = new TestObservable(); + Subject source = PublishSubject.create(); ConnectableObservable multicasted = OperationMulticast.multicast(source, PublishSubject.create()); @@ -102,14 +102,14 @@ public void testMulticast() { Observer observer = mock(Observer.class); multicasted.subscribe(observer); - source.sendOnNext("one"); - source.sendOnNext("two"); + source.onNext("one"); + source.onNext("two"); multicasted.connect(); - source.sendOnNext("three"); - source.sendOnNext("four"); - source.sendOnCompleted(); + source.onNext("three"); + source.onNext("four"); + source.onCompleted(); verify(observer, never()).onNext("one"); verify(observer, never()).onNext("two"); @@ -121,7 +121,7 @@ public void testMulticast() { @Test public void testMulticastConnectTwice() { - TestObservable source = new TestObservable(); + Subject source = PublishSubject.create(); ConnectableObservable multicasted = OperationMulticast.multicast(source, PublishSubject.create()); @@ -130,13 +130,13 @@ public void testMulticastConnectTwice() { Observer observer = mock(Observer.class); multicasted.subscribe(observer); - source.sendOnNext("one"); + source.onNext("one"); multicasted.connect(); multicasted.connect(); - source.sendOnNext("two"); - source.sendOnCompleted(); + source.onNext("two"); + source.onCompleted(); verify(observer, never()).onNext("one"); verify(observer, times(1)).onNext("two"); @@ -146,7 +146,7 @@ public void testMulticastConnectTwice() { @Test public void testMulticastDisconnect() { - TestObservable source = new TestObservable(); + Subject source = PublishSubject.create(); ConnectableObservable multicasted = OperationMulticast.multicast(source, PublishSubject.create()); @@ -155,17 +155,17 @@ public void testMulticastDisconnect() { Observer observer = mock(Observer.class); multicasted.subscribe(observer); - source.sendOnNext("one"); + source.onNext("one"); Subscription connection = multicasted.connect(); - source.sendOnNext("two"); + source.onNext("two"); connection.unsubscribe(); - source.sendOnNext("three"); + source.onNext("three"); multicasted.connect(); - source.sendOnNext("four"); - source.sendOnCompleted(); + source.onNext("four"); + source.onCompleted(); verify(observer, never()).onNext("one"); verify(observer, times(1)).onNext("two"); @@ -175,67 +175,5 @@ public void testMulticastDisconnect() { } - - private static class TestObservable extends Observable { - - Observer observer = new Observer() { - @Override - public void onCompleted() { - // Do nothing - } - - @Override - public void onError(Throwable e) { - // Do nothing - } - - @Override - public void onNext(String args) { - // Do nothing - } - }; - Subscription s = new Subscription() { - @Override - public void unsubscribe() { - observer = new Observer() { - @Override - public void onCompleted() { - // Do nothing - } - - @Override - public void onError(Throwable e) { - // Do nothing - } - - @Override - public void onNext(String args) { - // Do nothing - } - }; - } - }; - - public TestObservable() { - } - - /* used to simulate subscription */ - public void sendOnCompleted() { - observer.onCompleted(); - } - - /* used to simulate subscription */ - public void sendOnNext(String value) { - observer.onNext(value); - } - - @Override - public Subscription subscribe(final Observer observer) { - this.observer = observer; - return s; - } - - } - } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index a9e68f28b3..5fcdc08a63 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -38,6 +38,8 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.Exceptions; @@ -174,8 +176,7 @@ public static class UnitTest { @Test public void testNext() throws Throwable { - Subscription s = mock(Subscription.class); - final TestObservable obs = new TestObservable(s); + Subject obs = PublishSubject.create(); Iterator it = next(obs).iterator(); @@ -183,27 +184,26 @@ public void testNext() throws Throwable { Future next = nextAsync(it); Thread.sleep(100); - obs.sendOnNext("one"); + obs.onNext("one"); assertEquals("one", next.get()); assertTrue(it.hasNext()); next = nextAsync(it); Thread.sleep(100); - obs.sendOnNext("two"); + obs.onNext("two"); assertEquals("two", next.get()); assertTrue(it.hasNext()); - obs.sendOnCompleted(); + obs.onCompleted(); assertFalse(it.hasNext()); } @Test(expected = TestException.class) public void testOnError() throws Throwable { - Subscription s = mock(Subscription.class); - final TestObservable obs = new TestObservable(s); + Subject obs = PublishSubject.create();; Iterator it = next(obs).iterator(); @@ -211,14 +211,14 @@ public void testOnError() throws Throwable { Future next = nextAsync(it); Thread.sleep(100); - obs.sendOnNext("one"); + obs.onNext("one"); assertEquals("one", next.get()); assertTrue(it.hasNext()); next = nextAsync(it); Thread.sleep(100); - obs.sendOnError(new TestException()); + obs.onError(new TestException()); try { next.get(); @@ -229,8 +229,7 @@ public void testOnError() throws Throwable { @Test public void testOnErrorViaHasNext() throws Throwable { - Subscription s = mock(Subscription.class); - final TestObservable obs = new TestObservable(s); + Subject obs = PublishSubject.create(); Iterator it = next(obs).iterator(); @@ -238,14 +237,14 @@ public void testOnErrorViaHasNext() throws Throwable { Future next = nextAsync(it); Thread.sleep(100); - obs.sendOnNext("one"); + obs.onNext("one"); assertEquals("one", next.get()); assertTrue(it.hasNext()); next = nextAsync(it); Thread.sleep(100); - obs.sendOnError(new TestException()); + obs.onError(new TestException()); // this should not throw an exception but instead just return false try { @@ -267,38 +266,6 @@ public String call() throws Exception { }); } - private static class TestObservable extends Observable { - - Observer observer = null; - Subscription s; - - public TestObservable(Subscription s) { - this.s = s; - } - - /* used to simulate subscription */ - public void sendOnCompleted() { - observer.onCompleted(); - } - - /* used to simulate subscription */ - public void sendOnNext(String value) { - observer.onNext(value); - } - - /* used to simulate subscription */ - public void sendOnError(Throwable e) { - observer.onError(e); - } - - @Override - public Subscription subscribe(final Observer observer) { - this.observer = observer; - return s; - } - - } - @SuppressWarnings("serial") private static class TestException extends RuntimeException { diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index a752ec1f62..28bf8be311 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -177,7 +177,7 @@ public Observable call(Throwable t1) { } }; - Observable observable = Observable.create(onErrorResumeNextViaFunction(w, resume)); + Observable observable = Observable.create(onErrorResumeNextViaFunction(Observable.create(w), resume)); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -214,7 +214,7 @@ public Observable call(Throwable t1) { } }; - Observable observable = Observable.create(onErrorResumeNextViaFunction(w, resume)); + Observable observable = Observable.create(onErrorResumeNextViaFunction(Observable.create(w), resume)); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -234,7 +234,7 @@ public Observable call(Throwable t1) { verify(aObserver, times(0)).onCompleted(); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { final Subscription s; final String[] values; @@ -246,7 +246,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index 2c25ead1fa..3b49ce5f9c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -123,7 +123,8 @@ public static class UnitTest { public void testResumeNext() { Subscription s = mock(Subscription.class); // Trigger failure on second element - TestObservable w = new TestObservable(s, "one", "fail", "two", "three"); + TestObservable f = new TestObservable(s, "one", "fail", "two", "three"); + Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); @@ -132,7 +133,7 @@ public void testResumeNext() { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -152,7 +153,8 @@ public void testMapResumeAsyncNext() { // Trigger multiple failures Observable w = Observable.from("one", "fail", "two", "three", "fail"); // Resume Observable is async - TestObservable resume = new TestObservable(sr, "twoResume", "threeResume"); + TestObservable f = new TestObservable(sr, "twoResume", "threeResume"); + Observable resume = Observable.create(f); // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaObservable) @@ -172,7 +174,7 @@ public String call(String s) { observable.subscribe(aObserver); try { - resume.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -186,7 +188,7 @@ public String call(String s) { verify(aObserver, times(1)).onNext("threeResume"); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { final Subscription s; final String[] values; @@ -198,7 +200,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java index 471de4ecdc..02a268c04a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java @@ -131,7 +131,8 @@ public static class UnitTest { @Test public void testResumeNext() { Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one"); + TestObservable f = new TestObservable(s, "one"); + Observable w = Observable.create(f); final AtomicReference capturedException = new AtomicReference(); Observable observable = Observable.create(onErrorReturn(w, new Func1() { @@ -149,7 +150,7 @@ public String call(Throwable e) { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -167,7 +168,8 @@ public String call(Throwable e) { @Test public void testFunctionThrowsError() { Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one"); + TestObservable f = new TestObservable(s, "one"); + Observable w = Observable.create(f); final AtomicReference capturedException = new AtomicReference(); Observable observable = Observable.create(onErrorReturn(w, new Func1() { @@ -185,7 +187,7 @@ public String call(Throwable e) { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -199,7 +201,7 @@ public String call(Throwable e) { assertNotNull(capturedException.get()); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { final Subscription s; final String[] values; @@ -211,7 +213,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java index c77354220b..9642ed9269 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java @@ -130,7 +130,8 @@ public static class UnitTest { public void testResumeNextWithException() { Subscription s = mock(Subscription.class); // Trigger failure on second element - TestObservable w = new TestObservable(s, "one", "EXCEPTION", "two", "three"); + TestObservable f = new TestObservable(s, "one", "EXCEPTION", "two", "three"); + Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume)); @@ -139,7 +140,7 @@ public void testResumeNextWithException() { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -158,7 +159,8 @@ public void testResumeNextWithException() { public void testResumeNextWithRuntimeException() { Subscription s = mock(Subscription.class); // Trigger failure on second element - TestObservable w = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three"); + TestObservable f = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three"); + Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume)); @@ -167,7 +169,7 @@ public void testResumeNextWithRuntimeException() { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -186,7 +188,8 @@ public void testResumeNextWithRuntimeException() { public void testThrowablePassesThru() { Subscription s = mock(Subscription.class); // Trigger failure on second element - TestObservable w = new TestObservable(s, "one", "THROWABLE", "two", "three"); + TestObservable f = new TestObservable(s, "one", "THROWABLE", "two", "three"); + Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume)); @@ -195,7 +198,7 @@ public void testThrowablePassesThru() { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -214,7 +217,8 @@ public void testThrowablePassesThru() { public void testErrorPassesThru() { Subscription s = mock(Subscription.class); // Trigger failure on second element - TestObservable w = new TestObservable(s, "one", "ERROR", "two", "three"); + TestObservable f = new TestObservable(s, "one", "ERROR", "two", "three"); + Observable w = Observable.create(f); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume)); @@ -223,7 +227,7 @@ public void testErrorPassesThru() { observable.subscribe(aObserver); try { - w.t.join(); + f.t.join(); } catch (InterruptedException e) { fail(e.getMessage()); } @@ -244,7 +248,8 @@ public void testMapResumeAsyncNext() { // Trigger multiple failures Observable w = Observable.from("one", "fail", "two", "three", "fail"); // Resume Observable is async - TestObservable resume = new TestObservable(sr, "twoResume", "threeResume"); + TestObservable f = new TestObservable(sr, "twoResume", "threeResume"); + Observable resume = Observable.create(f); // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaObservable) @@ -265,8 +270,8 @@ public String call(String s) { try { // if the thread gets started (which it shouldn't if it's working correctly) - if (resume.t != null) { - resume.t.join(); + if (f.t != null) { + f.t.join(); } } catch (InterruptedException e) { fail(e.getMessage()); @@ -281,7 +286,7 @@ public String call(String s) { verify(aObserver, times(1)).onCompleted(); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { final Subscription s; final String[] values; @@ -293,7 +298,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index 720c331880..129728c303 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -86,7 +86,7 @@ public static class UnitTest { @Test public void testOnCompletedAfterUnSubscribe() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -106,7 +106,7 @@ public void testOnCompletedAfterUnSubscribe() { @Test public void testOnNextAfterUnSubscribe() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -126,7 +126,7 @@ public void testOnNextAfterUnSubscribe() { @Test public void testOnErrorAfterUnSubscribe() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -146,7 +146,7 @@ public void testOnErrorAfterUnSubscribe() { @Test public void testOnNextAfterOnError() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -168,7 +168,7 @@ public void testOnNextAfterOnError() { @Test public void testOnCompletedAfterOnError() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -190,7 +190,7 @@ public void testOnCompletedAfterOnError() { @Test public void testOnNextAfterOnCompleted() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -213,7 +213,7 @@ public void testOnNextAfterOnCompleted() { @Test public void testOnErrorAfterOnCompleted() { TestObservable t = new TestObservable(null); - Observable st = Observable.create(synchronize(t)); + Observable st = Observable.create(synchronize(Observable.create(t))); @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -232,7 +232,7 @@ public void testOnErrorAfterOnCompleted() { /** * A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens. */ - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer = null; @@ -255,7 +255,7 @@ public void sendOnError(Throwable e) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { this.observer = observer; return new Subscription() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 49e06d5758..881071e5b6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -247,8 +247,9 @@ public void unsubscribe() @Test public void testUnsubscribeAfterTake() { - Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one", "two", "three"); + final Subscription s = mock(Subscription.class); + TestObservableFunc f = new TestObservableFunc(s, "one", "two", "three"); + Observable w = Observable.create(f); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -257,7 +258,7 @@ public void testUnsubscribeAfterTake() { // wait for the Observable to complete try { - w.t.join(); + f.t.join(); } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); @@ -272,19 +273,19 @@ public void testUnsubscribeAfterTake() { verifyNoMoreInteractions(aObserver); } - private static class TestObservable extends Observable { + private static class TestObservableFunc implements OnSubscribeFunc { final Subscription s; final String[] values; Thread t = null; - public TestObservable(Subscription s, String... values) { + public TestObservableFunc(Subscription s, String... values) { this.s = s; this.values = values; } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java b/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java index 5bfaffe657..ab2d6045a9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java @@ -157,7 +157,7 @@ public void testTakeUntil() { TestObservable other = new TestObservable(sOther); Observer result = mock(Observer.class); - Observable stringObservable = takeUntil(source, other); + Observable stringObservable = takeUntil(Observable.create(source), Observable.create(other)); stringObservable.subscribe(result); source.sendOnNext("one"); source.sendOnNext("two"); @@ -184,7 +184,7 @@ public void testTakeUntilSourceCompleted() { TestObservable other = new TestObservable(sOther); Observer result = mock(Observer.class); - Observable stringObservable = takeUntil(source, other); + Observable stringObservable = takeUntil(Observable.create(source), Observable.create(other)); stringObservable.subscribe(result); source.sendOnNext("one"); source.sendOnNext("two"); @@ -207,7 +207,7 @@ public void testTakeUntilSourceError() { Throwable error = new Throwable(); Observer result = mock(Observer.class); - Observable stringObservable = takeUntil(source, other); + Observable stringObservable = takeUntil(Observable.create(source), Observable.create(other)); stringObservable.subscribe(result); source.sendOnNext("one"); source.sendOnNext("two"); @@ -231,7 +231,7 @@ public void testTakeUntilOtherError() { Throwable error = new Throwable(); Observer result = mock(Observer.class); - Observable stringObservable = takeUntil(source, other); + Observable stringObservable = takeUntil(Observable.create(source), Observable.create(other)); stringObservable.subscribe(result); source.sendOnNext("one"); source.sendOnNext("two"); @@ -255,7 +255,7 @@ public void testTakeUntilOtherCompleted() { TestObservable other = new TestObservable(sOther); Observer result = mock(Observer.class); - Observable stringObservable = takeUntil(source, other); + Observable stringObservable = takeUntil(Observable.create(source), Observable.create(other)); stringObservable.subscribe(result); source.sendOnNext("one"); source.sendOnNext("two"); @@ -269,7 +269,7 @@ public void testTakeUntilOtherCompleted() { } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer = null; Subscription s; @@ -294,7 +294,7 @@ public void sendOnError(Throwable e) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { this.observer = observer; return s; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index add3e37adc..5b833b22a1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -260,7 +260,7 @@ public void testTakeWhileProtectsPredicateCall() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(takeWhile(source, new Func1() + Observable take = Observable.create(takeWhile(Observable.create(source), new Func1() { @Override public Boolean call(String s) @@ -289,7 +289,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + Observable take = Observable.create(takeWhileWithIndex(Observable.create(w), new Func2() { @Override public Boolean call(String s, Integer index) @@ -314,7 +314,7 @@ public Boolean call(String s, Integer index) verify(s, times(1)).unsubscribe(); } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { final Subscription s; final String[] values; @@ -326,7 +326,7 @@ public TestObservable(Subscription s, String... values) { } @Override - public Subscription subscribe(final Observer observer) { + public Subscription onSubscribe(final Observer observer) { System.out.println("TestObservable subscribed to ..."); t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 20ad035384..597fce7c89 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -393,7 +393,7 @@ public void testZippingDifferentLengthObservableSequences1() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable zipW = Observable.create(zip(w1, w2, w3, getConcat3StringsZipr())); + Observable zipW = Observable.create(zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr())); zipW.subscribe(w); /* simulate sending data */ @@ -427,7 +427,7 @@ public void testZippingDifferentLengthObservableSequences2() { TestObservable w2 = new TestObservable(); TestObservable w3 = new TestObservable(); - Observable zipW = Observable.create(zip(w1, w2, w3, getConcat3StringsZipr())); + Observable zipW = Observable.create(zip(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsZipr())); zipW.subscribe(w); /* simulate sending data */ @@ -909,12 +909,12 @@ private static String getStringValue(Object o) { } } - private static class TestObservable extends Observable { + private static class TestObservable implements OnSubscribeFunc { Observer observer; @Override - public Subscription subscribe(Observer Observer) { + public Subscription onSubscribe(Observer Observer) { // just store the variable where it can be accessed so we can manually trigger it this.observer = Observer; return Subscriptions.empty();