Skip to content

Commit 0507fd1

Browse files
committed
Merge pull request ReactiveX#2854 from akarnokd/AbstractOnSubscribeRequestFix
Fixes wrong request accounting in AbstractOnSubscribe
2 parents 1fdd9eb + 31339a2 commit 0507fd1

File tree

3 files changed

+59
-16
lines changed

3 files changed

+59
-16
lines changed

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
* Utility functions for use with backpressure.
2323
*
2424
*/
25-
final class BackpressureUtils {
26-
25+
public final class BackpressureUtils {
26+
/** Utility class, no instances. */
27+
private BackpressureUtils() {
28+
throw new IllegalStateException("No instances!");
29+
}
2730
/**
2831
* Adds {@code n} to {@code requested} field and returns the value prior to
2932
* addition once the addition is successful (uses CAS semantics). If
@@ -37,16 +40,18 @@ final class BackpressureUtils {
3740
* the number of requests to add to the requested count
3841
* @return requested value just prior to successful addition
3942
*/
40-
static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
43+
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
4144
// add n to field but check for overflow
4245
while (true) {
4346
long current = requested.get(object);
4447
long next = current + n;
4548
// check for overflow
46-
if (next < 0)
49+
if (next < 0) {
4750
next = Long.MAX_VALUE;
48-
if (requested.compareAndSet(object, current, next))
51+
}
52+
if (requested.compareAndSet(object, current, next)) {
4953
return current;
54+
}
5055
}
5156
}
5257

@@ -63,16 +68,18 @@ static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object,
6368
* the number of requests to add to the requested count
6469
* @return requested value just prior to successful addition
6570
*/
66-
static <T> long getAndAddRequest(AtomicLong requested, long n) {
71+
public static long getAndAddRequest(AtomicLong requested, long n) {
6772
// add n to field but check for overflow
6873
while (true) {
6974
long current = requested.get();
7075
long next = current + n;
7176
// check for overflow
72-
if (next < 0)
77+
if (next < 0) {
7378
next = Long.MAX_VALUE;
74-
if (requested.compareAndSet(current, next))
79+
}
80+
if (requested.compareAndSet(current, next)) {
7581
return current;
82+
}
7683
}
7784
}
7885
}

src/main/java/rx/observables/AbstractOnSubscribe.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.annotations.Experimental;
2525
import rx.exceptions.CompositeException;
2626
import rx.functions.*;
27+
import rx.internal.operators.BackpressureUtils;
2728

2829
/**
2930
* Abstract base class for the {@link OnSubscribe} interface that helps you build Observable sources one
@@ -332,14 +333,15 @@ private SubscriptionProducer(SubscriptionState<T, S> state) {
332333
}
333334
@Override
334335
public void request(long n) {
335-
if (n == Long.MAX_VALUE) {
336-
for (; !state.subscriber.isUnsubscribed(); ) {
337-
if (!doNext()) {
338-
break;
336+
if (n > 0 && BackpressureUtils.getAndAddRequest(state.requestCount, n) == 0) {
337+
if (n == Long.MAX_VALUE) {
338+
// fast-path
339+
for (; !state.subscriber.isUnsubscribed(); ) {
340+
if (!doNext()) {
341+
break;
342+
}
339343
}
340-
}
341-
} else
342-
if (n > 0 && state.requestCount.getAndAdd(n) == 0) {
344+
} else
343345
if (!state.subscriber.isUnsubscribed()) {
344346
do {
345347
if (!doNext()) {

src/test/java/rx/observables/AbstractOnSubscribeTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
package rx.observables;
1818

19-
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.*;
2020
import static org.mockito.Matchers.any;
2121
import static org.mockito.Mockito.*;
2222

2323
import java.util.*;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import org.junit.Test;
2728
import org.mockito.InOrder;
@@ -503,4 +504,37 @@ public void testMissingEmission() {
503504
verify(o, never()).onNext(any(Object.class));
504505
verify(o).onError(any(IllegalStateException.class));
505506
}
507+
508+
@Test
509+
public void testCanRequestInOnNext() {
510+
AbstractOnSubscribe<Integer, Void> aos = new AbstractOnSubscribe<Integer, Void>() {
511+
@Override
512+
protected void next(SubscriptionState<Integer, Void> state) {
513+
state.onNext(1);
514+
state.onCompleted();
515+
}
516+
};
517+
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
518+
aos.toObservable().subscribe(new Subscriber<Integer>() {
519+
520+
@Override
521+
public void onCompleted() {
522+
523+
}
524+
525+
@Override
526+
public void onError(Throwable e) {
527+
exception.set(e);
528+
}
529+
530+
@Override
531+
public void onNext(Integer t) {
532+
request(1);
533+
}
534+
});
535+
if (exception.get()!=null) {
536+
exception.get().printStackTrace();
537+
}
538+
assertNull(exception.get());
539+
}
506540
}

0 commit comments

Comments
 (0)