Skip to content

Commit 3769be0

Browse files
akarnokdakarnokd
authored andcommitted
Refactored exception reporting of most operators.
1 parent 689e73f commit 3769be0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+167
-141
lines changed

src/main/java/rx/exceptions/Exceptions.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
*/
1616
package rx.exceptions;
1717

18-
import java.util.HashSet;
19-
import java.util.List;
20-
import java.util.Set;
18+
import java.util.*;
2119

20+
import rx.Observer;
2221
import rx.annotations.Experimental;
2322

2423
/**
@@ -178,4 +177,28 @@ public static void throwIfAny(List<? extends Throwable> exceptions) {
178177
"Multiple exceptions", exceptions);
179178
}
180179
}
180+
181+
/**
182+
* Forwards a fatal exception or reports it along with the value
183+
* caused it to the given Observer.
184+
* @param t the exception
185+
* @param o the observer to report to
186+
* @param value the value that caused the exception
187+
*/
188+
@Experimental
189+
public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
190+
Exceptions.throwIfFatal(t);
191+
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
192+
}
193+
/**
194+
* Forwards a fatal exception or reports it to the given Observer.
195+
* @param t the exception
196+
* @param o the observer to report to
197+
* @param value the value that caused the exception
198+
*/
199+
@Experimental
200+
public static void throwOrReport(Throwable t, Observer<?> o) {
201+
Exceptions.throwIfFatal(t);
202+
o.onError(t);
203+
}
181204
}

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323

2424
import rx.Observable;
2525
import rx.Observable.OnSubscribe;
26+
import rx.exceptions.*;
2627
import rx.Producer;
2728
import rx.Subscriber;
28-
import rx.exceptions.MissingBackpressureException;
2929
import rx.functions.FuncN;
3030
import rx.internal.util.RxRingBuffer;
3131

@@ -202,7 +202,7 @@ public boolean onNext(int index, T t) {
202202
} catch (MissingBackpressureException e) {
203203
onError(e);
204204
} catch (Throwable e) {
205-
onError(e);
205+
Exceptions.throwOrReport(e, child);
206206
}
207207
}
208208
}

src/main/java/rx/internal/operators/OnSubscribeDefer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable;
1919
import rx.Observable.OnSubscribe;
2020
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
2122
import rx.functions.Func0;
2223
import rx.observers.Subscribers;
2324

@@ -44,7 +45,7 @@ public void call(final Subscriber<? super T> s) {
4445
try {
4546
o = observableFactory.call();
4647
} catch (Throwable t) {
47-
s.onError(t);
48+
Exceptions.throwOrReport(t, s);
4849
return;
4950
}
5051
o.unsafeSubscribe(Subscribers.wrap(s));

src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.*;
1919
import rx.Observable.OnSubscribe;
20+
import rx.exceptions.Exceptions;
2021
import rx.functions.Func0;
2122
import rx.observers.Subscribers;
2223

@@ -58,7 +59,7 @@ public void onNext(U t) {
5859

5960
});
6061
} catch (Throwable e) {
61-
child.onError(e);
62+
Exceptions.throwOrReport(e, child);
6263
}
6364
}
6465

src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.HashMap;
20-
import java.util.List;
21-
import java.util.Map;
18+
import java.util.*;
2219

20+
import rx.*;
2321
import rx.Observable;
2422
import rx.Observable.OnSubscribe;
2523
import rx.Observer;
26-
import rx.Subscriber;
27-
import rx.Subscription;
28-
import rx.functions.Func1;
29-
import rx.functions.Func2;
30-
import rx.observers.SerializedObserver;
31-
import rx.observers.SerializedSubscriber;
32-
import rx.subjects.PublishSubject;
33-
import rx.subjects.Subject;
34-
import rx.subscriptions.CompositeSubscription;
35-
import rx.subscriptions.RefCountSubscription;
24+
import rx.exceptions.Exceptions;
25+
import rx.functions.*;
26+
import rx.observers.*;
27+
import rx.subjects.*;
28+
import rx.subscriptions.*;
3629

