Skip to content

Commit 488fe1c

Browse files
committed
Merge pull request ReactiveX#2995 from davidmoten/switch-overflow
switchOnNext - ensure initial requests additive and fix request overflow
2 parents 0caeea8 + d32a1b0 commit 488fe1c

File tree

2 files changed

+117
-8
lines changed

2 files changed

+117
-8
lines changed

src/main/java/rx/internal/operators/OperatorSwitch.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,26 @@ public void request(long n) {
9494
synchronized (guard) {
9595
localSubscriber = currentSubscriber;
9696
if (currentSubscriber == null) {
97-
initialRequested = n;
97+
long r = initialRequested + n;
98+
if (r < 0) {
99+
infinite = true;
100+
} else {
101+
initialRequested = r;
102+
}
98103
} else {
99-
// If n == Long.MAX_VALUE, infinite will become true. Then currentSubscriber.requested won't be used.
100-
// Therefore we don't need to worry about overflow.
101-
currentSubscriber.requested += n;
104+
long r = currentSubscriber.requested + n;
105+
if (r < 0) {
106+
infinite = true;
107+
} else {
108+
currentSubscriber.requested = r;
109+
}
102110
}
103111
}
104112
if (localSubscriber != null) {
105-
localSubscriber.requestMore(n);
113+
if (infinite)
114+
localSubscriber.requestMore(Long.MAX_VALUE);
115+
else
116+
localSubscriber.requestMore(n);
106117
}
107118
}
108119
});
@@ -167,7 +178,8 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
167178
if (queue == null) {
168179
queue = new ArrayList<Object>();
169180
}
170-
innerSubscriber.requested--;
181+
if (innerSubscriber.requested != Long.MAX_VALUE)
182+
innerSubscriber.requested--;
171183
queue.add(value);
172184
return;
173185
}
@@ -183,7 +195,8 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
183195
if (once) {
184196
once = false;
185197
synchronized (guard) {
186-
innerSubscriber.requested--;
198+
if (innerSubscriber.requested != Long.MAX_VALUE)
199+
innerSubscriber.requested--;
187200
}
188201
s.onNext(value);
189202
}

src/test/java/rx/internal/operators/OperatorSwitchTest.java

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,20 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.assertTrue;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Matchers.anyString;
21-
import static org.mockito.Mockito.*;
22+
import static org.mockito.Mockito.inOrder;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
2227

28+
import java.util.ArrayList;
2329
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.concurrent.CopyOnWriteArrayList;
2432
import java.util.concurrent.TimeUnit;
2533
import java.util.concurrent.atomic.AtomicBoolean;
2634

@@ -36,6 +44,7 @@
3644
import rx.Subscriber;
3745
import rx.exceptions.TestException;
3846
import rx.functions.Action0;
47+
import rx.functions.Action1;
3948
import rx.functions.Func1;
4049
import rx.observers.TestSubscriber;
4150
import rx.schedulers.TestScheduler;
@@ -574,4 +583,91 @@ public void onNext(String t) {
574583

575584
Assert.assertEquals(250, ts.getOnNextEvents().size());
576585
}
586+
587+
@Test(timeout = 10000)
588+
public void testInitialRequestsAreAdditive() {
589+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
590+
Observable.switchOnNext(
591+
Observable.interval(100, TimeUnit.MILLISECONDS)
592+
.map(
593+
new Func1<Long, Observable<Long>>() {
594+
@Override
595+
public Observable<Long> call(Long t) {
596+
return Observable.just(1L, 2L, 3L);
597+
}
598+
}
599+
).take(3))
600+
.subscribe(ts);
601+
ts.requestMore(Long.MAX_VALUE - 100);
602+
ts.requestMore(1);
603+
ts.awaitTerminalEvent();
604+
}
605+
606+
@Test(timeout = 10000)
607+
public void testInitialRequestsDontOverflow() {
608+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
609+
Observable.switchOnNext(
610+
Observable.interval(100, TimeUnit.MILLISECONDS)
611+
.map(new Func1<Long, Observable<Long>>() {
612+
@Override
613+
public Observable<Long> call(Long t) {
614+
return Observable.from(Arrays.asList(1L, 2L, 3L));
615+
}
616+
}).take(3)).subscribe(ts);
617+
ts.requestMore(Long.MAX_VALUE - 1);
618+
ts.requestMore(2);
619+
ts.awaitTerminalEvent();
620+
assertTrue(ts.getOnNextEvents().size() > 0);
621+
}
622+
623+
624+
@Test(timeout = 10000)
625+
public void testSecondaryRequestsDontOverflow() throws InterruptedException {
626+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
627+
Observable.switchOnNext(
628+
Observable.interval(100, TimeUnit.MILLISECONDS)
629+
.map(new Func1<Long, Observable<Long>>() {
630+
@Override
631+
public Observable<Long> call(Long t) {
632+
return Observable.from(Arrays.asList(1L, 2L, 3L));
633+
}
634+
}).take(3)).subscribe(ts);
635+
ts.requestMore(1);
636+
//we will miss two of the first observable
637+
Thread.sleep(250);
638+
ts.requestMore(Long.MAX_VALUE - 1);
639+
ts.requestMore(Long.MAX_VALUE - 1);
640+
ts.awaitTerminalEvent();
641+
ts.assertValueCount(7);
642+
}
643+
644+
@Test(timeout = 10000)
645+
public void testSecondaryRequestsAdditivelyAreMoreThanLongMaxValueInducesMaxValueRequestFromUpstream() throws InterruptedException {
646+
final List<Long> requests = new CopyOnWriteArrayList<Long>();
647+
final Action1<Long> addRequest = new Action1<Long>() {
648+
649+
@Override
650+
public void call(Long n) {
651+
requests.add(n);
652+
}};
653+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
654+
Observable.switchOnNext(
655+
Observable.interval(100, TimeUnit.MILLISECONDS)
656+
.map(new Func1<Long, Observable<Long>>() {
657+
@Override
658+
public Observable<Long> call(Long t) {
659+
return Observable.from(Arrays.asList(1L, 2L, 3L)).doOnRequest(addRequest);
660+
}
661+
}).take(3)).subscribe(ts);
662+
ts.requestMore(1);
663+
//we will miss two of the first observable
664+
Thread.sleep(250);
665+
ts.requestMore(Long.MAX_VALUE - 1);
666+
ts.requestMore(Long.MAX_VALUE - 1);
667+
ts.awaitTerminalEvent();
668+
assertTrue(ts.getOnNextEvents().size() > 0);
669+
assertEquals(5, (int) requests.size());
670+
assertEquals(Long.MAX_VALUE, (long) requests.get(3));
671+
assertEquals(Long.MAX_VALUE, (long) requests.get(4));
672+
}
577673
}

0 commit comments

Comments
 (0)