Skip to content

Commit c833083

Browse files
committed
Merge pull request ReactiveX#3042 from davidmoten/default-if-empty-backp-2
add backpressure support for defaultIfEmpty()
2 parents b187c71 + 60924af commit c833083

File tree

3 files changed

+34
-66
lines changed

3 files changed

+34
-66
lines changed

src/main/java/rx/Observable.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.exceptions.*;
2020
import rx.functions.*;
2121
import rx.internal.operators.*;
22+
import rx.internal.producers.SingleProducer;
2223
import rx.internal.util.*;
2324
import rx.observables.*;
2425
import rx.observers.SafeSubscriber;
@@ -3857,8 +3858,14 @@ public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler sched
38573858
* items, or the items emitted by the source Observable
38583859
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
38593860
*/
3860-
public final Observable<T> defaultIfEmpty(T defaultValue) {
3861-
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
3861+
public final Observable<T> defaultIfEmpty(final T defaultValue) {
3862+
//if empty switch to an observable that emits defaultValue and supports backpressure
3863+
return switchIfEmpty(Observable.create(new OnSubscribe<T>() {
3864+
3865+
@Override
3866+
public void call(Subscriber<? super T> subscriber) {
3867+
subscriber.setProducer(new SingleProducer<T>(subscriber, defaultValue));
3868+
}}));
38623869
}
38633870

38643871
/**

src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java

-64
This file was deleted.

src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import rx.Observer;
2727
import rx.Subscriber;
2828
import rx.exceptions.TestException;
29+
import rx.observers.TestSubscriber;
2930

3031
public class OperatorDefaultIfEmptyTest {
3132

@@ -85,4 +86,28 @@ public void onCompleted() {
8586
verify(o, never()).onNext(any(Integer.class));
8687
verify(o, never()).onCompleted();
8788
}
89+
90+
@Test
91+
public void testBackpressureEmpty() {
92+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
93+
Observable.<Integer>empty().defaultIfEmpty(1).subscribe(ts);
94+
ts.assertNoValues();
95+
ts.assertNoTerminalEvent();
96+
ts.requestMore(1);
97+
ts.assertValue(1);
98+
ts.assertCompleted();
99+
}
100+
101+
@Test
102+
public void testBackpressureNonEmpty() {
103+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
104+
Observable.just(1,2,3).defaultIfEmpty(1).subscribe(ts);
105+
ts.assertNoValues();
106+
ts.assertNoTerminalEvent();
107+
ts.requestMore(2);
108+
ts.assertValues(1, 2);
109+
ts.requestMore(1);
110+
ts.assertValues(1, 2, 3);
111+
ts.assertCompleted();
112+
}
88113
}

0 commit comments

Comments
 (0)