File tree Expand file tree Collapse file tree 1 file changed +5
-3
lines changed
rxjava-core/src/test/java/rx Expand file tree Collapse file tree 1 file changed +5
-3
lines changed Original file line number Diff line number Diff line change @@ -173,10 +173,12 @@ public Observable<Integer> call(Integer i) {
173
173
}).take (NUM ).subscribe (ts );
174
174
ts .awaitTerminalEvent ();
175
175
ts .assertNoErrors ();
176
- System .out .println ("testFlatMapAsync => Received: " + ts .getOnNextEvents ().size () + " Emitted: " + c .get ());
176
+ System .out .println ("testFlatMapAsync => Received: " + ts .getOnNextEvents ().size () + " Emitted: " + c .get () + " Size: " + RxRingBuffer . SIZE );
177
177
assertEquals (NUM , ts .getOnNextEvents ().size ());
178
- // expect less than 1 buffer since the flatMap is emitting 10 each time, so it is NUM/10 that will be taken.
179
- assertTrue (c .get () <= RxRingBuffer .SIZE );
178
+ // even though we only need 10, it will request at least RxRingBuffer.SIZE, and then as it drains keep requesting more
179
+ // and then it will be non-deterministic when the take() causes the unsubscribe as it is scheduled on 10 different schedulers (threads)
180
+ // normally this number is ~250 but can get up to ~1200 when RxRingBuffer.SIZE == 1024
181
+ assertTrue (c .get () <= RxRingBuffer .SIZE * 2 );
180
182
}
181
183
182
184
@ Ignore
You can’t perform that action at this time.
0 commit comments