diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index 724552c8f0..dad2ca7286 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -102,17 +102,15 @@
*
*
*
- * For more information see the RxJava
- * Wiki
+ * For more information see the RxJava Wiki
*
* @param
*/
public class Observable {
- //TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable")
-
- private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
-
+ /**
+ * Executed when 'subscribe' is invoked.
+ */
private final OnSubscribeFunc onSubscribe;
/**
@@ -125,7 +123,7 @@ public static interface OnSubscribeFunc extends Function {
public Subscription onSubscribe(Observer super T> t1);
}
-
+
/**
* Observable with Function to execute when subscribed to.
*
@@ -139,10 +137,8 @@ protected Observable(OnSubscribeFunc onSubscribe) {
this.onSubscribe = onSubscribe;
}
- protected Observable() {
- this(null);
- //TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today
- }
+ private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
+
/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java
index 679be75a2b..4732d5fcad 100644
--- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java
+++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java
@@ -64,14 +64,6 @@ protected BlockingObservable(OnSubscribeFunc onSubscribe) {
super(onSubscribe);
}
- /**
- * Used to prevent public instantiation
- */
- @SuppressWarnings("unused")
- private BlockingObservable() {
- // prevent public instantiation
- }
-
/**
* Convert an Observable into a BlockingObservable.
*/
diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
index 8d7325a736..020254d8a0 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
@@ -362,7 +362,7 @@ public void testCombineLatestWithFunctionThatThrowsAnException() {
TestObservable w1 = new TestObservable();
TestObservable w2 = new TestObservable();
- Observable combined = Observable.create(combineLatest(w1, w2, new Func2() {
+ Observable combined = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), new Func2() {
@Override
public String call(String v1, String v2) {
throw new RuntimeException("I don't work.");
@@ -387,7 +387,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();
- Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
+ Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);
/* simulate sending data */
@@ -425,7 +425,7 @@ public void testCombineLatestDifferentLengthObservableSequences2() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();
- Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
+ Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);
/* simulate sending data */
@@ -461,7 +461,7 @@ public void testCombineLatestWithInterleavingSequences() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();
- Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
+ Observable combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);
/* simulate sending data */
@@ -936,12 +936,12 @@ private static String getStringValue(Object o) {
}
}
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
Observer super String> observer;
@Override
- public Subscription subscribe(Observer super String> observer) {
+ public Subscription onSubscribe(Observer super String> observer) {
// just store the variable where it can be accessed so we can manually trigger it
this.observer = observer;
return Subscriptions.empty();
diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
index 873ba50a01..3a2d480608 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
@@ -255,7 +255,7 @@ public void testSimpleAsyncConcat() {
TestObservable o1 = new TestObservable("one", "two", "three");
TestObservable o2 = new TestObservable("four", "five", "six");
- Observable.concat(o1, o2).subscribe(observer);
+ Observable.concat(Observable.create(o1), Observable.create(o2)).subscribe(observer);
try {
// wait for async observables to complete
@@ -301,12 +301,12 @@ public void run() {
// emit first
if (!s.isUnsubscribed()) {
System.out.println("Emit o1");
- observer.onNext(o1);
+ observer.onNext(Observable.create(o1));
}
// emit second
if (!s.isUnsubscribed()) {
System.out.println("Emit o2");
- observer.onNext(o2);
+ observer.onNext(Observable.create(o2));
}
// wait until sometime later and emit third
@@ -317,7 +317,7 @@ public void run() {
}
if (!s.isUnsubscribed()) {
System.out.println("Emit o3");
- observer.onNext(o3);
+ observer.onNext(Observable.create(o3));
}
} catch (Throwable e) {
@@ -404,7 +404,7 @@ public void testBlockedObservableOfObservables() {
final CountDownLatch callOnce = new CountDownLatch(1);
final CountDownLatch okToContinue = new CountDownLatch(1);
TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even);
- OnSubscribeFunc concatF = concat(observableOfObservables);
+ OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables));
Observable concat = Observable.create(concatF);
concat.subscribe(observer);
try {
@@ -443,8 +443,8 @@ public void testConcatConcurrentWithInfinity() {
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
- TestObservable> observableOfObservables = new TestObservable>(w1, w2);
- OnSubscribeFunc concatF = concat(observableOfObservables);
+ TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2));
+ OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables));
Observable concat = Observable.create(concatF);
@@ -485,8 +485,8 @@ public void testConcatNonBlockingObservables() {
@Override
public Subscription onSubscribe(Observer super Observable> observer) {
// simulate what would happen in an observable
- observer.onNext(w1);
- observer.onNext(w2);
+ observer.onNext(Observable.create(w1));
+ observer.onNext(Observable.create(w2));
observer.onCompleted();
return new Subscription() {
@@ -540,7 +540,7 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observer aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
- final Observable concat = Observable.create(concat(w1, w2));
+ final Observable concat = Observable.create(concat(Observable.create(w1), Observable.create(w2)));
final SafeObservableSubscription s1 = new SafeObservableSubscription();
try {
@@ -583,8 +583,8 @@ public void testConcatUnsubscribeConcurrent() {
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
- TestObservable> observableOfObservables = new TestObservable>(w1, w2);
- OnSubscribeFunc concatF = concat(observableOfObservables);
+ TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2));
+ OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables));
Observable concat = Observable.create(concatF);
@@ -616,7 +616,7 @@ public void testConcatUnsubscribeConcurrent() {
verify(aObserver, never()).onError(any(Throwable.class));
}
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
private final Subscription s = new Subscription() {
@@ -656,7 +656,7 @@ public TestObservable(T seed, int size) {
}
@Override
- public Subscription subscribe(final Observer super T> observer) {
+ public Subscription onSubscribe(final Observer super T> observer) {
t = new Thread(new Runnable() {
@Override
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
index c88bbab972..7d906720fa 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
@@ -93,7 +93,7 @@ public void testMaterialize1() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
TestObserver Observer = new TestObserver();
- Observable> m = Observable.create(materialize(o1));
+ Observable> m = Observable.create(materialize(Observable.create(o1)));
m.subscribe(Observer);
try {
@@ -118,7 +118,7 @@ public void testMaterialize2() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");
TestObserver Observer = new TestObserver();
- Observable> m = Observable.create(materialize(o1));
+ Observable> m = Observable.create(materialize(Observable.create(o1)));
m.subscribe(Observer);
try {
@@ -143,7 +143,7 @@ public void testMaterialize2() {
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
- Observable> m = Observable.create(materialize(o));
+ Observable> m = Observable.create(materialize(Observable.create(o)));
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
@@ -174,7 +174,7 @@ public void onNext(Notification value) {
}
- private static class TestAsyncErrorObservable extends Observable {
+ private static class TestAsyncErrorObservable implements OnSubscribeFunc {
String[] valuesToReturn;
@@ -185,7 +185,7 @@ private static class TestAsyncErrorObservable extends Observable {
volatile Thread t;
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
t = new Thread(new Runnable() {
@Override
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java
index 059701aae4..34b7330fd2 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java
@@ -292,8 +292,8 @@ public void before() {
@Test
public void testMergeObservableOfObservables() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
Observable> observableOfObservables = Observable.create(new OnSubscribeFunc>() {
@@ -325,8 +325,8 @@ public void unsubscribe() {
@Test
public void testMergeArray() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
@SuppressWarnings("unchecked")
Observable m = Observable.create(merge(o1, o2));
@@ -339,8 +339,8 @@ public void testMergeArray() {
@Test
public void testMergeList() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
List> listOfObservables = new ArrayList>();
listOfObservables.add(o1);
listOfObservables.add(o2);
@@ -359,7 +359,7 @@ public void testUnSubscribe() {
TestObservable tB = new TestObservable();
@SuppressWarnings("unchecked")
- Observable m = Observable.create(merge(tA, tB));
+ Observable m = Observable.create(merge(Observable.create(tA), Observable.create(tB)));
Subscription s = m.subscribe(stringObserver);
tA.sendOnNext("Aone");
@@ -386,7 +386,7 @@ public void testMergeArrayWithThreading() {
final TestASynchronousObservable o2 = new TestASynchronousObservable();
@SuppressWarnings("unchecked")
- Observable m = Observable.create(merge(o1, o2));
+ Observable m = Observable.create(merge(Observable.create(o1), Observable.create(o2)));
m.subscribe(stringObserver);
try {
@@ -413,7 +413,7 @@ public void testSynchronizationOfMultipleSequences() throws Throwable {
final AtomicInteger totalCounter = new AtomicInteger();
@SuppressWarnings("unchecked")
- Observable m = Observable.create(merge(o1, o2));
+ Observable m = Observable.create(merge(Observable.create(o1), Observable.create(o2)));
m.subscribe(new Observer() {
@Override
@@ -480,8 +480,8 @@ public void onNext(String v) {
@Test
public void testError1() {
// we are using synchronous execution to test this exactly rather than non-deterministic concurrent behavior
- final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six"
- final Observable o2 = new TestErrorObservable("one", "two", "three"); // we expect to lose all of these since o1 is done first and fails
+ final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six"
+ final Observable o2 = Observable.create(new TestErrorObservable("one", "two", "three")); // we expect to lose all of these since o1 is done first and fails
@SuppressWarnings("unchecked")
Observable m = Observable.create(merge(o1, o2));
@@ -503,10 +503,10 @@ public void testError1() {
@Test
public void testError2() {
// we are using synchronous execution to test this exactly rather than non-deterministic concurrent behavior
- final Observable o1 = new TestErrorObservable("one", "two", "three");
- final Observable o2 = new TestErrorObservable("four", null, "six"); // we expect to lose "six"
- final Observable o3 = new TestErrorObservable("seven", "eight", null);// we expect to lose all of these since o2 is done first and fails
- final Observable o4 = new TestErrorObservable("nine");// we expect to lose all of these since o2 is done first and fails
+ final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three"));
+ final Observable o2 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six"
+ final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null));// we expect to lose all of these since o2 is done first and fails
+ final Observable o4 = Observable.create(new TestErrorObservable("nine"));// we expect to lose all of these since o2 is done first and fails
@SuppressWarnings("unchecked")
Observable m = Observable.create(merge(o1, o2, o3, o4));
@@ -525,10 +525,10 @@ public void testError2() {
verify(stringObserver, times(0)).onNext("nine");
}
- private static class TestSynchronousObservable extends Observable {
+ private static class TestSynchronousObservable implements OnSubscribeFunc {
@Override
- public Subscription subscribe(Observer super String> observer) {
+ public Subscription onSubscribe(Observer super String> observer) {
observer.onNext("hello");
observer.onCompleted();
@@ -544,12 +544,12 @@ public void unsubscribe() {
}
}
- private static class TestASynchronousObservable extends Observable {
+ private static class TestASynchronousObservable implements OnSubscribeFunc {
Thread t;
final CountDownLatch onNextBeingSent = new CountDownLatch(1);
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
t = new Thread(new Runnable() {
@Override
@@ -578,7 +578,7 @@ public void unsubscribe() {
/**
* A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens.
*/
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
Observer super String> observer = null;
volatile boolean unsubscribed = false;
@@ -609,13 +609,13 @@ public void sendOnError(Throwable e) {
}
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
this.observer = observer;
return s;
}
}
- private static class TestErrorObservable extends Observable {
+ private static class TestErrorObservable implements OnSubscribeFunc {
String[] valuesToReturn;
@@ -624,7 +624,7 @@ private static class TestErrorObservable extends Observable {
}
@Override
- public Subscription subscribe(Observer super String> observer) {
+ public Subscription onSubscribe(Observer super String> observer) {
for (String s : valuesToReturn) {
if (s == null) {
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java
index 78ed7dc18d..abb450dd4d 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java
@@ -354,8 +354,8 @@ public void before() {
@Test
public void testErrorDelayed1() {
- final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
- final Observable o2 = new TestErrorObservable("one", "two", "three");
+ final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
+ final Observable o2 = Observable.create(new TestErrorObservable("one", "two", "three"));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2));
@@ -373,10 +373,10 @@ public void testErrorDelayed1() {
@Test
public void testErrorDelayed2() {
- final Observable o1 = new TestErrorObservable("one", "two", "three");
- final Observable o2 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
- final Observable o3 = new TestErrorObservable("seven", "eight", null);
- final Observable o4 = new TestErrorObservable("nine");
+ final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three"));
+ final Observable o2 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
+ final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null));
+ final Observable o4 = Observable.create(new TestErrorObservable("nine"));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4));
@@ -397,10 +397,10 @@ public void testErrorDelayed2() {
@Test
public void testErrorDelayed3() {
- final Observable o1 = new TestErrorObservable("one", "two", "three");
- final Observable o2 = new TestErrorObservable("four", "five", "six");
- final Observable o3 = new TestErrorObservable("seven", "eight", null);
- final Observable o4 = new TestErrorObservable("nine");
+ final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three"));
+ final Observable o2 = Observable.create(new TestErrorObservable("four", "five", "six"));
+ final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight", null));
+ final Observable o4 = Observable.create(new TestErrorObservable("nine"));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4));
@@ -421,10 +421,10 @@ public void testErrorDelayed3() {
@Test
public void testErrorDelayed4() {
- final Observable o1 = new TestErrorObservable("one", "two", "three");
- final Observable o2 = new TestErrorObservable("four", "five", "six");
- final Observable o3 = new TestErrorObservable("seven", "eight");
- final Observable o4 = new TestErrorObservable("nine", null);
+ final Observable o1 = Observable.create(new TestErrorObservable("one", "two", "three"));
+ final Observable o2 = Observable.create(new TestErrorObservable("four", "five", "six"));
+ final Observable o3 = Observable.create(new TestErrorObservable("seven", "eight"));
+ final Observable o4 = Observable.create(new TestErrorObservable("nine", null));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4));
@@ -452,7 +452,7 @@ public void testErrorDelayed4WithThreading() {
final TestAsyncErrorObservable o4 = new TestAsyncErrorObservable("nine", null);
@SuppressWarnings("unchecked")
- Observable m = Observable.create(mergeDelayError(o1, o2, o3, o4));
+ Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2), Observable.create(o3), Observable.create(o4)));
m.subscribe(stringObserver);
try {
@@ -479,8 +479,8 @@ public void testErrorDelayed4WithThreading() {
@Test
public void testCompositeErrorDelayed1() {
- final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
- final Observable o2 = new TestErrorObservable("one", "two", null);
+ final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
+ final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2));
@@ -498,8 +498,8 @@ public void testCompositeErrorDelayed1() {
@Test
public void testCompositeErrorDelayed2() {
- final Observable o1 = new TestErrorObservable("four", null, "six"); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
- final Observable o2 = new TestErrorObservable("one", "two", null);
+ final Observable o1 = Observable.create(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
+ final Observable o2 = Observable.create(new TestErrorObservable("one", "two", null));
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2));
@@ -522,8 +522,8 @@ public void testCompositeErrorDelayed2() {
@Test
public void testMergeObservableOfObservables() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
Observable> observableOfObservables = Observable.create(new OnSubscribeFunc>() {
@@ -555,8 +555,8 @@ public void unsubscribe() {
@Test
public void testMergeArray() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
@SuppressWarnings("unchecked")
Observable m = Observable.create(mergeDelayError(o1, o2));
@@ -569,8 +569,8 @@ public void testMergeArray() {
@Test
public void testMergeList() {
- final Observable o1 = new TestSynchronousObservable();
- final Observable o2 = new TestSynchronousObservable();
+ final Observable o1 = Observable.create(new TestSynchronousObservable());
+ final Observable o2 = Observable.create(new TestSynchronousObservable());
List> listOfObservables = new ArrayList>();
listOfObservables.add(o1);
listOfObservables.add(o2);
@@ -589,7 +589,7 @@ public void testUnSubscribe() {
TestObservable tB = new TestObservable();
@SuppressWarnings("unchecked")
- Observable m = Observable.create(mergeDelayError(tA, tB));
+ Observable m = Observable.create(mergeDelayError(Observable.create(tA), Observable.create(tB)));
Subscription s = m.subscribe(stringObserver);
tA.sendOnNext("Aone");
@@ -616,7 +616,7 @@ public void testMergeArrayWithThreading() {
final TestASynchronousObservable o2 = new TestASynchronousObservable();
@SuppressWarnings("unchecked")
- Observable m = Observable.create(mergeDelayError(o1, o2));
+ Observable m = Observable.create(mergeDelayError(Observable.create(o1), Observable.create(o2)));
m.subscribe(stringObserver);
try {
@@ -631,10 +631,10 @@ public void testMergeArrayWithThreading() {
verify(stringObserver, times(1)).onCompleted();
}
- private static class TestSynchronousObservable extends Observable {
+ private static class TestSynchronousObservable implements OnSubscribeFunc {
@Override
- public Subscription subscribe(Observer super String> observer) {
+ public Subscription onSubscribe(Observer super String> observer) {
observer.onNext("hello");
observer.onCompleted();
@@ -650,11 +650,11 @@ public void unsubscribe() {
}
}
- private static class TestASynchronousObservable extends Observable {
+ private static class TestASynchronousObservable implements OnSubscribeFunc {
Thread t;
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
t = new Thread(new Runnable() {
@Override
@@ -680,7 +680,7 @@ public void unsubscribe() {
/**
* A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens.
*/
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
Observer super String> observer = null;
volatile boolean unsubscribed = false;
@@ -711,13 +711,13 @@ public void sendOnError(Throwable e) {
}
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
this.observer = observer;
return s;
}
}
- private static class TestErrorObservable extends Observable {
+ private static class TestErrorObservable implements OnSubscribeFunc {
String[] valuesToReturn;
@@ -726,7 +726,7 @@ private static class TestErrorObservable extends Observable {
}
@Override
- public Subscription subscribe(Observer super String> observer) {
+ public Subscription onSubscribe(Observer super String> observer) {
boolean errorThrown = false;
for (String s : valuesToReturn) {
if (s == null) {
@@ -754,7 +754,7 @@ public void unsubscribe() {
}
}
- private static class TestAsyncErrorObservable extends Observable {
+ private static class TestAsyncErrorObservable implements OnSubscribeFunc {
String[] valuesToReturn;
@@ -765,7 +765,7 @@ private static class TestAsyncErrorObservable extends Observable {
Thread t;
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
t = new Thread(new Runnable() {
@Override
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java
index 0522234e0a..427d15b816 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java
@@ -27,6 +27,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
+import rx.subjects.PublishSubject;
+import rx.subjects.Subject;
import rx.util.Exceptions;
/**
@@ -121,8 +123,7 @@ private T getRecentValue() {
public static class UnitTest {
@Test
public void testMostRecent() {
- Subscription s = mock(Subscription.class);
- TestObservable observable = new TestObservable(s);
+ Subject observable = PublishSubject.create();
Iterator it = mostRecent(observable, "default").iterator();
@@ -130,25 +131,24 @@ public void testMostRecent() {
assertEquals("default", it.next());
assertEquals("default", it.next());
- observable.sendOnNext("one");
+ observable.onNext("one");
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertEquals("one", it.next());
- observable.sendOnNext("two");
+ observable.onNext("two");
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertEquals("two", it.next());
- observable.sendOnCompleted();
+ observable.onCompleted();
assertFalse(it.hasNext());
}
@Test(expected = TestException.class)
public void testMostRecentWithException() {
- Subscription s = mock(Subscription.class);
- TestObservable observable = new TestObservable(s);
+ Subject observable = PublishSubject.create();
Iterator it = mostRecent(observable, "default").iterator();
@@ -156,43 +156,12 @@ public void testMostRecentWithException() {
assertEquals("default", it.next());
assertEquals("default", it.next());
- observable.sendOnError(new TestException());
+ observable.onError(new TestException());
assertTrue(it.hasNext());
it.next();
}
- private static class TestObservable extends Observable {
-
- Observer super String> observer = null;
- Subscription s;
-
- public TestObservable(Subscription s) {
- this.s = s;
- }
-
- /* used to simulate subscription */
- public void sendOnCompleted() {
- observer.onCompleted();
- }
-
- /* used to simulate subscription */
- public void sendOnNext(String value) {
- observer.onNext(value);
- }
-
- /* used to simulate subscription */
- public void sendOnError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public Subscription subscribe(final Observer super String> observer) {
- this.observer = observer;
- return s;
- }
- }
-
private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java
index 1550878432..e83cfdaa03 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java
@@ -93,7 +93,7 @@ public static class UnitTest {
@Test
public void testMulticast() {
- TestObservable source = new TestObservable();
+ Subject source = PublishSubject.create();
ConnectableObservable multicasted = OperationMulticast.multicast(source,
PublishSubject.create());
@@ -102,14 +102,14 @@ public void testMulticast() {
Observer observer = mock(Observer.class);
multicasted.subscribe(observer);
- source.sendOnNext("one");
- source.sendOnNext("two");
+ source.onNext("one");
+ source.onNext("two");
multicasted.connect();
- source.sendOnNext("three");
- source.sendOnNext("four");
- source.sendOnCompleted();
+ source.onNext("three");
+ source.onNext("four");
+ source.onCompleted();
verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
@@ -121,7 +121,7 @@ public void testMulticast() {
@Test
public void testMulticastConnectTwice() {
- TestObservable source = new TestObservable();
+ Subject source = PublishSubject.create();
ConnectableObservable multicasted = OperationMulticast.multicast(source,
PublishSubject.create());
@@ -130,13 +130,13 @@ public void testMulticastConnectTwice() {
Observer observer = mock(Observer.class);
multicasted.subscribe(observer);
- source.sendOnNext("one");
+ source.onNext("one");
multicasted.connect();
multicasted.connect();
- source.sendOnNext("two");
- source.sendOnCompleted();
+ source.onNext("two");
+ source.onCompleted();
verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
@@ -146,7 +146,7 @@ public void testMulticastConnectTwice() {
@Test
public void testMulticastDisconnect() {
- TestObservable source = new TestObservable();
+ Subject source = PublishSubject.create();
ConnectableObservable multicasted = OperationMulticast.multicast(source,
PublishSubject.create());
@@ -155,17 +155,17 @@ public void testMulticastDisconnect() {
Observer observer = mock(Observer.class);
multicasted.subscribe(observer);
- source.sendOnNext("one");
+ source.onNext("one");
Subscription connection = multicasted.connect();
- source.sendOnNext("two");
+ source.onNext("two");
connection.unsubscribe();
- source.sendOnNext("three");
+ source.onNext("three");
multicasted.connect();
- source.sendOnNext("four");
- source.sendOnCompleted();
+ source.onNext("four");
+ source.onCompleted();
verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
@@ -175,67 +175,5 @@ public void testMulticastDisconnect() {
}
-
- private static class TestObservable extends Observable {
-
- Observer super String> observer = new Observer() {
- @Override
- public void onCompleted() {
- // Do nothing
- }
-
- @Override
- public void onError(Throwable e) {
- // Do nothing
- }
-
- @Override
- public void onNext(String args) {
- // Do nothing
- }
- };
- Subscription s = new Subscription() {
- @Override
- public void unsubscribe() {
- observer = new Observer() {
- @Override
- public void onCompleted() {
- // Do nothing
- }
-
- @Override
- public void onError(Throwable e) {
- // Do nothing
- }
-
- @Override
- public void onNext(String args) {
- // Do nothing
- }
- };
- }
- };
-
- public TestObservable() {
- }
-
- /* used to simulate subscription */
- public void sendOnCompleted() {
- observer.onCompleted();
- }
-
- /* used to simulate subscription */
- public void sendOnNext(String value) {
- observer.onNext(value);
- }
-
- @Override
- public Subscription subscribe(final Observer super String> observer) {
- this.observer = observer;
- return s;
- }
-
- }
-
}
}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java
index a9e68f28b3..5fcdc08a63 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationNext.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java
@@ -38,6 +38,8 @@
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
+import rx.subjects.PublishSubject;
+import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
@@ -174,8 +176,7 @@ public static class UnitTest {
@Test
public void testNext() throws Throwable {
- Subscription s = mock(Subscription.class);
- final TestObservable obs = new TestObservable(s);
+ Subject obs = PublishSubject.create();
Iterator it = next(obs).iterator();
@@ -183,27 +184,26 @@ public void testNext() throws Throwable {
Future next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnNext("one");
+ obs.onNext("one");
assertEquals("one", next.get());
assertTrue(it.hasNext());
next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnNext("two");
+ obs.onNext("two");
assertEquals("two", next.get());
assertTrue(it.hasNext());
- obs.sendOnCompleted();
+ obs.onCompleted();
assertFalse(it.hasNext());
}
@Test(expected = TestException.class)
public void testOnError() throws Throwable {
- Subscription s = mock(Subscription.class);
- final TestObservable obs = new TestObservable(s);
+ Subject obs = PublishSubject.create();;
Iterator it = next(obs).iterator();
@@ -211,14 +211,14 @@ public void testOnError() throws Throwable {
Future next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnNext("one");
+ obs.onNext("one");
assertEquals("one", next.get());
assertTrue(it.hasNext());
next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnError(new TestException());
+ obs.onError(new TestException());
try {
next.get();
@@ -229,8 +229,7 @@ public void testOnError() throws Throwable {
@Test
public void testOnErrorViaHasNext() throws Throwable {
- Subscription s = mock(Subscription.class);
- final TestObservable obs = new TestObservable(s);
+ Subject obs = PublishSubject.create();
Iterator it = next(obs).iterator();
@@ -238,14 +237,14 @@ public void testOnErrorViaHasNext() throws Throwable {
Future next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnNext("one");
+ obs.onNext("one");
assertEquals("one", next.get());
assertTrue(it.hasNext());
next = nextAsync(it);
Thread.sleep(100);
- obs.sendOnError(new TestException());
+ obs.onError(new TestException());
// this should not throw an exception but instead just return false
try {
@@ -267,38 +266,6 @@ public String call() throws Exception {
});
}
- private static class TestObservable extends Observable {
-
- Observer super String> observer = null;
- Subscription s;
-
- public TestObservable(Subscription s) {
- this.s = s;
- }
-
- /* used to simulate subscription */
- public void sendOnCompleted() {
- observer.onCompleted();
- }
-
- /* used to simulate subscription */
- public void sendOnNext(String value) {
- observer.onNext(value);
- }
-
- /* used to simulate subscription */
- public void sendOnError(Throwable e) {
- observer.onError(e);
- }
-
- @Override
- public Subscription subscribe(final Observer super String> observer) {
- this.observer = observer;
- return s;
- }
-
- }
-
@SuppressWarnings("serial")
private static class TestException extends RuntimeException {
diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java
index a752ec1f62..28bf8be311 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java
@@ -177,7 +177,7 @@ public Observable call(Throwable t1) {
}
};
- Observable observable = Observable.create(onErrorResumeNextViaFunction(w, resume));
+ Observable observable = Observable.create(onErrorResumeNextViaFunction(Observable.create(w), resume));
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
@@ -214,7 +214,7 @@ public Observable call(Throwable t1) {
}
};
- Observable observable = Observable.create(onErrorResumeNextViaFunction(w, resume));
+ Observable observable = Observable.create(onErrorResumeNextViaFunction(Observable.create(w), resume));
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
@@ -234,7 +234,7 @@ public Observable call(Throwable t1) {
verify(aObserver, times(0)).onCompleted();
}
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
final Subscription s;
final String[] values;
@@ -246,7 +246,7 @@ public TestObservable(Subscription s, String... values) {
}
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java
index 2c25ead1fa..3b49ce5f9c 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java
@@ -123,7 +123,8 @@ public static class UnitTest {
public void testResumeNext() {
Subscription s = mock(Subscription.class);
// Trigger failure on second element
- TestObservable w = new TestObservable(s, "one", "fail", "two", "three");
+ TestObservable f = new TestObservable(s, "one", "fail", "two", "three");
+ Observable w = Observable.create(f);
Observable resume = Observable.from("twoResume", "threeResume");
Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume));
@@ -132,7 +133,7 @@ public void testResumeNext() {
observable.subscribe(aObserver);
try {
- w.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -152,7 +153,8 @@ public void testMapResumeAsyncNext() {
// Trigger multiple failures
Observable w = Observable.from("one", "fail", "two", "three", "fail");
// Resume Observable is async
- TestObservable resume = new TestObservable(sr, "twoResume", "threeResume");
+ TestObservable f = new TestObservable(sr, "twoResume", "threeResume");
+ Observable resume = Observable.create(f);
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
@@ -172,7 +174,7 @@ public String call(String s) {
observable.subscribe(aObserver);
try {
- resume.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -186,7 +188,7 @@ public String call(String s) {
verify(aObserver, times(1)).onNext("threeResume");
}
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
final Subscription s;
final String[] values;
@@ -198,7 +200,7 @@ public TestObservable(Subscription s, String... values) {
}
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java
index 471de4ecdc..02a268c04a 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java
@@ -131,7 +131,8 @@ public static class UnitTest {
@Test
public void testResumeNext() {
Subscription s = mock(Subscription.class);
- TestObservable w = new TestObservable(s, "one");
+ TestObservable f = new TestObservable(s, "one");
+ Observable w = Observable.create(f);
final AtomicReference capturedException = new AtomicReference();
Observable observable = Observable.create(onErrorReturn(w, new Func1() {
@@ -149,7 +150,7 @@ public String call(Throwable e) {
observable.subscribe(aObserver);
try {
- w.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -167,7 +168,8 @@ public String call(Throwable e) {
@Test
public void testFunctionThrowsError() {
Subscription s = mock(Subscription.class);
- TestObservable w = new TestObservable(s, "one");
+ TestObservable f = new TestObservable(s, "one");
+ Observable w = Observable.create(f);
final AtomicReference capturedException = new AtomicReference();
Observable observable = Observable.create(onErrorReturn(w, new Func1() {
@@ -185,7 +187,7 @@ public String call(Throwable e) {
observable.subscribe(aObserver);
try {
- w.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -199,7 +201,7 @@ public String call(Throwable e) {
assertNotNull(capturedException.get());
}
- private static class TestObservable extends Observable {
+ private static class TestObservable implements OnSubscribeFunc {
final Subscription s;
final String[] values;
@@ -211,7 +213,7 @@ public TestObservable(Subscription s, String... values) {
}
@Override
- public Subscription subscribe(final Observer super String> observer) {
+ public Subscription onSubscribe(final Observer super String> observer) {
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java
index c77354220b..9642ed9269 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationOnExceptionResumeNextViaObservable.java
@@ -130,7 +130,8 @@ public static class UnitTest {
public void testResumeNextWithException() {
Subscription s = mock(Subscription.class);
// Trigger failure on second element
- TestObservable w = new TestObservable(s, "one", "EXCEPTION", "two", "three");
+ TestObservable f = new TestObservable(s, "one", "EXCEPTION", "two", "three");
+ Observable w = Observable.create(f);
Observable resume = Observable.from("twoResume", "threeResume");
Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
@@ -139,7 +140,7 @@ public void testResumeNextWithException() {
observable.subscribe(aObserver);
try {
- w.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -158,7 +159,8 @@ public void testResumeNextWithException() {
public void testResumeNextWithRuntimeException() {
Subscription s = mock(Subscription.class);
// Trigger failure on second element
- TestObservable w = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three");
+ TestObservable f = new TestObservable(s, "one", "RUNTIMEEXCEPTION", "two", "three");
+ Observable w = Observable.create(f);
Observable resume = Observable.from("twoResume", "threeResume");
Observable observable = Observable.create(onExceptionResumeNextViaObservable(w, resume));
@@ -167,7 +169,7 @@ public void testResumeNextWithRuntimeException() {
observable.subscribe(aObserver);
try {
- w.t.join();
+ f.t.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
@@ -186,7 +188,8 @@ public void testResumeNextWithRuntimeException() {
public void testThrowablePassesThru() {
Subscription s = mock(Subscription.class);
// Trigger failure on second element
- TestObservable w = new TestObservable(s, "one", "THROWABLE", "two", "three");
+ TestObservable f = new TestObservable(s, "one", "THROWABLE", "two", "three");
+ Observable