Skip to content

Commit a1b9655

Browse files
authored
3.x: Fix delay-cancellation race producing random subsequent emissions (#7855)
1 parent 8b4eece commit a1b9655

File tree

4 files changed

+78
-3
lines changed

4 files changed

+78
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
111111

112112
@Override
113113
public void run() {
114-
downstream.onNext(t);
114+
if (!w.isDisposed()) {
115+
downstream.onNext(t);
116+
}
115117
}
116118
}
117119

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
111111

112112
@Override
113113
public void run() {
114-
downstream.onNext(t);
114+
if (!w.isDisposed()) {
115+
downstream.onNext(t);
116+
}
115117
}
116118
}
117119

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.*;
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.*;
23+
import java.util.concurrent.locks.LockSupport;
2324

2425
import org.junit.*;
2526
import org.mockito.InOrder;
@@ -28,6 +29,7 @@
2829
import io.reactivex.rxjava3.core.*;
2930
import io.reactivex.rxjava3.exceptions.TestException;
3031
import io.reactivex.rxjava3.functions.*;
32+
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
3133
import io.reactivex.rxjava3.internal.functions.Functions;
3234
import io.reactivex.rxjava3.processors.PublishProcessor;
3335
import io.reactivex.rxjava3.schedulers.*;
@@ -1030,4 +1032,38 @@ public Publisher<Object> apply(Integer t) throws Exception {
10301032
.to(TestHelper.<Integer>testConsumer())
10311033
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher");
10321034
}
1035+
1036+
@Test
1037+
public void cancelShouldPreventRandomSubsequentEmissions() {
1038+
for (int attempt = 1; attempt < 100; attempt ++) {
1039+
1040+
SequentialDisposable disposable = new SequentialDisposable();
1041+
ConcurrentLinkedQueue<Integer> sink = new ConcurrentLinkedQueue<>();
1042+
1043+
disposable.replace(
1044+
Flowable.range(1, 10)
1045+
.delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
1046+
.doOnNext(v -> {
1047+
if (v == 1) {
1048+
Schedulers.computation().scheduleDirect(disposable::dispose);
1049+
}
1050+
sink.offer(v);
1051+
})
1052+
.subscribe());
1053+
1054+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
1055+
1056+
Integer last = null;
1057+
1058+
while (!sink.isEmpty()) {
1059+
Integer current = sink.poll();
1060+
1061+
if (last != null && last + 1 != current) {
1062+
fail("Emission hole: " + last + " -> " + current);
1063+
}
1064+
1065+
last = current;
1066+
}
1067+
}
1068+
}
10331069
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.*;
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.concurrent.locks.LockSupport;
2324

2425
import org.junit.*;
2526
import org.mockito.InOrder;
@@ -29,6 +30,7 @@
2930
import io.reactivex.rxjava3.core.Observer;
3031
import io.reactivex.rxjava3.exceptions.TestException;
3132
import io.reactivex.rxjava3.functions.*;
33+
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
3234
import io.reactivex.rxjava3.internal.functions.Functions;
3335
import io.reactivex.rxjava3.observers.*;
3436
import io.reactivex.rxjava3.schedulers.*;
@@ -978,4 +980,37 @@ public Observable<Object> apply(Integer t) throws Exception {
978980
.to(TestHelper.<Integer>testConsumer())
979981
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource");
980982
}
981-
}
983+
984+
@Test
985+
public void cancelShouldPreventRandomSubsequentEmissions() {
986+
for (int attempt = 1; attempt < 100; attempt ++) {
987+
988+
SequentialDisposable disposable = new SequentialDisposable();
989+
ConcurrentLinkedQueue<Integer> sink = new ConcurrentLinkedQueue<>();
990+
991+
disposable.replace(
992+
Observable.range(1, 10)
993+
.delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
994+
.doOnNext(v -> {
995+
if (v == 1) {
996+
Schedulers.computation().scheduleDirect(disposable::dispose);
997+
}
998+
sink.offer(v);
999+
})
1000+
.subscribe());
1001+
1002+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
1003+
1004+
Integer last = null;
1005+
1006+
while (!sink.isEmpty()) {
1007+
Integer current = sink.poll();
1008+
1009+
if (last != null && last + 1 != current) {
1010+
fail("Emission hole: " + last + " -> " + current);
1011+
}
1012+
1013+
last = current;
1014+
}
1015+
}
1016+
}}

0 commit comments

Comments
 (0)