Skip to content

Commit 0a6e26d

Browse files
committed
Change retryWhen to eagerly ignore an error'd source's subsequent events
1 parent 34cd1a6 commit 0a6e26d

File tree

3 files changed

+79
-27
lines changed

3 files changed

+79
-27
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,26 +211,35 @@ public void call() {
211211
}
212212

213213
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
214+
boolean done;
214215
@Override
215216
public void onCompleted() {
216-
currentProducer.set(null);
217-
unsubscribe();
218-
terminals.onNext(Notification.createOnCompleted());
217+
if (!done) {
218+
done = true;
219+
currentProducer.set(null);
220+
unsubscribe();
221+
terminals.onNext(Notification.createOnCompleted());
222+
}
219223
}
220224

221225
@Override
222226
public void onError(Throwable e) {
223-
currentProducer.set(null);
224-
unsubscribe();
225-
terminals.onNext(Notification.createOnError(e));
227+
if (!done) {
228+
done = true;
229+
currentProducer.set(null);
230+
unsubscribe();
231+
terminals.onNext(Notification.createOnError(e));
232+
}
226233
}
227234

228235
@Override
229236
public void onNext(T v) {
230-
if (consumerCapacity.get() != Long.MAX_VALUE) {
231-
consumerCapacity.decrementAndGet();
237+
if (!done) {
238+
if (consumerCapacity.get() != Long.MAX_VALUE) {
239+
consumerCapacity.decrementAndGet();
240+
}
241+
child.onNext(v);
232242
}
233-
child.onNext(v);
234243
}
235244

236245
@Override

src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,34 @@ public void call() {
8484
// new subscription each time so if it unsubscribes itself it does not prevent retries
8585
// by unsubscribing the child subscription
8686
Subscriber<T> subscriber = new Subscriber<T>() {
87-
87+
boolean done;
8888
@Override
8989
public void onCompleted() {
90-
child.onCompleted();
90+
if (!done) {
91+
done = true;
92+
child.onCompleted();
93+
}
9194
}
9295

9396
@Override
9497
public void onError(Throwable e) {
95-
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
96-
// retry again
97-
inner.schedule(_self);
98-
} else {
99-
// give up and pass the failure
100-
child.onError(e);
98+
if (!done) {
99+
done = true;
100+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
101+
// retry again
102+
inner.schedule(_self);
103+
} else {
104+
// give up and pass the failure
105+
child.onError(e);
106+
}
101107
}
102108
}
103109

104110
@Override
105111
public void onNext(T v) {
106-
child.onNext(v);
112+
if (!done) {
113+
child.onNext(v);
114+
}
107115
}
108116

109117
};

src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,23 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.*;
21+
1822
import java.io.IOException;
23+
import java.util.Collections;
1924
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicInteger;
21-
import static org.junit.Assert.assertEquals;
25+
import java.util.concurrent.atomic.*;
26+
2227
import org.junit.Test;
2328
import org.mockito.InOrder;
24-
import static org.mockito.Mockito.*;
25-
import rx.Observable;
29+
30+
import rx.*;
2631
import rx.Observable.OnSubscribe;
27-
import rx.Observer;
28-
import rx.Subscriber;
29-
import rx.Subscription;
3032
import rx.exceptions.TestException;
31-
import rx.functions.Action1;
32-
import rx.functions.Func2;
33+
import rx.functions.*;
34+
import rx.observers.TestSubscriber;
3335
import rx.subjects.PublishSubject;
3436

3537
public class OperatorRetryWithPredicateTest {
@@ -270,4 +272,37 @@ public void testTimeoutWithRetry() {
270272

271273
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
272274
}
275+
276+
@Test
277+
public void testIssue2826() {
278+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
279+
final RuntimeException e = new RuntimeException("You shall not pass");
280+
final AtomicInteger c = new AtomicInteger();
281+
Observable.just(1).map(new Func1<Integer, Integer>() {
282+
@Override
283+
public Integer call(Integer t1) {
284+
c.incrementAndGet();
285+
throw e;
286+
}
287+
}).retry(retry5).subscribe(ts);
288+
289+
ts.assertTerminalEvent();
290+
assertEquals(6, c.get());
291+
assertEquals(Collections.singletonList(e), ts.getOnErrorEvents());
292+
}
293+
@Test
294+
public void testJustAndRetry() throws Exception {
295+
final AtomicBoolean throwException = new AtomicBoolean(true);
296+
int value = Observable.just(1).map(new Func1<Integer, Integer>() {
297+
@Override
298+
public Integer call(Integer t1) {
299+
if (throwException.compareAndSet(true, false)) {
300+
throw new TestException();
301+
}
302+
return t1;
303+
}
304+
}).retry(1).toBlocking().single();
305+
306+
assertEquals(1, value);
307+
}
273308
}

0 commit comments

Comments
 (0)