Skip to content

Commit acaf42e

Browse files
author
Aaron Tull
committed
Implemented Completable#andThen(Observable)
1 parent 6aa760e commit acaf42e

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

src/main/java/rx/Completable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,21 @@ public final Completable compose(CompletableTransformer transformer) {
10801080
return to(transformer);
10811081
}
10821082

1083+
/**
1084+
* Returns an Observable which will subscribe to this Completable and once that is completed then
1085+
* will subscribe to the {@code next} Observable. An error event from this Completable will be
1086+
* propagated to the downstream subscriber and will result in skipping the subscription of the
1087+
* Observable.
1088+
*
1089+
* @param next the Observable to subscribe after this Completable is completed, not null
1090+
* @return Observable that composes this Completable and next
1091+
* @throws NullPointerException if next is null
1092+
*/
1093+
public final <T> Observable<T> andThen(Observable<T> next) {
1094+
requireNonNull(next);
1095+
return next.delaySubscription(toObservable());
1096+
}
1097+
10831098
/**
10841099
* Concatenates this Completable with another Completable.
10851100
* @param other the other Completable, not null

src/test/java/rx/CompletableTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.junit.*;
2424

2525
import rx.Completable.*;
26+
import rx.Observable.OnSubscribe;
2627
import rx.exceptions.*;
2728
import rx.functions.*;
2829
import rx.observers.TestSubscriber;
@@ -357,6 +358,64 @@ public void call(Long v) {
357358
Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
358359
}
359360

361+
@Test
362+
public void andThen() {
363+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
364+
Completable.complete().andThen(Observable.just("foo")).subscribe(ts);
365+
ts.requestMore(1);
366+
ts.assertValue("foo");
367+
ts.assertCompleted();
368+
ts.assertNoErrors();
369+
}
370+
371+
@Test
372+
public void andThenNever() {
373+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
374+
Completable.never().andThen(Observable.just("foo")).subscribe(ts);
375+
ts.requestMore(1);
376+
ts.assertNoValues();
377+
ts.assertNoTerminalEvent();
378+
}
379+
380+
@Test
381+
public void andThenError() {
382+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
383+
final AtomicBoolean hasRun = new AtomicBoolean(false);
384+
final Exception e = new Exception();
385+
Completable.create(new CompletableOnSubscribe() {
386+
@Override
387+
public void call(CompletableSubscriber cs) {
388+
cs.onError(e);
389+
}
390+
})
391+
.andThen(Observable.<String>create(new OnSubscribe<String>() {
392+
@Override
393+
public void call(Subscriber<? super String> s) {
394+
hasRun.set(true);
395+
s.onNext("foo");
396+
s.onCompleted();
397+
}
398+
}))
399+
.subscribe(ts);
400+
ts.assertNoValues();
401+
ts.assertError(e);
402+
Assert.assertFalse("Should not have subscribed to observable when completable errors", hasRun.get());
403+
}
404+
405+
@Test
406+
public void andThenSubscribeOn() {
407+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
408+
TestScheduler scheduler = new TestScheduler();
409+
Completable.complete().andThen(Observable.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts);
410+
ts.requestMore(1);
411+
ts.assertNoValues();
412+
ts.assertNoTerminalEvent();
413+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
414+
ts.assertValue("foo");
415+
ts.assertCompleted();
416+
ts.assertNoErrors();
417+
}
418+
360419
@Test(expected = NullPointerException.class)
361420
public void createNull() {
362421
Completable.create(null);

0 commit comments

Comments
 (0)