3730
/**
3831
* Corrrelates two sequences when they overlap and groups the results.
@@ -192,7 +185,7 @@ public void onNext(T1 args) {
192185

193186

194187
} catch (Throwable t) {
195-
onError(t);
188+
Exceptions.throwOrReport(t, this);
196189
}
197190
}
198191

@@ -242,7 +235,7 @@ public void onNext(T2 args) {
242235
o.onNext(args);
243236
}
244237
} catch (Throwable t) {
245-
onError(t);
238+
Exceptions.throwOrReport(t, this);
246239
}
247240
}
248241

src/main/java/rx/internal/operators/OnSubscribeJoin.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.HashMap;
20-
import java.util.List;
21-
import java.util.Map;
18+
import java.util.*;
2219

20+
import rx.*;
2321
import rx.Observable;
2422
import rx.Observable.OnSubscribe;
25-
import rx.Subscriber;
26-
import rx.Subscription;
27-
import rx.functions.Func1;
28-
import rx.functions.Func2;
23+
import rx.exceptions.Exceptions;
24+
import rx.functions.*;
2925
import rx.observers.SerializedSubscriber;
30-
import rx.subscriptions.CompositeSubscription;
31-
import rx.subscriptions.SerialSubscription;
26+
import rx.subscriptions.*;
3227

3328
/**
3429
* Correlates the elements of two sequences based on overlapping durations.
@@ -154,7 +149,7 @@ public void onNext(TLeft args) {
154149
subscriber.onNext(result);
155150
}
156151
} catch (Throwable t) {
157-
onError(t);
152+
Exceptions.throwOrReport(t, this);
158153
}
159154
}
160155

@@ -266,7 +261,7 @@ public void onNext(TRight args) {
266261
}
267262

268263
} catch (Throwable t) {
269-
onError(t);
264+
Exceptions.throwOrReport(t, this);
270265
}
271266
}
272267

src/main/java/rx/internal/operators/OnSubscribeTimerOnce.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observable.OnSubscribe;
2020
import rx.Scheduler;
2121
import rx.Scheduler.Worker;
22+
import rx.exceptions.Exceptions;
2223
import rx.Subscriber;
2324
import rx.functions.Action0;
2425

@@ -47,7 +48,7 @@ public void call() {
4748
try {
4849
child.onNext(0L);
4950
} catch (Throwable t) {
50-
child.onError(t);
51+
Exceptions.throwOrReport(t, child);
5152
return;
5253
}
5354
child.onCompleted();

src/main/java/rx/internal/operators/OnSubscribeTimerPeriodically.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observable.OnSubscribe;
2020
import rx.Scheduler;
2121
import rx.Scheduler.Worker;
22+
import rx.exceptions.Exceptions;
2223
import rx.Subscriber;
2324
import rx.functions.Action0;
2425

@@ -51,9 +52,9 @@ public void call() {
5152
child.onNext(counter++);
5253
} catch (Throwable e) {
5354
try {
54-
child.onError(e);
55-
} finally {
5655
worker.unsubscribe();
56+
} finally {
57+
Exceptions.throwOrReport(e, child);
5758
}
5859
}
5960
}

src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.TimeUnit;
2020

2121
import rx.Observable.OnSubscribe;
22+
import rx.exceptions.Exceptions;
2223
import rx.Subscriber;
2324
import rx.functions.Action0;
2425
import rx.subscriptions.Subscriptions;
@@ -83,7 +84,7 @@ public void call() {
8384
//refuse to emit onError if already unsubscribed
8485
return;
8586
}
86-
subscriber.onError(e);
87+
Exceptions.throwOrReport(e, subscriber);
8788
}
8889
}
8990
}

src/main/java/rx/internal/operators/OnSubscribeUsing.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import rx.*;
2222
import rx.Observable.OnSubscribe;
23-
import rx.exceptions.CompositeException;
23+
import rx.exceptions.*;
2424
import rx.functions.*;
2525
import rx.observers.Subscribers;
2626

@@ -72,6 +72,8 @@ public void call(final Subscriber<? super T> subscriber) {
7272
observable.unsafeSubscribe(Subscribers.wrap(subscriber));
7373
} catch (Throwable e) {
7474
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
75+
Exceptions.throwIfFatal(e);
76+
Exceptions.throwIfFatal(disposeError);
7577
if (disposeError != null)
7678
subscriber.onError(new CompositeException(Arrays.asList(e, disposeError)));
7779
else
@@ -80,7 +82,7 @@ public void call(final Subscriber<? super T> subscriber) {
8082
}
8183
} catch (Throwable e) {
8284
// then propagate error
83-
subscriber.onError(e);
85+
Exceptions.throwOrReport(e, subscriber);
8486
}
8587
}
8688

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
2020
import rx.exceptions.Exceptions;
21-
import rx.exceptions.OnErrorThrowable;
2221
import rx.functions.Func1;
2322
import rx.internal.producers.SingleDelayedProducer;
2423

@@ -47,8 +46,7 @@ public void onNext(T t) {
4746
try {
4847
result = predicate.call(t);
4948
} catch (Throwable e) {
50-
Exceptions.throwIfFatal(e);
51-
onError(OnErrorThrowable.addValueAsLastCause(e, t));
49+
Exceptions.throwOrReport(e, this, t);
5250
return;
5351
}
5452
if (!result && !done) {

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616
package rx.internal.operators;
1717

1818

19-
import rx.Observable;
19+
import rx.*;
2020
import rx.Observable.Operator;
21-
import rx.Subscriber;
2221
import rx.exceptions.Exceptions;
23-
import rx.exceptions.OnErrorThrowable;
2422
import rx.functions.Func1;
2523
import rx.internal.producers.SingleDelayedProducer;
2624

@@ -51,8 +49,7 @@ public void onNext(T t) {
5149
try {
5250
result = predicate.call(t);
5351
} catch (Throwable e) {
54-
Exceptions.throwIfFatal(e);
55-
onError(OnErrorThrowable.addValueAsLastCause(e, t));
52+
Exceptions.throwOrReport(e, this, t);
5653
return;
5754
}
5855
if (result && !done) {

src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Observable;
2222
import rx.Observable.Operator;
23+
import rx.exceptions.Exceptions;
2324
import rx.Observer;
2425
import rx.Subscriber;
2526
import rx.functions.Func0;
@@ -79,7 +80,7 @@ public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
7980
try {
8081
closing = bufferClosingSelector.call();
8182
} catch (Throwable t) {
82-
child.onError(t);
83+
Exceptions.throwOrReport(t, child);
8384
return Subscribers.empty();
8485
}
8586
final BufferingSubscriber bsub = new BufferingSubscriber(new SerializedSubscriber<List<T>>(child));
@@ -157,7 +158,7 @@ public void onCompleted() {
157158
}
158159
child.onNext(toEmit);
159160
} catch (Throwable t) {
160-
child.onError(t);
161+
Exceptions.throwOrReport(t, child);
161162
return;
162163
}
163164
child.onCompleted();
@@ -183,7 +184,7 @@ void emit() {
183184
}
184185
done = true;
185186
}
186-
child.onError(t);
187+
Exceptions.throwOrReport(t, child);
187188
}
188189
}
189190
}

src/main/java/rx/internal/operators/OperatorBufferWithSize.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observable.Operator;
2525
import rx.Producer;
2626
import rx.Subscriber;
27+
import rx.exceptions.Exceptions;
2728

2829
/**
2930
* This operation takes
@@ -118,7 +119,7 @@ public void onCompleted() {
118119
try {
119120
child.onNext(oldBuffer);
120121
} catch (Throwable t) {
121-
onError(t);
122+
Exceptions.throwOrReport(t, this);
122123
return;
123124
}
124125
}
@@ -218,7 +219,7 @@ public void onCompleted() {
218219
try {
219220
child.onNext(chunk);
220221
} catch (Throwable t) {
221-
onError(t);
222+
Exceptions.throwOrReport(t, this);
222223
return;
223224
}
224225
}

src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import rx.Observable;
2323
import rx.Observable.Operator;
24+
import rx.exceptions.Exceptions;
2425
import rx.Observer;
2526
import rx.Subscriber;
2627
import rx.functions.Func1;
@@ -145,7 +146,7 @@ public void onCompleted() {
145146
child.onNext(chunk);
146147
}
147148
} catch (Throwable t) {
148-
child.onError(t);
149+
Exceptions.throwOrReport(t, child);
149150
return;
150151
}
151152
child.onCompleted();
@@ -163,7 +164,7 @@ void startBuffer(TOpening v) {
163164
try {
164165
cobs = bufferClosing.call(v);
165166
} catch (Throwable t) {
166-
onError(t);
167+
Exceptions.throwOrReport(t, this);
167168
return;
168169
}
169170
Subscriber<TClosing> closeSubscriber = new Subscriber<TClosing>() {

0 commit comments

Comments
 (0)