Skip to content

Commit 13ed3ca

Browse files
Merge pull request ReactiveX#1414 from benjchristensen/merge-fixes
Merge Fixes
2 parents 34aaf0b + 2b921eb commit 13ed3ca

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
449449
private boolean mayNeedToDrain = false;
450450
/* protected by emitLock */
451451
int emitted = 0;
452+
final int THRESHOLD = (int) (q.capacity() * 0.7);
452453

453454
public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
454455
this.parentSubscriber = parent;
@@ -535,7 +536,9 @@ private void emit(T t, boolean complete) {
535536
emitted++;
536537
}
537538
} else {
538-
if (producer.requested > 0) {
539+
// this needs to check q.count() as draining above may not have drained the full queue
540+
// perf tests show this to be okay, though different queue implementations could perform poorly with this
541+
if (producer.requested > 0 && q.count() == 0) {
539542
if (complete) {
540543
parentSubscriber.completeInner(this);
541544
} else {
@@ -551,8 +554,25 @@ private void emit(T t, boolean complete) {
551554
} finally {
552555
drain = parentSubscriber.releaseEmitLock();
553556
}
554-
if (emitted > 256) {
557+
if (emitted > THRESHOLD) {
555558
// this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
559+
/**
560+
* <pre> {@code
561+
* Without this batching:
562+
*
563+
* Benchmark (size) Mode Samples Score Score error Units
564+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
565+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
566+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
567+
*
568+
* With this batching:
569+
*
570+
* Benchmark (size) Mode Samples Score Score error Units
571+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
572+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
573+
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
574+
*} </pre>
575+
*/
556576
request(emitted);
557577
// we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
558578
// and it will be flushed before anything else touches it because the emitLock will be obtained

rxjava-core/src/test/java/rx/BackpressureTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public Observable<Integer> call(Integer i) {
180180
}
181181

182182
@Ignore
183-
@Test(timeout = 1000)
183+
@Test(timeout = 2000)
184184
public void testZipSync() {
185185
int NUM = (int) ((int) RxRingBuffer.SIZE * 4.1);
186186
AtomicInteger c1 = new AtomicInteger();
@@ -208,7 +208,7 @@ public Integer call(Integer t1, Integer t2) {
208208
}
209209

210210
@Ignore
211-
@Test(timeout = 1000)
211+
@Test(timeout = 2000)
212212
public void testZipAsync() {
213213
int NUM = (int) ((int) RxRingBuffer.SIZE * 2.1);
214214
AtomicInteger c1 = new AtomicInteger();
@@ -337,7 +337,7 @@ public void onNext(Integer t) {
337337
assertEquals(20, batches.get());
338338
}
339339

340-
@Test(timeout = 1000)
340+
@Test(timeout = 2000)
341341
public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
342342
AtomicInteger c = new AtomicInteger();
343343
final AtomicInteger totalReceived = new AtomicInteger();
@@ -380,7 +380,7 @@ public void onNext(Integer t) {
380380
assertEquals(20, batches.get());
381381
}
382382

383-
@Test(timeout = 1000)
383+
@Test(timeout = 2000)
384384
public void testFirehoseFailsAsExpected() {
385385
AtomicInteger c = new AtomicInteger();
386386
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
@@ -391,7 +391,7 @@ public void testFirehoseFailsAsExpected() {
391391
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
392392
}
393393

394-
@Test(timeout = 1000)
394+
@Test(timeout = 2000)
395395
public void testOnBackpressureDrop() {
396396
int NUM = (int) ((int) RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
397397
AtomicInteger c = new AtomicInteger();
@@ -405,7 +405,7 @@ public void testOnBackpressureDrop() {
405405
assertTrue(NUM < ts.getOnNextEvents().get(NUM - 1).intValue());
406406
}
407407

408-
@Test(timeout = 1000)
408+
@Test(timeout = 2000)
409409
public void testOnBackpressureBuffer() {
410410
int NUM = (int) ((int) RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
411411
AtomicInteger c = new AtomicInteger();

rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ public void onNext(Integer t) {
669669
if (t < 100)
670670
try {
671671
// force a slow consumer
672-
Thread.sleep(2);
672+
Thread.sleep(1);
673673
} catch (InterruptedException e) {
674674
e.printStackTrace();
675675
}

0 commit comments

Comments
 (0)