Skip to content

Commit 162c31c

Browse files
Remove Request Batching in Merge
Removing the batching until we can find a correct way to do it. The performance impact of this change is seen here: Benchmark (size) Mode Samples 1.x No Request Batching r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 4585554.607 4666745.314 102% r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 51273.033 39922.246 78% r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 47.515 37.634 79% r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 90901.735 93454.726 103% r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 5.407 4.910 91% r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4181618.767 4173322.551 100% r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 422193.599 408972.130 97% r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 36886.812 36448.978 99% r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 4815945.720 4887943.643 101% r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 43.926 39.027 89% r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 72578.046 70412.656 97% r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3260.024 3064.403 94% r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4678858.201 4808504.588 103% r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 34407.547 36364.476 106% r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 31.312 32.261 103%
1 parent dd73c15 commit 162c31c

File tree

2 files changed

+97
-40
lines changed

2 files changed

+97
-40
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

+6-39
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,9 @@ private void handleNewSource(Observable<? extends T> t) {
186186
InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
187187
i.sindex = childrenSubscribers.add(i);
188188
t.unsafeSubscribe(i);
189-
if (!isUnsubscribed())
189+
if (!isUnsubscribed()) {
190190
request(1);
191+
}
191192
}
192193

193194
private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
@@ -382,19 +383,8 @@ private int drainScalarValueQueue() {
382383
public Boolean call(InnerSubscriber<T> s) {
383384
if (s.q != null) {
384385
long r = mergeProducer.requested;
385-
int emitted = 0;
386-
emitted += s.drainQueue();
386+
int emitted = s.drainQueue();
387387
if (emitted > 0) {
388-
/*
389-
* `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
390-
* but `emitted` can ONLY be touched by the thread holding the `emitLock` which we're currently inside.
391-
*
392-
* Entering and leaving the emitLock flushes all values so this is visible to us.
393-
*/
394-
emitted += s.emitted;
395-
// TODO we may want to store this in s.emitted and only request if above batch
396-
// reset this since we have requested them all
397-
s.emitted = 0;
398388
s.requestMore(emitted);
399389
}
400390
if (emitted == r) {
@@ -542,9 +532,6 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
542532
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
543533

544534
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
545-
/* protected by emitLock */
546-
int emitted = 0;
547-
final int THRESHOLD = (int) (q.capacity() * 0.7);
548535

549536
public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
550537
this.parentSubscriber = parent;
@@ -618,6 +605,7 @@ private void emit(T t, boolean complete) {
618605
* putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
619606
*/
620607
if (parentSubscriber.getEmitLock()) {
608+
long emitted = 0;
621609
enqueue = false;
622610
try {
623611
// drain the queue if there is anything in it before emitting the current value
@@ -660,30 +648,9 @@ private void emit(T t, boolean complete) {
660648
} finally {
661649
drain = parentSubscriber.releaseEmitLock();
662650
}
663-
if (emitted > THRESHOLD) {
664-
// this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
665-
/**
666-
* <pre> {@code
667-
* Without this batching:
668-
*
669-
* Benchmark (size) Mode Samples Score Score error Units
670-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
671-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
672-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
673-
*
674-
* With this batching:
675-
*
676-
* Benchmark (size) Mode Samples Score Score error Units
677-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
678-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
679-
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
680-
*} </pre>
681-
*/
651+
// request upstream what we just emitted
652+
if(emitted > 0) {
682653
request(emitted);
683-
// we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
684-
// and it will be flushed before anything else touches it because the emitLock will be obtained
685-
// before any other usage of it
686-
emitted = 0;
687654
}
688655
}
689656
if (enqueue) {

src/test/java/rx/internal/operators/OperatorMergeTest.java

+91-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Collections;
3131
import java.util.Iterator;
3232
import java.util.List;
33+
import java.util.concurrent.ConcurrentLinkedQueue;
3334
import java.util.concurrent.CountDownLatch;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,9 +41,14 @@
4041
import org.mockito.Mock;
4142
import org.mockito.MockitoAnnotations;
4243

43-
import rx.*;
44+
import rx.Notification;
45+
import rx.Observable;
4446
import rx.Observable.OnSubscribe;
47+
import rx.Observer;
48+
import rx.Scheduler;
4549
import rx.Scheduler.Worker;
50+
import rx.Subscriber;
51+
import rx.Subscription;
4652
import rx.functions.Action0;
4753
import rx.functions.Action1;
4854
import rx.functions.Func1;
@@ -1105,4 +1111,88 @@ public void shouldNotReceivedDelayedErrorWhileThereAreStillNormalEmissionsInTheQ
11051111
subscriber.assertReceivedOnNext(asList(1, 2, 3, 4));
11061112
assertEquals(asList(exception), subscriber.getOnErrorEvents());
11071113
}
1114+
1115+
@Test
1116+
public void testMergeKeepsRequesting() throws InterruptedException {
1117+
//for (int i = 0; i < 5000; i++) {
1118+
//System.out.println(i + ".......................................................................");
1119+
final CountDownLatch latch = new CountDownLatch(1);
1120+
final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();
1121+
1122+
Observable.range(1, 2)
1123+
// produce many integers per second
1124+
.flatMap(new Func1<Integer, Observable<Integer>>() {
1125+
@Override
1126+
public Observable<Integer> call(final Integer number) {
1127+
return Observable.range(1, Integer.MAX_VALUE)
1128+
.doOnRequest(new Action1<Long>() {
1129+
1130+
@Override
1131+
public void call(Long n) {
1132+
messages.add(">>>>>>>> A requested[" + number + "]: " + n);
1133+
}
1134+
1135+
})
1136+
// pause a bit
1137+
.doOnNext(pauseForMs(3))
1138+
// buffer on backpressure
1139+
.onBackpressureBuffer()
1140+
// do in parallel
1141+
.subscribeOn(Schedulers.computation())
1142+
.doOnRequest(new Action1<Long>() {
1143+
1144+
@Override
1145+
public void call(Long n) {
1146+
messages.add(">>>>>>>> B requested[" + number + "]: " + n);
1147+
}
1148+
1149+
});
1150+
}
1151+
1152+
})
1153+
// take a number bigger than 2* RxRingBuffer.SIZE (used by OperatorMerge)
1154+
.take(RxRingBuffer.SIZE * 2 + 1)
1155+
// log count
1156+
.doOnNext(printCount())
1157+
// release latch
1158+
.doOnCompleted(new Action0() {
1159+
@Override
1160+
public void call() {
1161+
latch.countDown();
1162+
}
1163+
}).subscribe();
1164+
boolean a = latch.await(2, TimeUnit.SECONDS);
1165+
if (!a) {
1166+
for (String s : messages) {
1167+
System.out.println("DEBUG => " + s);
1168+
}
1169+
}
1170+
assertTrue(a);
1171+
//}
1172+
}
1173+
1174+
private static Action1<Integer> printCount() {
1175+
return new Action1<Integer>() {
1176+
long count;
1177+
1178+
@Override
1179+
public void call(Integer t1) {
1180+
count++;
1181+
System.out.println("count=" + count);
1182+
}
1183+
};
1184+
}
1185+
1186+
private static Action1<Integer> pauseForMs(final long time) {
1187+
return new Action1<Integer>() {
1188+
@Override
1189+
public void call(Integer s) {
1190+
try {
1191+
Thread.sleep(time);
1192+
} catch (InterruptedException e) {
1193+
throw new RuntimeException(e);
1194+
}
1195+
}
1196+
};
1197+
}
11081198
}

0 commit comments

Comments
 (0)