Skip to content

Commit a6a6a33

Browse files
committed
Merge pull request ReactiveX#2851 from zsxwing/fix-debounce
Add 'request(Long.MAX_VALUE)' in 'onStart' to fix the backpressure issue of debounce
2 parents 34cd1a6 + 8b60352 commit a6a6a33

File tree

2 files changed

+24
-0
lines changed

2 files changed

+24
-0
lines changed

src/main/java/rx/internal/operators/OperatorDebounceWithTime.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
6262
return new Subscriber<T>(child) {
6363
final DebounceState<T> state = new DebounceState<T>();
6464
final Subscriber<?> self = this;
65+
66+
@Override
67+
public void onStart() {
68+
request(Long.MAX_VALUE);
69+
}
70+
6571
@Override
6672
public void onNext(final T t) {
6773

src/test/java/rx/internal/operators/OperatorDebounceTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.times;
2424
import static org.mockito.Mockito.verify;
2525

26+
import java.util.Arrays;
2627
import java.util.concurrent.TimeUnit;
2728

2829
import org.junit.Before;
@@ -36,6 +37,7 @@
3637
import rx.exceptions.TestException;
3738
import rx.functions.Action0;
3839
import rx.functions.Func1;
40+
import rx.observers.TestSubscriber;
3941
import rx.schedulers.TestScheduler;
4042
import rx.subjects.PublishSubject;
4143

@@ -287,4 +289,20 @@ public Observable<Integer> call(Integer t1) {
287289
verify(o).onCompleted();
288290
verify(o, never()).onError(any(Throwable.class));
289291
}
292+
293+
@Test
294+
public void debounceWithTimeBackpressure() throws InterruptedException {
295+
TestScheduler scheduler = new TestScheduler();
296+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
297+
Observable.merge(
298+
Observable.just(1),
299+
Observable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
300+
).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(subscriber);
301+
302+
scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
303+
304+
subscriber.assertReceivedOnNext(Arrays.asList(2));
305+
subscriber.assertTerminalEvent();
306+
subscriber.assertNoErrors();
307+
}
290308
}

0 commit comments

Comments
 (0)