Skip to content

1.x: improve coverage of rx.Observable methods #4178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions src/main/java/rx/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,12 @@ public boolean isOnNext() {
* @param observer the target observer to call onXXX methods on based on the kind of this Notification instance
*/
public void accept(Observer<? super T> observer) {
switch (kind) {
case OnNext:
if (kind == Kind.OnNext) {
observer.onNext(getValue());
break;
case OnError:
observer.onError(getThrowable());
break;
case OnCompleted:
} else if (kind == Kind.OnCompleted) {
observer.onCompleted();
break;
default:
throw new AssertionError("Uncovered case: " + kind);
} else {
observer.onError(getThrowable());
}
}

Expand Down
21 changes: 14 additions & 7 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9781,6 +9781,9 @@ public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}

Expand Down Expand Up @@ -9822,7 +9825,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
Expand Down Expand Up @@ -10699,14 +10702,15 @@ public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Sche
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeInterval} operates by default on the {@code immediate} {@link Scheduler}.</dd>
* <dd>{@code timeInterval} does not operate on any particular scheduler but uses the current time
* from the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @return an Observable that emits time interval information items
* @see <a href="http://reactivex.io/documentation/operators/timeinterval.html">ReactiveX operators documentation: TimeInterval</a>
*/
public final Observable<TimeInterval<T>> timeInterval() {
return timeInterval(Schedulers.immediate());
return timeInterval(Schedulers.computation());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope nobody relied on that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unlikely somebody tested against real-time with this overload.

}

/**
Expand All @@ -10719,7 +10723,8 @@ public final Observable<TimeInterval<T>> timeInterval() {
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* <dd>The operator does not operate on any particular scheduler but uses the current time
* from the specified {@link Scheduler}.</dd>
* </dl>
*
* @param scheduler
Expand Down Expand Up @@ -11003,14 +11008,15 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timestamp} operates by default on the {@code immediate} {@link Scheduler}.</dd>
* <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
* from the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @return an Observable that emits timestamped items from the source Observable
* @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
*/
public final Observable<Timestamped<T>> timestamp() {
return timestamp(Schedulers.immediate());
return timestamp(Schedulers.computation());
}

/**
Expand All @@ -11023,7 +11029,8 @@ public final Observable<Timestamped<T>> timestamp() {
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* <dd>The operator does not operate on any particular scheduler but uses the current time
* from the specified {@link Scheduler}.</dd>
* </dl>
*
* @param scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected final void complete() {

/**
* Atomically switches to the terminal state and emits the value if
* there is a request for it or stores it for retrieval by {@link #downstreamRequest(long)}.
* there is a request for it or stores it for retrieval by {@code downstreamRequest(long)}.
* @param value the value to complete with
*/
protected final void complete(R value) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* limitations under the License.
*/

import static rx.Observable.create;
import static rx.Observable.create; // NOPMD

import java.util.concurrent.atomic.*;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super

@Override
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
final GroupBySubscriber<T, K, V> parent;
final GroupBySubscriber<T, K, V> parent; // NOPMD
try {
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
} catch (Throwable ex) {
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/rx/internal/util/SynchronizedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ public boolean equals(Object obj) {
return false;
}
SynchronizedQueue<?> other = (SynchronizedQueue<?>) obj;
if (!list.equals(other.list)) {
return false;
}
return true;
return list.equals(other.list);
}

@Override
Expand Down
142 changes: 91 additions & 51 deletions src/test/java/rx/NotificationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;

import java.util.*;

