Skip to content

Remove unnecessary Observable constructor #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,15 @@
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/legend.png">
* <p>
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava
* Wiki</a>
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
*
* @param <T>
*/
public class Observable<T> {

//TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable")

private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

/**
* Executed when 'subscribe' is invoked.
*/
private final OnSubscribeFunc<T> onSubscribe;

/**
Expand All @@ -125,7 +123,7 @@ public static interface OnSubscribeFunc<T> extends Function {
public Subscription onSubscribe(Observer<? super T> t1);

}

/**
* Observable with Function to execute when subscribed to.
* <p>
Expand All @@ -139,10 +137,8 @@ protected Observable(OnSubscribeFunc<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

protected Observable() {
this(null);
//TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today
}
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();


/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ protected BlockingObservable(OnSubscribeFunc<T> onSubscribe) {
super(onSubscribe);
}

/**
* Used to prevent public instantiation
*/
@SuppressWarnings("unused")
private BlockingObservable() {
// prevent public instantiation
}

/**
* Convert an Observable into a BlockingObservable.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void testCombineLatestWithFunctionThatThrowsAnException() {
TestObservable w1 = new TestObservable();
TestObservable w2 = new TestObservable();

Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
Observable<String> combined = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), new Func2<String, String, String>() {
@Override
public String call(String v1, String v2) {
throw new RuntimeException("I don't work.");
Expand All @@ -387,7 +387,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();

Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);

/* simulate sending data */
Expand Down Expand Up @@ -425,7 +425,7 @@ public void testCombineLatestDifferentLengthObservableSequences2() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();

Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);

/* simulate sending data */
Expand Down Expand Up @@ -461,7 +461,7 @@ public void testCombineLatestWithInterleavingSequences() {
TestObservable w2 = new TestObservable();
TestObservable w3 = new TestObservable();

Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
combineLatestW.subscribe(w);

/* simulate sending data */
Expand Down Expand Up @@ -936,12 +936,12 @@ private static String getStringValue(Object o) {
}
}

private static class TestObservable extends Observable<String> {
private static class TestObservable implements OnSubscribeFunc<String> {

Observer<? super String> observer;

@Override
public Subscription subscribe(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
// just store the variable where it can be accessed so we can manually trigger it
this.observer = observer;
return Subscriptions.empty();
Expand Down
28 changes: 14 additions & 14 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testSimpleAsyncConcat() {
TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");

Observable.concat(o1, o2).subscribe(observer);
Observable.concat(Observable.create(o1), Observable.create(o2)).subscribe(observer);

try {
// wait for async observables to complete
Expand Down Expand Up @@ -301,12 +301,12 @@ public void run() {
// emit first
if (!s.isUnsubscribed()) {
System.out.println("Emit o1");
observer.onNext(o1);
observer.onNext(Observable.create(o1));
}
// emit second
if (!s.isUnsubscribed()) {
System.out.println("Emit o2");
observer.onNext(o2);
observer.onNext(Observable.create(o2));
}

// wait until sometime later and emit third
Expand All @@ -317,7 +317,7 @@ public void run() {
}
if (!s.isUnsubscribed()) {
System.out.println("Emit o3");
observer.onNext(o3);
observer.onNext(Observable.create(o3));
}

} catch (Throwable e) {
Expand Down Expand Up @@ -404,7 +404,7 @@ public void testBlockedObservableOfObservables() {
final CountDownLatch callOnce = new CountDownLatch(1);
final CountDownLatch okToContinue = new CountDownLatch(1);
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));
Observable<String> concat = Observable.create(concatF);
concat.subscribe(observer);
try {
Expand Down Expand Up @@ -443,8 +443,8 @@ public void testConcatConcurrentWithInfinity() {
@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(Observable.create(w1), Observable.create(w2));
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));

Observable<String> concat = Observable.create(concatF);

Expand Down Expand Up @@ -485,8 +485,8 @@ public void testConcatNonBlockingObservables() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
// simulate what would happen in an observable
observer.onNext(w1);
observer.onNext(w2);
observer.onNext(Observable.create(w1));
observer.onNext(Observable.create(w2));
observer.onCompleted();

return new Subscription() {
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observer<String> aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
final Observable<String> concat = Observable.create(concat(w1, w2));
final Observable<String> concat = Observable.create(concat(Observable.create(w1), Observable.create(w2)));
final SafeObservableSubscription s1 = new SafeObservableSubscription();

try {
Expand Down Expand Up @@ -583,8 +583,8 @@ public void testConcatUnsubscribeConcurrent() {
@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(Observable.create(w1), Observable.create(w2));
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));

Observable<String> concat = Observable.create(concatF);

Expand Down Expand Up @@ -616,7 +616,7 @@ public void testConcatUnsubscribeConcurrent() {
verify(aObserver, never()).onError(any(Throwable.class));
}

private static class TestObservable<T> extends Observable<T> {
private static class TestObservable<T> implements OnSubscribeFunc<T> {

private final Subscription s = new Subscription() {

Expand Down Expand Up @@ -656,7 +656,7 @@ public TestObservable(T seed, int size) {
}

@Override
public Subscription subscribe(final Observer<? super T> observer) {
public Subscription onSubscribe(final Observer<? super T> observer) {
t = new Thread(new Runnable() {

@Override
Expand Down
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testMaterialize1() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");

TestObserver Observer = new TestObserver();
Observable<Notification<String>> m = Observable.create(materialize(o1));
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o1)));
m.subscribe(Observer);

try {
Expand All @@ -118,7 +118,7 @@ public void testMaterialize2() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");

TestObserver Observer = new TestObserver();
Observable<Notification<String>> m = Observable.create(materialize(o1));
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o1)));
m.subscribe(Observer);

try {
Expand All @@ -143,7 +143,7 @@ public void testMaterialize2() {
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");

Observable<Notification<String>> m = Observable.create(materialize(o));
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o)));

assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
Expand Down Expand Up @@ -174,7 +174,7 @@ public void onNext(Notification<String> value) {

}

private static class TestAsyncErrorObservable extends Observable<String> {
private static class TestAsyncErrorObservable implements OnSubscribeFunc<String> {

String[] valuesToReturn;

Expand All @@ -185,7 +185,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
volatile Thread t;

@Override
public Subscription subscribe(final Observer<? super String> observer) {
public Subscription onSubscribe(final Observer<? super String> observer) {
t = new Thread(new Runnable() {

@Override
Expand Down
Loading