Skip to content

Commit 97c4e53

Browse files
authored
1.x: improve coverage of rx.Observable methods (ReactiveX#4178)
* 1.x: improve coverage of rx.Observable methods * Added missing methods * Update based on feedback
1 parent 978825e commit 97c4e53

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2488
-289
lines changed

src/main/java/rx/Notification.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,12 @@ public boolean isOnNext() {
158158
* @param observer the target observer to call onXXX methods on based on the kind of this Notification instance
159159
*/
160160
public void accept(Observer<? super T> observer) {
161-
switch (kind) {
162-
case OnNext:
161+
if (kind == Kind.OnNext) {
163162
observer.onNext(getValue());
164-
break;
165-
case OnError:
166-
observer.onError(getThrowable());
167-
break;
168-
case OnCompleted:
163+
} else if (kind == Kind.OnCompleted) {
169164
observer.onCompleted();
170-
break;
171-
default:
172-
throw new AssertionError("Uncovered case: " + kind);
165+
} else {
166+
observer.onError(getThrowable());
173167
}
174168
}
175169

src/main/java/rx/Observable.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9781,6 +9781,9 @@ public final Subscription subscribe(final Observer<? super T> observer) {
97819781
if (observer instanceof Subscriber) {
97829782
return subscribe((Subscriber<? super T>)observer);
97839783
}
9784+
if (observer == null) {
9785+
throw new NullPointerException("observer is null");
9786+
}
97849787
return subscribe(new ObserverSubscriber<T>(observer));
97859788
}
97869789

@@ -9822,7 +9825,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
98229825
Exceptions.throwIfFatal(e2);
98239826
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
98249827
// so we are unable to propagate the error correctly and will just throw
9825-
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
9828+
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
98269829
// TODO could the hook be the cause of the error in the on error handling.
98279830
RxJavaHooks.onObservableError(r);
98289831
// TODO why aren't we throwing the hook's return value.
@@ -10699,14 +10702,15 @@ public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Sche
1069910702
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
1070010703
* behavior.</dd>
1070110704
* <dt><b>Scheduler:</b></dt>
10702-
* <dd>{@code timeInterval} operates by default on the {@code immediate} {@link Scheduler}.</dd>
10705+
* <dd>{@code timeInterval} does not operate on any particular scheduler but uses the current time
10706+
* from the {@code computation} {@link Scheduler}.</dd>
1070310707
* </dl>
1070410708
*
1070510709
* @return an Observable that emits time interval information items
1070610710
* @see <a href="http://reactivex.io/documentation/operators/timeinterval.html">ReactiveX operators documentation: TimeInterval</a>
1070710711
*/
1070810712
public final Observable<TimeInterval<T>> timeInterval() {
10709-
return timeInterval(Schedulers.immediate());
10713+
return timeInterval(Schedulers.computation());
1071010714
}
1071110715

1071210716
/**
@@ -10719,7 +10723,8 @@ public final Observable<TimeInterval<T>> timeInterval() {
1071910723
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
1072010724
* behavior.</dd>
1072110725
* <dt><b>Scheduler:</b></dt>
10722-
* <dd>you specify which {@link Scheduler} this operator will use</dd>
10726+
* <dd>The operator does not operate on any particular scheduler but uses the current time
10727+
* from the specified {@link Scheduler}.</dd>
1072310728
* </dl>
1072410729
*
1072510730
* @param scheduler
@@ -11003,14 +11008,15 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
1100311008
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
1100411009
* behavior.</dd>
1100511010
* <dt><b>Scheduler:</b></dt>
11006-
* <dd>{@code timestamp} operates by default on the {@code immediate} {@link Scheduler}.</dd>
11011+
* <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
11012+
* from the {@code computation} {@link Scheduler}.</dd>
1100711013
* </dl>
1100811014
*
1100911015
* @return an Observable that emits timestamped items from the source Observable
1101011016
* @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
1101111017
*/
1101211018
public final Observable<Timestamped<T>> timestamp() {
11013-
return timestamp(Schedulers.immediate());
11019+
return timestamp(Schedulers.computation());
1101411020
}
1101511021

1101611022
/**
@@ -11023,7 +11029,8 @@ public final Observable<Timestamped<T>> timestamp() {
1102311029
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
1102411030
* behavior.</dd>
1102511031
* <dt><b>Scheduler:</b></dt>
11026-
* <dd>you specify which {@link Scheduler} this operator will use</dd>
11032+
* <dd>The operator does not operate on any particular scheduler but uses the current time
11033+
* from the specified {@link Scheduler}.</dd>
1102711034
* </dl>
1102811035
*
1102911036
* @param scheduler

src/main/java/rx/internal/operators/DeferredScalarSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected final void complete() {
8585

8686
/**
8787
* Atomically switches to the terminal state and emits the value if
88-
* there is a request for it or stores it for retrieval by {@link #downstreamRequest(long)}.
88+
* there is a request for it or stores it for retrieval by {@code downstreamRequest(long)}.
8989
* @param value the value to complete with
9090
*/
9191
protected final void complete(R value) {

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* limitations under the License.
3232
*/
3333

34-
import static rx.Observable.create;
34+
import static rx.Observable.create; // NOPMD
3535

3636
import java.util.concurrent.atomic.*;
3737

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super
7373

7474
@Override
7575
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
76-
final GroupBySubscriber<T, K, V> parent;
76+
final GroupBySubscriber<T, K, V> parent; // NOPMD
7777
try {
7878
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
7979
} catch (Throwable ex) {

src/main/java/rx/internal/util/SynchronizedQueue.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,7 @@ public boolean equals(Object obj) {
115115
return false;
116116
}
117117
SynchronizedQueue<?> other = (SynchronizedQueue<?>) obj;
118-
if (!list.equals(other.list)) {
119-
return false;
120-
}
121-
return true;
118+
return list.equals(other.list);
122119
}
123120

124121
@Override

src/test/java/rx/NotificationTest.java

Lines changed: 91 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Mockito.*;
21+
import static org.junit.Assert.*;
2122

2223
import java.util.*;
2324

@@ -31,65 +32,65 @@ public class NotificationTest {
3132
public void testOnNextIntegerNotificationDoesNotEqualNullNotification(){
3233
final Notification<Integer> integerNotification = Notification.createOnNext(1);
3334
final Notification<Integer> nullNotification = Notification.createOnNext(null);
34-
Assert.assertFalse(integerNotification.equals(nullNotification));
35+
assertFalse(integerNotification.equals(nullNotification));
3536
}
3637

3738
@Test
3839
public void testOnNextNullNotificationDoesNotEqualIntegerNotification(){
3940
final Notification<Integer> integerNotification = Notification.createOnNext(1);
4041
final Notification<Integer> nullNotification = Notification.createOnNext(null);
41-
Assert.assertFalse(nullNotification.equals(integerNotification));
42+
assertFalse(nullNotification.equals(integerNotification));
4243
}
4344

4445
@Test
4546
public void testOnNextIntegerNotificationsWhenEqual(){
4647
final Notification<Integer> integerNotification = Notification.createOnNext(1);
4748
final Notification<Integer> integerNotification2 = Notification.createOnNext(1);
48-
Assert.assertTrue(integerNotification.equals(integerNotification2));
49+
assertTrue(integerNotification.equals(integerNotification2));
4950
}
5051

5152
@Test
5253
public void testOnNextIntegerNotificationsWhenNotEqual(){
5354
final Notification<Integer> integerNotification = Notification.createOnNext(1);
5455
final Notification<Integer> integerNotification2 = Notification.createOnNext(2);
55-
Assert.assertFalse(integerNotification.equals(integerNotification2));
56+
assertFalse(integerNotification.equals(integerNotification2));
5657
}
5758

5859
@Test
5960
public void testOnErrorIntegerNotificationDoesNotEqualNullNotification(){
6061
final Notification<Integer> integerNotification = Notification.createOnError(new Exception());
6162
final Notification<Integer> nullNotification = Notification.createOnError(null);
62-
Assert.assertFalse(integerNotification.equals(nullNotification));
63+
assertFalse(integerNotification.equals(nullNotification));
6364
}
6465

6566
@Test
6667
public void testOnErrorNullNotificationDoesNotEqualIntegerNotification(){
6768
final Notification<Integer> integerNotification = Notification.createOnError(new Exception());
6869
final Notification<Integer> nullNotification = Notification.createOnError(null);
69-
Assert.assertFalse(nullNotification.equals(integerNotification));
70+
assertFalse(nullNotification.equals(integerNotification));
7071
}
7172

7273
@Test
7374
public void testOnErrorIntegerNotificationsWhenEqual(){
7475
final Exception exception = new Exception();
7576
final Notification<Integer> onErrorNotification = Notification.createOnError(exception);
7677
final Notification<Integer> onErrorNotification2 = Notification.createOnError(exception);
77-
Assert.assertTrue(onErrorNotification.equals(onErrorNotification2));
78+
assertTrue(onErrorNotification.equals(onErrorNotification2));
7879
}
7980

8081
@Test
8182
public void testOnErrorIntegerNotificationWhenNotEqual(){
8283
final Notification<Integer> onErrorNotification = Notification.createOnError(new Exception());
8384
final Notification<Integer> onErrorNotification2 = Notification.createOnError(new Exception());
84-
Assert.assertFalse(onErrorNotification.equals(onErrorNotification2));
85+
assertFalse(onErrorNotification.equals(onErrorNotification2));
8586
}
8687

8788
@Test
8889
public void createWithClass() {
8990
Notification<Integer> n = Notification.createOnCompleted(Integer.class);
90-
Assert.assertTrue(n.isOnCompleted());
91-
Assert.assertFalse(n.hasThrowable());
92-
Assert.assertFalse(n.hasValue());
91+
assertTrue(n.isOnCompleted());
92+
assertFalse(n.hasThrowable());
93+
assertFalse(n.hasValue());
9394
}
9495

9596
@Test
@@ -121,9 +122,9 @@ static String stripAt(String s) {
121122

122123
@Test
123124
public void toStringVariants() {
124-
Assert.assertEquals("[rx.Notification OnNext 1]", stripAt(Notification.createOnNext(1).toString()));
125-
Assert.assertEquals("[rx.Notification OnError Forced failure]", stripAt(Notification.createOnError(new TestException("Forced failure")).toString()));
126-
Assert.assertEquals("[rx.Notification OnCompleted]", stripAt(Notification.createOnCompleted().toString()));
125+
assertEquals("[rx.Notification OnNext 1]", stripAt(Notification.createOnNext(1).toString()));
126+
assertEquals("[rx.Notification OnError Forced failure]", stripAt(Notification.createOnError(new TestException("Forced failure")).toString()));
127+
assertEquals("[rx.Notification OnCompleted]", stripAt(Notification.createOnCompleted().toString()));
127128
}
128129

129130
@Test
@@ -134,7 +135,7 @@ public void hashCodeWorks() {
134135
Notification<Integer> e1 = Notification.createOnError(new TestException());
135136
Notification<Integer> c1 = Notification.createOnCompleted();
136137

137-
Assert.assertEquals(n1.hashCode(), n1a.hashCode());
138+
assertEquals(n1.hashCode(), n1a.hashCode());
138139

139140
Set<Notification<Integer>> set = new HashSet<Notification<Integer>>();
140141

@@ -143,11 +144,11 @@ public void hashCodeWorks() {
143144
set.add(e1);
144145
set.add(c1);
145146

146-
Assert.assertTrue(set.contains(n1));
147-
Assert.assertTrue(set.contains(n1a));
148-
Assert.assertTrue(set.contains(n2));
149-
Assert.assertTrue(set.contains(e1));
150-
Assert.assertTrue(set.contains(c1));
147+
assertTrue(set.contains(n1));
148+
assertTrue(set.contains(n1a));
149+
assertTrue(set.contains(n2));
150+
assertTrue(set.contains(e1));
151+
assertTrue(set.contains(c1));
151152
}
152153

153154
@Test
@@ -164,27 +165,27 @@ public void equalsWorks() {
164165
Notification<Integer> c1 = Notification.createOnCompleted();
165166
Notification<Integer> c2 = Notification.createOnCompleted();
166167

167-
Assert.assertEquals(n1, n1a);
168-
Assert.assertNotEquals(n1, n2);
169-
Assert.assertNotEquals(n2, n1);
168+
assertEquals(n1, n1a);
169+
assertNotEquals(n1, n2);
170+
assertNotEquals(n2, n1);
170171

171-
Assert.assertNotEquals(n1, e1);
172-
Assert.assertNotEquals(e1, n1);
173-
Assert.assertNotEquals(e1, c1);
174-
Assert.assertNotEquals(n1, c1);
175-
Assert.assertNotEquals(c1, n1);
176-
Assert.assertNotEquals(c1, e1);
172+
assertNotEquals(n1, e1);
173+
assertNotEquals(e1, n1);
174+
assertNotEquals(e1, c1);
175+
assertNotEquals(n1, c1);
176+
assertNotEquals(c1, n1);
177+
assertNotEquals(c1, e1);
177178

178-
Assert.assertEquals(e1, e1);
179-
Assert.assertEquals(e1, e2);
179+
assertEquals(e1, e1);
180+
assertNotEquals(e1, e2);
180181

181-
Assert.assertEquals(c1, c2);
182+
assertEquals(c1, c2);
182183

183-
Assert.assertFalse(n1.equals(null));
184-
Assert.assertFalse(n1.equals(1));
184+
assertFalse(n1.equals(null));
185+
assertFalse(n1.equals(1));
185186

186-
Assert.assertEquals(z1a, z1);
187-
Assert.assertEquals(z1, z1a);
187+
assertEquals(z1a, z1);
188+
assertEquals(z1, z1a);
188189
}
189190

190191
@Test
@@ -195,25 +196,64 @@ public void contentChecks() {
195196
Notification<Integer> e2 = Notification.createOnError(null);
196197
Notification<Integer> c1 = Notification.createOnCompleted();
197198

198-
Assert.assertFalse(z1.hasValue());
199-
Assert.assertFalse(z1.hasThrowable());
200-
Assert.assertFalse(z1.isOnCompleted());
199+
assertFalse(z1.hasValue());
200+
assertFalse(z1.hasThrowable());
201+
assertFalse(z1.isOnCompleted());
201202

202-
Assert.assertTrue(n1.hasValue());
203-
Assert.assertFalse(n1.hasThrowable());
204-
Assert.assertFalse(n1.isOnCompleted());
203+
assertTrue(n1.hasValue());
204+
assertFalse(n1.hasThrowable());
205+
assertFalse(n1.isOnCompleted());
205206

206-
Assert.assertFalse(e1.hasValue());
207-
Assert.assertTrue(e1.hasThrowable());
208-
Assert.assertFalse(e1.isOnCompleted());
207+
assertFalse(e1.hasValue());
208+
assertTrue(e1.hasThrowable());
209+
assertFalse(e1.isOnCompleted());
209210

210-
Assert.assertFalse(e2.hasValue());
211-
Assert.assertFalse(e2.hasThrowable());
212-
Assert.assertFalse(e2.isOnCompleted());
211+
assertFalse(e2.hasValue());
212+
assertFalse(e2.hasThrowable());
213+
assertFalse(e2.isOnCompleted());
213214

214-
Assert.assertFalse(c1.hasValue());
215-
Assert.assertFalse(c1.hasThrowable());
216-
Assert.assertTrue(c1.isOnCompleted());
215+
assertFalse(c1.hasValue());
216+
assertFalse(c1.hasThrowable());
217+
assertTrue(c1.isOnCompleted());
217218

218219
}
220+
221+
@Test
222+
public void exceptionEquality() {
223+
EqualException ex1 = new EqualException("1");
224+
EqualException ex2 = new EqualException("1");
225+
EqualException ex3 = new EqualException("3");
226+
227+
Notification<Integer> e1 = Notification.createOnError(ex1);
228+
Notification<Integer> e2 = Notification.createOnError(ex2);
229+
Notification<Integer> e3 = Notification.createOnError(ex3);
230+
231+
assertEquals(e1, e1);
232+
assertEquals(e1, e2);
233+
assertEquals(e2, e1);
234+
assertEquals(e2, e2);
235+
236+
assertNotEquals(e1, e3);
237+
assertNotEquals(e2, e3);
238+
assertNotEquals(e3, e1);
239+
assertNotEquals(e3, e2);
240+
}
241+
242+
static final class EqualException extends RuntimeException {
243+
244+
/** */
245+
private static final long serialVersionUID = 446310455393317050L;
246+
247+
public EqualException(String message) {
248+
super(message);
249+
}
250+
251+
@Override
252+
public boolean equals(Object o) {
253+
if (o instanceof EqualException) {
254+
return getMessage().equals(((EqualException)o).getMessage());
255+
}
256+
return false;
257+
}
258+
}
219259
}

0 commit comments

Comments
 (0)