|
23 | 23 | import rx.util.AtomicObservableSubscription;
|
24 | 24 | import rx.util.functions.Func1;
|
25 | 25 |
|
| 26 | +import java.util.concurrent.atomic.AtomicBoolean; |
26 | 27 | import java.util.concurrent.atomic.AtomicInteger;
|
27 | 28 |
|
| 29 | +import static org.junit.Assert.assertTrue; |
28 | 30 | import static org.junit.Assert.fail;
|
29 | 31 | import static org.mockito.Matchers.any;
|
30 | 32 | import static org.mockito.Mockito.mock;
|
@@ -82,6 +84,23 @@ private Take(Observable<T> items, int num) {
|
82 | 84 | @Override
|
83 | 85 | public Subscription call(Observer<T> observer) {
|
84 | 86 | if (num < 1) {
|
| 87 | + items.subscribe(new Observer<T>() |
| 88 | + { |
| 89 | + @Override |
| 90 | + public void onCompleted() |
| 91 | + { |
| 92 | + } |
| 93 | + |
| 94 | + @Override |
| 95 | + public void onError(Exception e) |
| 96 | + { |
| 97 | + } |
| 98 | + |
| 99 | + @Override |
| 100 | + public void onNext(T args) |
| 101 | + { |
| 102 | + } |
| 103 | + }).unsubscribe(); |
85 | 104 | observer.onCompleted();
|
86 | 105 | return Subscriptions.empty();
|
87 | 106 | }
|
@@ -178,8 +197,28 @@ public Subscription call(Observer<String> observer)
|
178 | 197 |
|
179 | 198 | @Test
|
180 | 199 | public void testTakeZeroDoesntLeakError() {
|
181 |
| - Observable<String> source = Observable.error(new Exception("test failed")); |
| 200 | + final AtomicBoolean subscribed = new AtomicBoolean(false); |
| 201 | + final AtomicBoolean unSubscribed = new AtomicBoolean(false); |
| 202 | + Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>() |
| 203 | + { |
| 204 | + @Override |
| 205 | + public Subscription call(Observer<String> observer) |
| 206 | + { |
| 207 | + subscribed.set(true); |
| 208 | + observer.onError(new Exception("test failed")); |
| 209 | + return new Subscription() |
| 210 | + { |
| 211 | + @Override |
| 212 | + public void unsubscribe() |
| 213 | + { |
| 214 | + unSubscribed.set(true); |
| 215 | + } |
| 216 | + }; |
| 217 | + } |
| 218 | + }); |
182 | 219 | Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
|
| 220 | + assertTrue("source subscribed", subscribed.get()); |
| 221 | + assertTrue("source unsubscribed", unSubscribed.get()); |
183 | 222 | }
|
184 | 223 |
|
185 | 224 | @Test
|
|
0 commit comments