1515 */
1616package rx .observers ;
1717
18- import java .util .List ;
18+ import java .util .* ;
1919import java .util .concurrent .*;
2020
2121import rx .*;
22+ import rx .Observer ;
23+ import rx .annotations .Experimental ;
24+ import rx .exceptions .CompositeException ;
2225
2326/**
2427 * A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform
@@ -29,34 +32,72 @@ public class TestSubscriber<T> extends Subscriber<T> {
2932 private final TestObserver <T > testObserver ;
3033 private final CountDownLatch latch = new CountDownLatch (1 );
3134 private volatile Thread lastSeenThread ;
35+ /** Holds the initial request value. */
36+ private final long initialRequest ;
37+ /** The shared no-op observer. */
38+ private static final Observer <Object > INERT = new Observer <Object >() {
3239
33- public TestSubscriber (Subscriber <T > delegate ) {
40+ @ Override
41+ public void onCompleted () {
42+ // do nothing
43+ }
44+
45+ @ Override
46+ public void onError (Throwable e ) {
47+ // do nothing
48+ }
49+
50+ @ Override
51+ public void onNext (Object t ) {
52+ // do nothing
53+ }
54+
55+ };
56+
57+ /**
58+ * Constructs a TestSubscriber with the initial request to be requested from upstream.
59+ * @param initialRequest the initial request value, negative value will revert to the default unbounded behavior
60+ */
61+ @ SuppressWarnings ("unchecked" )
62+ @ Experimental
63+ public TestSubscriber (long initialRequest ) {
64+ this ((Observer <T >)INERT , initialRequest );
65+ }
66+
67+ /**
68+ * Constructs a TestSubscriber with the initial request to be requested from upstream
69+ * and a delegate Observer to wrap.
70+ * @param initialRequest the initial request value, negative value will revert to the default unbounded behavior
71+ * @param delegate the Observer instance to wrap
72+ */
73+ @ Experimental
74+ public TestSubscriber (Observer <T > delegate , long initialRequest ) {
75+ if (delegate == null ) {
76+ throw new NullPointerException ();
77+ }
3478 this .testObserver = new TestObserver <T >(delegate );
79+ this .initialRequest = initialRequest ;
80+ }
81+
82+ public TestSubscriber (Subscriber <T > delegate ) {
83+ this (delegate , -1 );
3584 }
3685
3786 public TestSubscriber (Observer <T > delegate ) {
38- this . testObserver = new TestObserver < T > (delegate );
87+ this (delegate , - 1 );
3988 }
4089
4190 public TestSubscriber () {
42- this .testObserver = new TestObserver <T >(new Observer <T >() {
43-
44- @ Override
45- public void onCompleted () {
46- // do nothing
47- }
48-
49- @ Override
50- public void onError (Throwable e ) {
51- // do nothing
52- }
53-
54- @ Override
55- public void onNext (T t ) {
56- // do nothing
57- }
58-
59- });
91+ this (-1 );
92+ }
93+
94+ @ Override
95+ public void onStart () {
96+ if (initialRequest >= 0 ) {
97+ requestMore (initialRequest );
98+ } else {
99+ super .onStart ();
100+ }
60101 }
61102
62103 /**
@@ -261,4 +302,121 @@ public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit uni
261302 public Thread getLastSeenThread () {
262303 return lastSeenThread ;
263304 }
264- }
305+
306+ /**
307+ * Assert if there is exactly a single completion event.
308+ */
309+ @ Experimental
310+ public void assertCompleted () {
311+ int s = testObserver .getOnCompletedEvents ().size ();
312+ if (s == 0 ) {
313+ throw new AssertionError ("Not completed!" );
314+ } else
315+ if (s > 1 ) {
316+ throw new AssertionError ("Completed multiple times: " + s );
317+ }
318+ }
319+ /**
320+ * Assert if there is no completion event.
321+ */
322+ @ Experimental
323+ public void assertNotCompleted () {
324+ int s = testObserver .getOnCompletedEvents ().size ();
325+ if (s == 1 ) {
326+ throw new AssertionError ("Completed!" );
327+ } else
328+ if (s > 1 ) {
329+ throw new AssertionError ("Completed multiple times: " + s );
330+ }
331+ }
332+ /**
333+ * Assert if there is exactly one error event which is a subclass of the given class.
334+ * @param clazz the class to check the error against.
335+ */
336+ @ Experimental
337+ public void assertError (Class <? extends Throwable > clazz ) {
338+ List <Throwable > err = testObserver .getOnErrorEvents ();
339+ if (err .size () == 0 ) {
340+ throw new AssertionError ("No errors" );
341+ } else
342+ if (err .size () > 1 ) {
343+ throw new AssertionError ("Multiple errors: " + err .size (), new CompositeException (err ));
344+ } else
345+ if (!clazz .isInstance (err .get (0 ))) {
346+ throw new AssertionError ("Exceptions differ; expected: " + clazz + ", actual: " + err .get (0 ), err .get (0 ));
347+ }
348+ }
349+ /**
350+ * Assert there is a single onError event with the exact exception.
351+ * @param throwable the throwable to check
352+ */
353+ @ Experimental
354+ public void assertError (Throwable throwable ) {
355+ List <Throwable > err = testObserver .getOnErrorEvents ();
356+ if (err .size () == 0 ) {
357+ throw new AssertionError ("No errors" );
358+ } else
359+ if (err .size () > 1 ) {
360+ throw new AssertionError ("Multiple errors: " + err .size (), new CompositeException (err ));
361+ } else
362+ if (throwable .equals (err .get (0 ))) {
363+ throw new AssertionError ("Exceptions differ; expected: " + throwable + ", actual: " + err .get (0 ), err .get (0 ));
364+ }
365+ }
366+ /**
367+ * Assert for no onError and onCompleted events.
368+ */
369+ @ Experimental
370+ public void assertNoTerminalEvent () {
371+ List <Throwable > err = testObserver .getOnErrorEvents ();
372+ int s = testObserver .getOnCompletedEvents ().size ();
373+ if (err .size () > 0 || s > 0 ) {
374+ if (err .isEmpty ()) {
375+ throw new AssertionError ("Found " + err .size () + " errors and " + s + " completion events instead of none" );
376+ } else
377+ if (err .size () == 1 ) {
378+ throw new AssertionError ("Found " + err .size () + " errors and " + s + " completion events instead of none" , err .get (0 ));
379+ } else {
380+ throw new AssertionError ("Found " + err .size () + " errors and " + s + " completion events instead of none" , new CompositeException (err ));
381+ }
382+ }
383+ }
384+ /**
385+ * Assert if there are no onNext events received.
386+ */
387+ @ Experimental
388+ public void assertNoValues () {
389+ int s = testObserver .getOnNextEvents ().size ();
390+ if (s > 0 ) {
391+ throw new AssertionError ("No onNext events expected yet some received: " + s );
392+ }
393+ }
394+ /**
395+ * Assert if the given number of onNext events are received.
396+ * @param count the expected number of onNext events
397+ */
398+ @ Experimental
399+ public void assertValueCount (int count ) {
400+ int s = testObserver .getOnNextEvents ().size ();
401+ if (s != count ) {
402+ throw new AssertionError ("Number of onNext events differ; expected: " + count + ", actual: " + s );
403+ }
404+ }
405+
406+ /**
407+ * Assert if the received onNext events, in order, are the specified values.
408+ * @param values the values to check
409+ */
410+ @ Experimental
411+ public void assertValues (T ... values ) {
412+ assertReceivedOnNext (Arrays .asList (values ));
413+ }
414+ /**
415+ * Assert if there is only a single received onNext event.
416+ * @param values the values to check
417+ */
418+ @ Experimental
419+ public void assertValue (T value ) {
420+ assertReceivedOnNext (Collections .singletonList (value ));
421+ }
422+ }
0 commit comments