Skip to content

Commit 1fdd9eb

Browse files
committed
Merge pull request ReactiveX#2859 from davidmoten/doOnRequest-fix-2
OperatorDoOnRequest should unsubscribe from upstream
2 parents 9f2fc67 + d736387 commit 1fdd9eb

File tree

2 files changed

+81
-1
lines changed

2 files changed

+81
-1
lines changed

src/main/java/rx/internal/operators/OperatorDoOnRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void request(long n) {
4848
}
4949

5050
});
51-
51+
child.add(parent);
5252
return parent;
5353
}
5454

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package rx.internal.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.Subscriber;
15+
import rx.functions.Action0;
16+
import rx.functions.Action1;
17+
18+
public class OperatorDoOnRequestTest {
19+
20+
@Test
21+
public void testUnsubscribeHappensAgainstParent() {
22+
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
23+
Observable.just(1)
24+
//
25+
.doOnUnsubscribe(new Action0() {
26+
@Override
27+
public void call() {
28+
unsubscribed.set(true);
29+
}
30+
})
31+
//
32+
.doOnRequest(new Action1<Long>() {
33+
@Override
34+
public void call(Long n) {
35+
// do nothing
36+
}
37+
})
38+
//
39+
.subscribe();
40+
assertTrue(unsubscribed.get());
41+
}
42+
43+
@Test
44+
public void testDoRequest() {
45+
final List<Long> requests = new ArrayList<Long>();
46+
Observable.range(1, 5)
47+
//
48+
.doOnRequest(new Action1<Long>() {
49+
@Override
50+
public void call(Long n) {
51+
requests.add(n);
52+
}
53+
})
54+
//
55+
.subscribe(new Subscriber<Integer>() {
56+
57+
@Override
58+
public void onStart() {
59+
request(3);
60+
}
61+
62+
@Override
63+
public void onCompleted() {
64+
65+
}
66+
67+
@Override
68+
public void onError(Throwable e) {
69+
70+
}
71+
72+
@Override
73+
public void onNext(Integer t) {
74+
request(t);
75+
}
76+
});
77+
assertEquals(Arrays.asList(3L,1L,2L,3L,4L,5L), requests);
78+
}
79+
80+
}

0 commit comments

Comments
 (0)