File tree Expand file tree Collapse file tree 2 files changed +44
-7
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +44
-7
lines changed Original file line number Diff line number Diff line change 1515 */
1616package rx .internal .operators ;
1717
18+ import java .util .concurrent .atomic .AtomicLong ;
19+
1820import rx .Observable .Operator ;
1921import rx .Producer ;
2022import rx .Subscriber ;
@@ -78,15 +80,24 @@ public void onNext(T i) {
7880 @ Override
7981 public void setProducer (final Producer producer ) {
8082 child .setProducer (new Producer () {
81-
83+
84+ // keeps track of requests up to maximum of `limit`
85+ final AtomicLong requested = new AtomicLong (0 );
86+
8287 @ Override
8388 public void request (long n ) {
84- if (!completed ) {
85- long c = limit - count ;
86- if (n < c ) {
87- producer .request (n );
88- } else {
89- producer .request (c );
89+ if (n >0 && !completed ) {
90+ // because requests may happen concurrently use a CAS loop to
91+ // ensure we only request as much as needed, no more no less
92+ while (true ) {
93+ long r = requested .get ();
94+ long c = Math .min (n , limit - r );
95+ if (c == 0 )
96+ break ;
97+ else if (requested .compareAndSet (r , r + c )) {
98+ producer .request (c );
99+ break ;
100+ }
90101 }
91102 }
92103 }
Original file line number Diff line number Diff line change 2727
2828import java .util .Arrays ;
2929import java .util .concurrent .CountDownLatch ;
30+ import java .util .concurrent .TimeUnit ;
3031import java .util .concurrent .atomic .AtomicBoolean ;
3132import java .util .concurrent .atomic .AtomicInteger ;
3233import java .util .concurrent .atomic .AtomicLong ;
@@ -388,4 +389,29 @@ public void call(Integer t1) {
388389 latch .await ();
389390 assertNull (exception .get ());
390391 }
392+
393+ @ Test
394+ public void testDoesntRequestMoreThanNeededFromUpstream () throws InterruptedException {
395+ final AtomicLong requests = new AtomicLong ();
396+ TestSubscriber <Long > ts = new TestSubscriber <Long >(0 );
397+ Observable .interval (100 , TimeUnit .MILLISECONDS )
398+ //
399+ .doOnRequest (new Action1 <Long >() {
400+ @ Override
401+ public void call (Long n ) {
402+ requests .addAndGet (n );
403+ }})
404+ //
405+ .take (2 )
406+ //
407+ .subscribe (ts );
408+ Thread .sleep (50 );
409+ ts .requestMore (1 );
410+ ts .requestMore (1 );
411+ ts .requestMore (1 );
412+ ts .awaitTerminalEvent ();
413+ ts .assertCompleted ();
414+ ts .assertNoErrors ();
415+ assertEquals (2 ,requests .get ());
416+ }
391417}
You can’t perform that action at this time.
0 commit comments