Skip to content

Commit c22d0bb

Browse files
committed
Merge pull request ReactiveX#67 from omo/observeon-first
Fix observeOn() invocation order.
2 parents 0893101 + 7be64f3 commit c22d0bb

File tree

5 files changed

+86
-1
lines changed

5 files changed

+86
-1
lines changed

rxandroid/src/main/java/rx/android/content/ContentObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public Boolean call(android.support.v4.app.Fragment fragment) {
6565
*/
6666
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> source) {
6767
Assertions.assertUiThread();
68-
return source.lift(new OperatorConditionalBinding<T, Activity>(activity, ACTIVITY_VALIDATOR)).observeOn(mainThread());
68+
return source.observeOn(mainThread()).lift(new OperatorConditionalBinding<T, Activity>(activity, ACTIVITY_VALIDATOR));
6969
}
7070

7171
/**

rxandroid/src/main/java/rx/android/content/OperatorConditionalBinding.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import rx.Observable;
1717
import rx.Subscriber;
18+
import rx.android.internal.Assertions;
1819
import rx.functions.Func1;
1920
import rx.internal.util.UtilityFunctions;
2021

@@ -53,6 +54,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
5354

5455
@Override
5556
public void onCompleted() {
57+
Assertions.assertUiThread();
5658
if (shouldForwardNotification()) {
5759
child.onCompleted();
5860
} else {
@@ -62,6 +64,7 @@ public void onCompleted() {
6264

6365
@Override
6466
public void onError(Throwable e) {
67+
Assertions.assertUiThread();
6568
if (shouldForwardNotification()) {
6669
child.onError(e);
6770
} else {
@@ -71,6 +74,7 @@ public void onError(Throwable e) {
7174

7275
@Override
7376
public void onNext(T t) {
77+
Assertions.assertUiThread();
7478
if (shouldForwardNotification()) {
7579
child.onNext(t);
7680
} else {

rxandroid/src/test/java/rx/android/TestUtil.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,18 @@
1616
import android.view.View;
1717

1818
import org.robolectric.Robolectric;
19+
import org.robolectric.util.Scheduler;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.Executors;
23+
24+
import rx.Observable;
25+
import rx.Subscriber;
1926

2027
public class TestUtil {
2128

29+
static public final String STRING_EXPECTATION = "Hello";
30+
2231
private TestUtil() {
2332
throw new AssertionError("Utility class");
2433
}
@@ -27,4 +36,20 @@ public static View createView() {
2736
return new View(Robolectric.application);
2837
}
2938

39+
public static Observable<String> atBackgroundThread(final CountDownLatch done) {
40+
return Observable.create(new Observable.OnSubscribe<String>() {
41+
@Override
42+
public void call(final Subscriber<? super String> subscriber) {
43+
Executors.newSingleThreadExecutor().submit(new Runnable() {
44+
@Override
45+
public void run() {
46+
subscriber.onNext(STRING_EXPECTATION);
47+
subscriber.onCompleted();
48+
done.countDown();
49+
}
50+
});
51+
}
52+
});
53+
}
54+
3055
}

rxandroid/src/test/java/rx/android/content/ContentObservableTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@
2323
import org.robolectric.Robolectric;
2424
import org.robolectric.RobolectricTestRunner;
2525
import org.robolectric.annotation.Config;
26+
2627
import rx.Observable;
2728
import rx.Observer;
2829
import rx.android.content.ContentObservable;
30+
import rx.android.TestUtil;
2931
import rx.observers.TestObserver;
3032

3133
import android.app.Activity;
3234
import android.app.Fragment;
3335
import android.support.v4.app.FragmentActivity;
3436

3537
import java.util.concurrent.Callable;
38+
import java.util.concurrent.CountDownLatch;
3639
import java.util.concurrent.ExecutionException;
3740
import java.util.concurrent.Executors;
3841
import java.util.concurrent.Future;
@@ -101,6 +104,30 @@ public Object call() throws Exception {
101104
}
102105
}
103106

107+
@Test
108+
public void bindFragmentToSourceFromDifferentThread() throws InterruptedException {
109+
CountDownLatch done = new CountDownLatch(1);
110+
ContentObservable.bindFragment(fragment, TestUtil.atBackgroundThread(done)).subscribe(new TestObserver<String>(observer));
111+
done.await();
112+
113+
Robolectric.runUiThreadTasksIncludingDelayedTasks();
114+
115+
verify(observer).onNext(TestUtil.STRING_EXPECTATION);
116+
verify(observer).onCompleted();
117+
}
118+
119+
@Test
120+
public void bindSupportFragmentToSourceFromDifferentThread() throws InterruptedException {
121+
CountDownLatch done = new CountDownLatch(1);
122+
ContentObservable.bindFragment(supportFragment, TestUtil.atBackgroundThread(done)).subscribe(new TestObserver<String>(observer));
123+
done.await();
124+
125+
Robolectric.runUiThreadTasksIncludingDelayedTasks();
126+
127+
verify(observer).onNext(TestUtil.STRING_EXPECTATION);
128+
verify(observer).onCompleted();
129+
}
130+
104131
@Test(expected = IllegalStateException.class)
105132
public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Throwable {
106133
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@@ -116,4 +143,18 @@ public Object call() throws Exception {
116143
throw e.getCause();
117144
}
118145
}
146+
147+
@Test
148+
public void bindActivityToSourceFromDifferentThread() throws InterruptedException {
149+
CountDownLatch done = new CountDownLatch(1);
150+
ContentObservable.bindActivity(activity, TestUtil.atBackgroundThread(done)).subscribe(new TestObserver<String>(observer));
151+
done.await();
152+
153+
Robolectric.runUiThreadTasksIncludingDelayedTasks();
154+
155+
verify(observer).onNext(TestUtil.STRING_EXPECTATION);
156+
verify(observer).onCompleted();
157+
}
158+
159+
119160
}

rxandroid/src/test/java/rx/android/view/BindViewTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
import org.robolectric.RobolectricTestRunner;
1717
import org.robolectric.annotation.Config;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
1921
import rx.Observer;
2022
import rx.Subscription;
23+
import rx.android.TestUtil;
2124
import rx.subjects.PublishSubject;
2225

2326
@RunWith(RobolectricTestRunner.class)
@@ -119,6 +122,18 @@ public void earlyUnsubscribeStopsNotifications() {
119122
verifyNoMoreInteractions(observer);
120123
}
121124

125+
@Test
126+
public void bindViewInDifferentThread() throws InterruptedException {
127+
CountDownLatch done = new CountDownLatch(1);
128+
ViewObservable.bindView(target, TestUtil.atBackgroundThread(done)).subscribe(observer);
129+
done.await();
130+
131+
Robolectric.runUiThreadTasksIncludingDelayedTasks();
132+
133+
verify(observer).onNext(TestUtil.STRING_EXPECTATION);
134+
verify(observer).onCompleted();
135+
}
136+
122137
@Test(expected = IllegalArgumentException.class)
123138
public void nullViewIsNotAllowed() {
124139
ViewObservable.bindView(null, subject);

0 commit comments

Comments
 (0)