Expand All @@ -31,65 +32,65 @@ public class NotificationTest {
public void testOnNextIntegerNotificationDoesNotEqualNullNotification(){
final Notification<Integer> integerNotification = Notification.createOnNext(1);
final Notification<Integer> nullNotification = Notification.createOnNext(null);
Assert.assertFalse(integerNotification.equals(nullNotification));
assertFalse(integerNotification.equals(nullNotification));
}

@Test
public void testOnNextNullNotificationDoesNotEqualIntegerNotification(){
final Notification<Integer> integerNotification = Notification.createOnNext(1);
final Notification<Integer> nullNotification = Notification.createOnNext(null);
Assert.assertFalse(nullNotification.equals(integerNotification));
assertFalse(nullNotification.equals(integerNotification));
}

@Test
public void testOnNextIntegerNotificationsWhenEqual(){
final Notification<Integer> integerNotification = Notification.createOnNext(1);
final Notification<Integer> integerNotification2 = Notification.createOnNext(1);
Assert.assertTrue(integerNotification.equals(integerNotification2));
assertTrue(integerNotification.equals(integerNotification2));
}

@Test
public void testOnNextIntegerNotificationsWhenNotEqual(){
final Notification<Integer> integerNotification = Notification.createOnNext(1);
final Notification<Integer> integerNotification2 = Notification.createOnNext(2);
Assert.assertFalse(integerNotification.equals(integerNotification2));
assertFalse(integerNotification.equals(integerNotification2));
}

@Test
public void testOnErrorIntegerNotificationDoesNotEqualNullNotification(){
final Notification<Integer> integerNotification = Notification.createOnError(new Exception());
final Notification<Integer> nullNotification = Notification.createOnError(null);
Assert.assertFalse(integerNotification.equals(nullNotification));
assertFalse(integerNotification.equals(nullNotification));
}

@Test
public void testOnErrorNullNotificationDoesNotEqualIntegerNotification(){
final Notification<Integer> integerNotification = Notification.createOnError(new Exception());
final Notification<Integer> nullNotification = Notification.createOnError(null);
Assert.assertFalse(nullNotification.equals(integerNotification));
assertFalse(nullNotification.equals(integerNotification));
}

@Test
public void testOnErrorIntegerNotificationsWhenEqual(){
final Exception exception = new Exception();
final Notification<Integer> onErrorNotification = Notification.createOnError(exception);
final Notification<Integer> onErrorNotification2 = Notification.createOnError(exception);
Assert.assertTrue(onErrorNotification.equals(onErrorNotification2));
assertTrue(onErrorNotification.equals(onErrorNotification2));
}

@Test
public void testOnErrorIntegerNotificationWhenNotEqual(){
final Notification<Integer> onErrorNotification = Notification.createOnError(new Exception());
final Notification<Integer> onErrorNotification2 = Notification.createOnError(new Exception());
Assert.assertFalse(onErrorNotification.equals(onErrorNotification2));
assertFalse(onErrorNotification.equals(onErrorNotification2));
}

@Test
public void createWithClass() {
Notification<Integer> n = Notification.createOnCompleted(Integer.class);
Assert.assertTrue(n.isOnCompleted());
Assert.assertFalse(n.hasThrowable());
Assert.assertFalse(n.hasValue());
assertTrue(n.isOnCompleted());
assertFalse(n.hasThrowable());
assertFalse(n.hasValue());
}

@Test
Expand Down Expand Up @@ -121,9 +122,9 @@ static String stripAt(String s) {

@Test
public void toStringVariants() {
Assert.assertEquals("[rx.Notification OnNext 1]", stripAt(Notification.createOnNext(1).toString()));
Assert.assertEquals("[rx.Notification OnError Forced failure]", stripAt(Notification.createOnError(new TestException("Forced failure")).toString()));
Assert.assertEquals("[rx.Notification OnCompleted]", stripAt(Notification.createOnCompleted().toString()));
assertEquals("[rx.Notification OnNext 1]", stripAt(Notification.createOnNext(1).toString()));
assertEquals("[rx.Notification OnError Forced failure]", stripAt(Notification.createOnError(new TestException("Forced failure")).toString()));
assertEquals("[rx.Notification OnCompleted]", stripAt(Notification.createOnCompleted().toString()));
}

@Test
Expand All @@ -134,7 +135,7 @@ public void hashCodeWorks() {
Notification<Integer> e1 = Notification.createOnError(new TestException());
Notification<Integer> c1 = Notification.createOnCompleted();

Assert.assertEquals(n1.hashCode(), n1a.hashCode());
assertEquals(n1.hashCode(), n1a.hashCode());

Set<Notification<Integer>> set = new HashSet<Notification<Integer>>();

Expand All @@ -143,11 +144,11 @@ public void hashCodeWorks() {
set.add(e1);
set.add(c1);

Assert.assertTrue(set.contains(n1));
Assert.assertTrue(set.contains(n1a));
Assert.assertTrue(set.contains(n2));
Assert.assertTrue(set.contains(e1));
Assert.assertTrue(set.contains(c1));
assertTrue(set.contains(n1));
assertTrue(set.contains(n1a));
assertTrue(set.contains(n2));
assertTrue(set.contains(e1));
assertTrue(set.contains(c1));
}

@Test
Expand All @@ -164,27 +165,27 @@ public void equalsWorks() {
Notification<Integer> c1 = Notification.createOnCompleted();
Notification<Integer> c2 = Notification.createOnCompleted();

Assert.assertEquals(n1, n1a);
Assert.assertNotEquals(n1, n2);
Assert.assertNotEquals(n2, n1);
assertEquals(n1, n1a);
assertNotEquals(n1, n2);
assertNotEquals(n2, n1);

Assert.assertNotEquals(n1, e1);
Assert.assertNotEquals(e1, n1);
Assert.assertNotEquals(e1, c1);
Assert.assertNotEquals(n1, c1);
Assert.assertNotEquals(c1, n1);
Assert.assertNotEquals(c1, e1);
assertNotEquals(n1, e1);
assertNotEquals(e1, n1);
assertNotEquals(e1, c1);
assertNotEquals(n1, c1);
assertNotEquals(c1, n1);
assertNotEquals(c1, e1);

Assert.assertEquals(e1, e1);
Assert.assertEquals(e1, e2);
assertEquals(e1, e1);
assertNotEquals(e1, e2);

Assert.assertEquals(c1, c2);
assertEquals(c1, c2);

Assert.assertFalse(n1.equals(null));
Assert.assertFalse(n1.equals(1));
assertFalse(n1.equals(null));
assertFalse(n1.equals(1));

Assert.assertEquals(z1a, z1);
Assert.assertEquals(z1, z1a);
assertEquals(z1a, z1);
assertEquals(z1, z1a);
}

@Test
Expand All @@ -195,25 +196,64 @@ public void contentChecks() {
Notification<Integer> e2 = Notification.createOnError(null);
Notification<Integer> c1 = Notification.createOnCompleted();

Assert.assertFalse(z1.hasValue());
Assert.assertFalse(z1.hasThrowable());
Assert.assertFalse(z1.isOnCompleted());
assertFalse(z1.hasValue());
assertFalse(z1.hasThrowable());
assertFalse(z1.isOnCompleted());

Assert.assertTrue(n1.hasValue());
Assert.assertFalse(n1.hasThrowable());
Assert.assertFalse(n1.isOnCompleted());
assertTrue(n1.hasValue());
assertFalse(n1.hasThrowable());
assertFalse(n1.isOnCompleted());

Assert.assertFalse(e1.hasValue());
Assert.assertTrue(e1.hasThrowable());
Assert.assertFalse(e1.isOnCompleted());
assertFalse(e1.hasValue());
assertTrue(e1.hasThrowable());
assertFalse(e1.isOnCompleted());

Assert.assertFalse(e2.hasValue());
Assert.assertFalse(e2.hasThrowable());
Assert.assertFalse(e2.isOnCompleted());
assertFalse(e2.hasValue());
assertFalse(e2.hasThrowable());
assertFalse(e2.isOnCompleted());

Assert.assertFalse(c1.hasValue());
Assert.assertFalse(c1.hasThrowable());
Assert.assertTrue(c1.isOnCompleted());
assertFalse(c1.hasValue());
assertFalse(c1.hasThrowable());
assertTrue(c1.isOnCompleted());

}

@Test
public void exceptionEquality() {
EqualException ex1 = new EqualException("1");
EqualException ex2 = new EqualException("1");
EqualException ex3 = new EqualException("3");

Notification<Integer> e1 = Notification.createOnError(ex1);
Notification<Integer> e2 = Notification.createOnError(ex2);
Notification<Integer> e3 = Notification.createOnError(ex3);

assertEquals(e1, e1);
assertEquals(e1, e2);
assertEquals(e2, e1);
assertEquals(e2, e2);

assertNotEquals(e1, e3);
assertNotEquals(e2, e3);
assertNotEquals(e3, e1);
assertNotEquals(e3, e2);
}

static final class EqualException extends RuntimeException {

/** */
private static final long serialVersionUID = 446310455393317050L;

public EqualException(String message) {
super(message);
}

@Override
public boolean equals(Object o) {
if (o instanceof EqualException) {
return getMessage().equals(((EqualException)o).getMessage());
}
return false;
}
}
}
Loading