Skip to content

Commit 8b6e500

Browse files
committed
Merge pull request ReactiveX#2868 from akarnokd/RetryWithBackpressureFix
Fixed reentrancy issue with the error producer.
2 parents 4d4a33c + 6fcdc7e commit 8b6e500

File tree

1 file changed

+63
-13
lines changed

1 file changed

+63
-13
lines changed

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,9 @@ public void call(final Subscriber<? super String> o) {
399399
public void request(long n) {
400400
if (n == Long.MAX_VALUE) {
401401
o.onNext("beginningEveryTime");
402-
if (count.getAndIncrement() < numFailures) {
403-
o.onError(new RuntimeException("forced failure: " + count.get()));
402+
int i = count.getAndIncrement();
403+
if (i < numFailures) {
404+
o.onError(new RuntimeException("forced failure: " + (i + 1)));
404405
} else {
405406
o.onNext("onSuccessOnly");
406407
o.onCompleted();
@@ -411,8 +412,7 @@ public void request(long n) {
411412
int i = count.getAndIncrement();
412413
if (i < numFailures) {
413414
o.onNext("beginningEveryTime");
414-
o.onError(new RuntimeException("forced failure: " + count.get()));
415-
req.decrementAndGet();
415+
o.onError(new RuntimeException("forced failure: " + (i + 1)));
416416
} else {
417417
do {
418418
if (i == numFailures) {
@@ -705,17 +705,18 @@ public void testRetryWithBackpressure() throws InterruptedException {
705705
inOrder.verifyNoMoreInteractions();
706706
}
707707
}
708+
708709
@Test(timeout = 15000)
709710
public void testRetryWithBackpressureParallel() throws InterruptedException {
710711
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
711712
int ncpu = Runtime.getRuntime().availableProcessors();
712-
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 1));
713+
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 2));
713714
final AtomicInteger timeouts = new AtomicInteger();
714715
final Map<Integer, List<String>> data = new ConcurrentHashMap<Integer, List<String>>();
715716
final Map<Integer, List<Throwable>> exceptions = new ConcurrentHashMap<Integer, List<Throwable>>();
716717
final Map<Integer, Integer> completions = new ConcurrentHashMap<Integer, Integer>();
717718

718-
int m = 2000;
719+
int m = 5000;
719720
final CountDownLatch cdl = new CountDownLatch(m);
720721
for (int i = 0; i < m; i++) {
721722
final int j = i;
@@ -726,16 +727,17 @@ public void run() {
726727
try {
727728
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
728729
TestSubscriber<String> ts = new TestSubscriber<String>();
729-
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
730-
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
731-
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
732-
data.put(j, ts.getOnNextEvents());
730+
origin.retry()
731+
.observeOn(Schedulers.computation()).unsafeSubscribe(ts);
732+
ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS);
733+
if (ts.getOnCompletedEvents().size() != 1) {
734+
completions.put(j, ts.getOnCompletedEvents().size());
733735
}
734736
if (ts.getOnErrorEvents().size() != 0) {
735737
exceptions.put(j, ts.getOnErrorEvents());
736738
}
737-
if (ts.getOnCompletedEvents().size() != 1) {
738-
completions.put(j, ts.getOnCompletedEvents().size());
739+
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
740+
data.put(j, ts.getOnNextEvents());
739741
}
740742
} catch (Throwable t) {
741743
timeouts.incrementAndGet();
@@ -749,7 +751,16 @@ public void run() {
749751
cdl.await();
750752
assertEquals(0, timeouts.get());
751753
if (data.size() > 0) {
752-
fail("Data content mismatch: " + data);
754+
System.out.println(allSequenceFrequency(data));
755+
}
756+
if (exceptions.size() > 0) {
757+
System.out.println(exceptions);
758+
}
759+
if (completions.size() > 0) {
760+
System.out.println(completions);
761+
}
762+
if (data.size() > 0) {
763+
fail("Data content mismatch: " + allSequenceFrequency(data));
753764
}
754765
if (exceptions.size() > 0) {
755766
fail("Exceptions received: " + exceptions);
@@ -758,6 +769,45 @@ public void run() {
758769
fail("Multiple completions received: " + completions);
759770
}
760771
}
772+
static <T> StringBuilder allSequenceFrequency(Map<Integer, List<T>> its) {
773+
StringBuilder b = new StringBuilder();
774+
for (Map.Entry<Integer, List<T>> e : its.entrySet()) {
775+
if (b.length() > 0) {
776+
b.append(", ");
777+
}
778+
b.append(e.getKey()).append("={");
779+
b.append(sequenceFrequency(e.getValue()));
780+
b.append("}");
781+
}
782+
return b;
783+
}
784+
static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
785+
StringBuilder sb = new StringBuilder();
786+
787+
Object prev = null;
788+
int cnt = 0;
789+
790+
for (Object curr : it) {
791+
if (sb.length() > 0) {
792+
if (!curr.equals(prev)) {
793+
if (cnt > 1) {
794+
sb.append(" x ").append(cnt);
795+
cnt = 1;
796+
}
797+
sb.append(", ");
798+
sb.append(curr);
799+
} else {
800+
cnt++;
801+
}
802+
} else {
803+
sb.append(curr);
804+
cnt++;
805+
}
806+
prev = curr;
807+
}
808+
809+
return sb;
810+
}
761811
@Test(timeout = 3000)
762812
public void testIssue1900() throws InterruptedException {
763813
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)