Skip to content

Commit 6518eb9

Browse files
authored
3.x: Fix size+time bound window not creating windows properly (ReactiveX#6652)
1 parent 0c50f0a commit 6518eb9

File tree

6 files changed

+105
-15
lines changed

6 files changed

+105
-15
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ void drainLoop() {
498498

499499
if (isHolder) {
500500
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
501-
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
501+
if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
502502
w.onComplete();
503503
count = 0;
504504
w = UnicastProcessor.<T>create(bufferSize);

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ void drainLoop() {
444444

445445
if (isHolder) {
446446
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
447-
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
447+
if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
448448
w.onComplete();
449449
count = 0;
450450
w = UnicastSubject.create(bufferSize);

src/test/java/io/reactivex/rxjava3/flowable/FlowableWindowTests.java

+43
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.Test;
2122

2223
import io.reactivex.rxjava3.core.*;
2324
import io.reactivex.rxjava3.functions.*;
25+
import io.reactivex.rxjava3.processors.PublishProcessor;
26+
import io.reactivex.rxjava3.schedulers.TestScheduler;
27+
import io.reactivex.rxjava3.subscribers.TestSubscriber;
2428

2529
public class FlowableWindowTests extends RxJavaTest {
2630

@@ -50,4 +54,43 @@ public void accept(List<Integer> xs) {
5054
assertEquals(2, lists.size());
5155

5256
}
57+
58+
@Test
59+
public void timeSizeWindowAlternatingBounds() {
60+
TestScheduler scheduler = new TestScheduler();
61+
PublishProcessor<Integer> pp = PublishProcessor.create();
62+
63+
TestSubscriber<List<Integer>> ts = pp.window(5, TimeUnit.SECONDS, scheduler, 2)
64+
.flatMapSingle(new Function<Flowable<Integer>, SingleSource<List<Integer>>>() {
65+
@Override
66+
public SingleSource<List<Integer>> apply(Flowable<Integer> v) throws Throwable {
67+
return v.toList();
68+
}
69+
})
70+
.test();
71+
72+
pp.onNext(1);
73+
pp.onNext(2);
74+
ts.assertValueCount(1); // size bound hit
75+
76+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
77+
pp.onNext(3);
78+
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
79+
ts.assertValueCount(2); // time bound hit
80+
81+
pp.onNext(4);
82+
pp.onNext(5);
83+
84+
ts.assertValueCount(3); // size bound hit again
85+
86+
pp.onNext(4);
87+
88+
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
89+
90+
ts.assertValueCount(4)
91+
.assertNoErrors()
92+
.assertNotComplete();
93+
94+
ts.cancel();
95+
}
5396
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,19 @@ public void subscribe(Subscriber<? super String> subscriber) {
6565
Flowable<Flowable<String>> windowed = source.window(100, TimeUnit.MILLISECONDS, scheduler, 2);
6666
windowed.subscribe(observeWindow(list, lists));
6767

68-
scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
68+
scheduler.advanceTimeTo(95, TimeUnit.MILLISECONDS);
6969
assertEquals(1, lists.size());
7070
assertEquals(lists.get(0), list("one", "two"));
7171

72-
scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS);
73-
assertEquals(2, lists.size());
74-
assertEquals(lists.get(1), list("three", "four"));
72+
scheduler.advanceTimeTo(195, TimeUnit.MILLISECONDS);
73+
assertEquals(3, lists.size());
74+
assertTrue(lists.get(1).isEmpty());
75+
assertEquals(lists.get(2), list("three", "four"));
7576

7677
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
77-
assertEquals(3, lists.size());
78-
assertEquals(lists.get(2), list("five"));
78+
assertEquals(5, lists.size());
79+
assertTrue(lists.get(3).isEmpty());
80+
assertEquals(lists.get(4), list("five"));
7981
}
8082

8183
@Test

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,19 @@ public void subscribe(Observer<? super String> observer) {
6565
Observable<Observable<String>> windowed = source.window(100, TimeUnit.MILLISECONDS, scheduler, 2);
6666
windowed.subscribe(observeWindow(list, lists));
6767

68-
scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
68+
scheduler.advanceTimeTo(95, TimeUnit.MILLISECONDS);
6969
assertEquals(1, lists.size());
7070
assertEquals(lists.get(0), list("one", "two"));
7171

72-
scheduler.advanceTimeTo(200, TimeUnit.MILLISECONDS);
73-
assertEquals(2, lists.size());
74-
assertEquals(lists.get(1), list("three", "four"));
72+
scheduler.advanceTimeTo(195, TimeUnit.MILLISECONDS);
73+
assertEquals(3, lists.size());
74+
assertTrue(lists.get(1).isEmpty());
75+
assertEquals(lists.get(2), list("three", "four"));
7576

7677
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
77-
assertEquals(3, lists.size());
78-
assertEquals(lists.get(2), list("five"));
78+
assertEquals(5, lists.size());
79+
assertTrue(lists.get(3).isEmpty());
80+
assertEquals(lists.get(4), list("five"));
7981
}
8082

8183
@Test

src/test/java/io/reactivex/rxjava3/observable/ObservableWindowTests.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.*;
19+
import java.util.concurrent.TimeUnit;
1920

2021
import org.junit.Test;
2122

23+
import io.reactivex.rxjava3.core.*;
2224
import io.reactivex.rxjava3.core.Observable;
23-
import io.reactivex.rxjava3.core.RxJavaTest;
2425
import io.reactivex.rxjava3.functions.*;
26+
import io.reactivex.rxjava3.observers.TestObserver;
27+
import io.reactivex.rxjava3.schedulers.*;
28+
import io.reactivex.rxjava3.subjects.PublishSubject;
2529

2630
public class ObservableWindowTests extends RxJavaTest {
2731

@@ -51,4 +55,43 @@ public void accept(List<Integer> xs) {
5155
assertEquals(2, lists.size());
5256

5357
}
58+
59+
@Test
60+
public void timeSizeWindowAlternatingBounds() {
61+
TestScheduler scheduler = new TestScheduler();
62+
PublishSubject<Integer> ps = PublishSubject.create();
63+
64+
TestObserver<List<Integer>> to = ps.window(5, TimeUnit.SECONDS, scheduler, 2)
65+
.flatMapSingle(new Function<Observable<Integer>, SingleSource<List<Integer>>>() {
66+
@Override
67+
public SingleSource<List<Integer>> apply(Observable<Integer> v) throws Throwable {
68+
return v.toList();
69+
}
70+
})
71+
.test();
72+
73+
ps.onNext(1);
74+
ps.onNext(2);
75+
to.assertValueCount(1); // size bound hit
76+
77+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
78+
ps.onNext(3);
79+
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
80+
to.assertValueCount(2); // time bound hit
81+
82+
ps.onNext(4);
83+
ps.onNext(5);
84+
85+
to.assertValueCount(3); // size bound hit again
86+
87+
ps.onNext(4);
88+
89+
scheduler.advanceTimeBy(6, TimeUnit.SECONDS);
90+
91+
to.assertValueCount(4)
92+
.assertNoErrors()
93+
.assertNotComplete();
94+
95+
to.dispose();
96+
}
5497
}

0 commit comments

Comments
 (0)