Skip to content

Commit d51763d

Browse files
committed
OperatorSingle should request exactly what it needs
1 parent 9f2fc67 commit d51763d

File tree

2 files changed

+235
-36
lines changed

2 files changed

+235
-36
lines changed

src/main/java/rx/internal/operators/OperatorSingle.java

Lines changed: 67 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package rx.internal.operators;
1717

1818
import java.util.NoSuchElementException;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920

2021
import rx.Observable.Operator;
22+
import rx.Producer;
2123
import rx.Subscriber;
2224

2325
/**
@@ -44,53 +46,84 @@ private OperatorSingle(boolean hasDefaultValue, final T defaultValue) {
4446
}
4547

4648
@Override
47-
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
48-
return new Subscriber<T>(subscriber) {
49+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
4950

50-
private T value;
51-
private boolean isNonEmpty = false;
52-
private boolean hasTooManyElements = false;
51+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, hasDefaultValue,
52+
defaultValue);
53+
54+
child.setProducer(new Producer() {
55+
56+
private final AtomicBoolean requestedTwo = new AtomicBoolean(false);
5357

5458
@Override
55-
public void onNext(T value) {
56-
if (isNonEmpty) {
57-
hasTooManyElements = true;
58-
subscriber.onError(new IllegalArgumentException("Sequence contains too many elements"));
59-
unsubscribe();
60-
} else {
61-
this.value = value;
62-
isNonEmpty = true;
63-
// Issue: https://github.com/ReactiveX/RxJava/pull/1527
64-
// Because we cache a value and don't emit now, we need to request another one.
65-
request(1);
59+
public void request(long n) {
60+
if (n > 0 && requestedTwo.compareAndSet(false, true)) {
61+
parent.requestMore(2);
6662
}
6763
}
6864

69-
@Override
70-
public void onCompleted() {
71-
if (hasTooManyElements) {
72-
// We have already sent an onError message
65+
});
66+
child.add(parent);
67+
return parent;
68+
}
69+
70+
private static final class ParentSubscriber<T> extends Subscriber<T> {
71+
private final Subscriber<? super T> child;
72+
private final boolean hasDefaultValue;
73+
private final T defaultValue;
74+
75+
private T value;
76+
private boolean isNonEmpty = false;
77+
private boolean hasTooManyElements = false;
78+
79+
80+
ParentSubscriber(Subscriber<? super T> child, boolean hasDefaultValue,
81+
T defaultValue) {
82+
this.child = child;
83+
this.hasDefaultValue = hasDefaultValue;
84+
this.defaultValue = defaultValue;
85+
}
86+
87+
void requestMore(long n) {
88+
request(n);
89+
}
90+
91+
@Override
92+
public void onNext(T value) {
93+
if (isNonEmpty) {
94+
hasTooManyElements = true;
95+
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
96+
unsubscribe();
97+
} else {
98+
this.value = value;
99+
isNonEmpty = true;
100+
}
101+
}
102+
103+
@Override
104+
public void onCompleted() {
105+
if (hasTooManyElements) {
106+
// We have already sent an onError message
107+
} else {
108+
if (isNonEmpty) {
109+
child.onNext(value);
110+
child.onCompleted();
73111
} else {
74-
if (isNonEmpty) {
75-
subscriber.onNext(value);
76-
subscriber.onCompleted();
112+
if (hasDefaultValue) {
113+
child.onNext(defaultValue);
114+
child.onCompleted();
77115
} else {
78-
if (hasDefaultValue) {
79-
subscriber.onNext(defaultValue);
80-
subscriber.onCompleted();
81-
} else {
82-
subscriber.onError(new NoSuchElementException("Sequence contains no elements"));
83-
}
116+
child.onError(new NoSuchElementException("Sequence contains no elements"));
84117
}
85118
}
86119
}
120+
}
87121

88-
@Override
89-
public void onError(Throwable e) {
90-
subscriber.onError(e);
91-
}
122+
@Override
123+
public void onError(Throwable e) {
124+
child.onError(e);
125+
}
92126

93-
};
94127
}
95128

96129
}

src/test/java/rx/internal/operators/OperatorSingleTest.java

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,24 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919
import static org.mockito.Matchers.isA;
20-
import static org.mockito.Mockito.*;
21-
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.spy;
23+
import static org.mockito.Mockito.times;
24+
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.List;
2228
import java.util.NoSuchElementException;
29+
import java.util.concurrent.atomic.AtomicLong;
2330

2431
import org.junit.Test;
2532
import org.mockito.InOrder;
2633

2734
import rx.Observable;
2835
import rx.Observer;
2936
import rx.Subscriber;
37+
import rx.functions.Action1;
3038
import rx.functions.Func1;
3139
import rx.functions.Func2;
3240

@@ -73,6 +81,164 @@ public void testSingleWithEmpty() {
7381
isA(NoSuchElementException.class));
7482
inOrder.verifyNoMoreInteractions();
7583
}
84+
85+
@Test
86+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
87+
final AtomicLong request = new AtomicLong();
88+
Observable.just(1).doOnRequest(new Action1<Long>() {
89+
@Override
90+
public void call(Long n) {
91+
request.addAndGet(n);
92+
}
93+
}).toBlocking().single();
94+
assertEquals(2, request.get());
95+
}
96+
97+
@Test
98+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
99+
final AtomicLong request = new AtomicLong();
100+
try {
101+
Observable.empty().doOnRequest(new Action1<Long>() {
102+
@Override
103+
public void call(Long n) {
104+
request.addAndGet(n);
105+
}
106+
}).toBlocking().single();
107+
} catch (NoSuchElementException e) {
108+
assertEquals(2, request.get());
109+
}
110+
}
111+
112+
@Test
113+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
114+
final AtomicLong request = new AtomicLong();
115+
try {
116+
Observable.just(1, 2).doOnRequest(new Action1<Long>() {
117+
@Override
118+
public void call(Long n) {
119+
request.addAndGet(n);
120+
}
121+
}).toBlocking().single();
122+
} catch (IllegalArgumentException e) {
123+
assertEquals(2, request.get());
124+
}
125+
}
126+
127+
@Test
128+
public void testSingleDoesNotRequestMoreThanItNeedsIf1Then2Requested() {
129+
final List<Long> requests = new ArrayList<Long>();
130+
Observable.just(1)
131+
//
132+
.doOnRequest(new Action1<Long>() {
133+
@Override
134+
public void call(Long n) {
135+
requests.add(n);
136+
}
137+
})
138+
//
139+
.single()
140+
//
141+
.subscribe(new Subscriber<Integer>() {
142+
143+
@Override
144+
public void onStart() {
145+
request(1);
146+
}
147+
148+
@Override
149+
public void onCompleted() {
150+
151+
}
152+
153+
@Override
154+
public void onError(Throwable e) {
155+
156+
}
157+
158+
@Override
159+
public void onNext(Integer t) {
160+
request(2);
161+
}
162+
});
163+
assertEquals(Arrays.asList(2L), requests);
164+
}
165+
166+
@Test
167+
public void testSingleDoesNotRequestMoreThanItNeedsIf3Requested() {
168+
final List<Long> requests = new ArrayList<Long>();
169+
Observable.just(1)
170+
//
171+
.doOnRequest(new Action1<Long>() {
172+
@Override
173+
public void call(Long n) {
174+
requests.add(n);
175+
}
176+
})
177+
//
178+
.single()
179+
//
180+
.subscribe(new Subscriber<Integer>() {
181+
182+
@Override
183+
public void onStart() {
184+
request(3);
185+
}
186+
187+
@Override
188+
public void onCompleted() {
189+
190+
}
191+
192+
@Override
193+
public void onError(Throwable e) {
194+
195+
}
196+
197+
@Override
198+
public void onNext(Integer t) {
199+
}
200+
});
201+
assertEquals(Arrays.asList(2L), requests);
202+
}
203+
204+
@Test
205+
public void testSingleRequestsExactlyWhatItNeedsIf1Requested() {
206+
final List<Long> requests = new ArrayList<Long>();
207+
Observable.just(1)
208+
//
209+
.doOnRequest(new Action1<Long>() {
210+
@Override
211+
public void call(Long n) {
212+
requests.add(n);
213+
}
214+
})
215+
//
216+
.single()
217+
//
218+
.subscribe(new Subscriber<Integer>() {
219+
220+
@Override
221+
public void onStart() {
222+
request(1);
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
228+
}
229+
230+
@Override
231+
public void onError(Throwable e) {
232+
233+
}
234+
235+
@Override
236+
public void onNext(Integer t) {
237+
}
238+
});
239+
assertEquals(Arrays.asList(2L), requests);
240+
}
241+
76242

77243
@Test
78244
public void testSingleWithPredicate() {

0 commit comments

Comments
 (0)