Skip to content

Commit b62698c

Browse files
committed
add backpressure support for defaultIfEmpty()
1 parent c868ba9 commit b62698c

File tree

3 files changed

+27
-65
lines changed

3 files changed

+27
-65
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3858,7 +3858,8 @@ public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler sched
38583858
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
38593859
*/
38603860
public final Observable<T> defaultIfEmpty(T defaultValue) {
3861-
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
3861+
//if empty switch to an observable that emits defaultValue and supports backpressure
3862+
return switchIfEmpty(Observable.from((Arrays.asList(defaultValue))));
38623863
}
38633864

38643865
/**

src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Observer;
2727
import rx.Subscriber;
2828
import rx.exceptions.TestException;
29+
import rx.observers.TestSubscriber;
2930

3031
public class OperatorDefaultIfEmptyTest {
3132

@@ -85,4 +86,28 @@ public void onCompleted() {
8586
verify(o, never()).onNext(any(Integer.class));
8687
verify(o, never()).onCompleted();
8788
}
89+
90+
@Test
91+
public void testBackpressureEmpty() {
92+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
93+
Observable.<Integer>empty().defaultIfEmpty(1).subscribe(ts);
94+
ts.assertNoValues();
95+
ts.assertNoTerminalEvent();
96+
ts.requestMore(1);
97+
ts.assertValue(1);
98+
ts.assertCompleted();
99+
}
100+
101+
@Test
102+
public void testBackpressureNonEmpty() {
103+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
104+
Observable.just(1,2,3).defaultIfEmpty(1).subscribe(ts);
105+
ts.assertNoValues();
106+
ts.assertNoTerminalEvent();
107+
ts.requestMore(2);
108+
ts.assertValues(1, 2);
109+
ts.requestMore(1);
110+
ts.assertValues(1, 2, 3);
111+
ts.assertCompleted();
112+
}
88113
}

0 commit comments

Comments
 (0)