Skip to content

Commit 3b911b8

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Fixed a non-deterministic test and a few scheduler leaks.
1 parent f260a57 commit 3b911b8

File tree

4 files changed

+159
-144
lines changed

4 files changed

+159
-144
lines changed

src/test/java/rx/BackpressureTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ public void testOnBackpressureDropWithAction() {
446446
final AtomicInteger emitCount = new AtomicInteger();
447447
final AtomicInteger dropCount = new AtomicInteger();
448448
final AtomicInteger passCount = new AtomicInteger();
449-
final int NUM = RxRingBuffer.SIZE * 3 / 2; // > 1 so that take doesn't prevent buffer overflow
449+
final int NUM = RxRingBuffer.SIZE * 3; // > 1 so that take doesn't prevent buffer overflow
450450
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
451451
firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
452452
@Override

src/test/java/rx/schedulers/AbstractSchedulerTests.java

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -154,43 +154,45 @@ public String call(String s) {
154154
public final void testSequenceOfActions() throws InterruptedException {
155155
final Scheduler scheduler = getScheduler();
156156
final Scheduler.Worker inner = scheduler.createWorker();
157-
158-
final CountDownLatch latch = new CountDownLatch(2);
159-
final Action0 first = mock(Action0.class);
160-
final Action0 second = mock(Action0.class);
161-
162-
// make it wait until both the first and second are called
163-
doAnswer(new Answer() {
164-
165-
@Override
166-
public Object answer(InvocationOnMock invocation) throws Throwable {
167-
try {
168-
return invocation.getMock();
169-
} finally {
170-
latch.countDown();
157+
try {
158+
final CountDownLatch latch = new CountDownLatch(2);
159+
final Action0 first = mock(Action0.class);
160+
final Action0 second = mock(Action0.class);
161+
162+
// make it wait until both the first and second are called
163+
doAnswer(new Answer() {
164+
165+
@Override
166+
public Object answer(InvocationOnMock invocation) throws Throwable {
167+
try {
168+
return invocation.getMock();
169+
} finally {
170+
latch.countDown();
171+
}
171172
}
172-
}
173-
}).when(first).call();
174-
doAnswer(new Answer() {
175-
176-
@Override
177-
public Object answer(InvocationOnMock invocation) throws Throwable {
178-
try {
179-
return invocation.getMock();
180-
} finally {
181-
latch.countDown();
173+
}).when(first).call();
174+
doAnswer(new Answer() {
175+
176+
@Override
177+
public Object answer(InvocationOnMock invocation) throws Throwable {
178+
try {
179+
return invocation.getMock();
180+
} finally {
181+
latch.countDown();
182+
}
182183
}
183-
}
184-
}).when(second).call();
185-
186-
inner.schedule(first);
187-
inner.schedule(second);
188-
189-
latch.await();
190-
191-
verify(first, times(1)).call();
192-
verify(second, times(1)).call();
193-
184+
}).when(second).call();
185+
186+
inner.schedule(first);
187+
inner.schedule(second);
188+
189+
latch.await();
190+
191+
verify(first, times(1)).call();
192+
verify(second, times(1)).call();
193+
} finally {
194+
inner.unsubscribe();
195+
}
194196
}
195197

196198
@Test

src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -426,63 +426,72 @@ public void testOnErrorThrowsDoesntPreventDelivery2() {
426426
public void testEmissionSubscriptionRace() throws Exception {
427427
Scheduler s = Schedulers.io();
428428
Scheduler.Worker worker = Schedulers.io().createWorker();
429-
for (int i = 0; i < 50000; i++) {
430-
if (i % 1000 == 0) {
431-
System.out.println(i);
432-
}
433-
final BehaviorSubject<Object> rs = BehaviorSubject.create();
434-
435-
final CountDownLatch finish = new CountDownLatch(1);
436-
final CountDownLatch start = new CountDownLatch(1);
437-
438-
worker.schedule(new Action0() {
439-
@Override
440-
public void call() {
441-
try {
442-
start.await();
443-
} catch (Exception e1) {
444-
e1.printStackTrace();
445-
}
446-
rs.onNext(1);
447-
}
448-
});
449-
450-
final AtomicReference<Object> o = new AtomicReference<Object>();
451-
452-
rs.subscribeOn(s).observeOn(Schedulers.io())
453-
.subscribe(new Observer<Object>() {
454-
455-
@Override
456-
public void onCompleted() {
457-
o.set(-1);
458-
finish.countDown();
459-
}
460-
461-
@Override
462-
public void onError(Throwable e) {
463-
o.set(e);
464-
finish.countDown();
465-
}
466-
467-
@Override
468-
public void onNext(Object t) {
469-
o.set(t);
470-
finish.countDown();
429+
try {
430+
for (int i = 0; i < 50000; i++) {
431+
if (i % 1000 == 0) {
432+
System.out.println(i);
471433
}
434+
final BehaviorSubject<Object> rs = BehaviorSubject.create();
435+
436+
final CountDownLatch finish = new CountDownLatch(1);
437+
final CountDownLatch start = new CountDownLatch(1);
438+
439+
worker.schedule(new Action0() {
440+
@Override
441+
public void call() {
442+
try {
443+
start.await();
444+
} catch (Exception e1) {
445+
e1.printStackTrace();
446+
}
447+
rs.onNext(1);
448+
}
449+
});
450+
451+
final AtomicReference<Object> o = new AtomicReference<Object>();
472452

473-
});
474-
start.countDown();
475-
476-
if (!finish.await(5, TimeUnit.SECONDS)) {
477-
System.out.println(o.get());
478-
System.out.println(rs.hasObservers());
479-
rs.onCompleted();
480-
Assert.fail("Timeout @ " + i);
481-
break;
482-
} else {
483-
Assert.assertEquals(1, o.get());
484-
rs.onCompleted();
453+
rs.subscribeOn(s).observeOn(Schedulers.io())
454+
.subscribe(new Observer<Object>() {
455+
456+
@Override
457+
public void onCompleted() {
458+
o.set(-1);
459+
finish.countDown();
460+
}
461+
462+
@Override
463+
public void onError(Throwable e) {
464+
o.set(e);
465+
finish.countDown();
466+
}
467+
468+
@Override
469+
public void onNext(Object t) {
470+
o.set(t);
471+
finish.countDown();
472+
}
473+
474+
});
475+
start.countDown();
476+
477+
if (!finish.await(5, TimeUnit.SECONDS)) {
478+
System.out.println(o.get());
479+
System.out.println(rs.hasObservers());
480+
rs.onCompleted();
481+
Assert.fail("Timeout @ " + i);
482+
break;
483+
} else {
484+
Assert.assertEquals(1, o.get());
485+
worker.schedule(new Action0() {
486+
@Override
487+
public void call() {
488+
rs.onCompleted();
489+
}
490+
});
491+
}
485492
}
493+
} finally {
494+
worker.unsubscribe();
486495
}
487496
}
488497

src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java

Lines changed: 58 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -342,68 +342,72 @@ public void run() {
342342
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
343343
Scheduler s = Schedulers.io();
344344
Scheduler.Worker worker = Schedulers.io().createWorker();
345-
for (int i = 0; i < 50000; i++) {
346-
if (i % 1000 == 0) {
347-
System.out.println(i);
348-
}
349-
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);
350-
351-
final CountDownLatch finish = new CountDownLatch(1);
352-
final CountDownLatch start = new CountDownLatch(1);
353-
354-
worker.schedule(new Action0() {
355-
@Override
356-
public void call() {
357-
try {
358-
start.await();
359-
} catch (Exception e1) {
360-
e1.printStackTrace();
361-
}
362-
rs.onNext(1);
363-
}
364-
});
365-
366-
final AtomicReference<Object> o = new AtomicReference<Object>();
367-
368-
rs.subscribeOn(s).observeOn(Schedulers.io())
369-
.subscribe(new Observer<Object>() {
370-
371-
@Override
372-
public void onCompleted() {
373-
o.set(-1);
374-
finish.countDown();
375-
}
376-
377-
@Override
378-
public void onError(Throwable e) {
379-
o.set(e);
380-
finish.countDown();
381-
}
382-
383-
@Override
384-
public void onNext(Object t) {
385-
o.set(t);
386-
finish.countDown();
345+
try {
346+
for (int i = 0; i < 50000; i++) {
347+
if (i % 1000 == 0) {
348+
System.out.println(i);
387349
}
350+
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);
351+
352+
final CountDownLatch finish = new CountDownLatch(1);
353+
final CountDownLatch start = new CountDownLatch(1);
388354

389-
});
390-
start.countDown();
391-
392-
if (!finish.await(5, TimeUnit.SECONDS)) {
393-
System.out.println(o.get());
394-
System.out.println(rs.hasObservers());
395-
rs.onCompleted();
396-
Assert.fail("Timeout @ " + i);
397-
break;
398-
} else {
399-
Assert.assertEquals(1, o.get());
400355
worker.schedule(new Action0() {
401356
@Override
402357
public void call() {
403-
rs.onCompleted();
358+
try {
359+
start.await();
360+
} catch (Exception e1) {
361+
e1.printStackTrace();
362+
}
363+
rs.onNext(1);
364+
}
365+
});
366+
367+
final AtomicReference<Object> o = new AtomicReference<Object>();
368+
369+
rs.subscribeOn(s).observeOn(Schedulers.io())
370+
.subscribe(new Observer<Object>() {
371+
372+
@Override
373+
public void onCompleted() {
374+
o.set(-1);
375+
finish.countDown();
376+
}
377+
378+
@Override
379+
public void onError(Throwable e) {
380+
o.set(e);
381+
finish.countDown();
404382
}
383+
384+
@Override
385+
public void onNext(Object t) {
386+
o.set(t);
387+
finish.countDown();
388+
}
389+
405390
});
391+
start.countDown();
392+
393+
if (!finish.await(5, TimeUnit.SECONDS)) {
394+
System.out.println(o.get());
395+
System.out.println(rs.hasObservers());
396+
rs.onCompleted();
397+
Assert.fail("Timeout @ " + i);
398+
break;
399+
} else {
400+
Assert.assertEquals(1, o.get());
401+
worker.schedule(new Action0() {
402+
@Override
403+
public void call() {
404+
rs.onCompleted();
405+
}
406+
});
407+
}
406408
}
409+
} finally {
410+
worker.unsubscribe();
407411
}
408412
}
409413
@Test(timeout = 5000)

0 commit comments

Comments
 (0)