Skip to content

Commit 47477cf

Browse files
authored
1.x: new hook management proposal (ReactiveX#4007)
* 1.x: new hook management proposal * Add missing javadoc
1 parent 544690b commit 47477cf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1696
-225
lines changed

src/main/java/rx/Completable.java

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,6 @@
3838
*/
3939
@Experimental
4040
public class Completable {
41-
/** The error handler instance. */
42-
static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler();
43-
44-
/** The completable hook. */
45-
static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook();
46-
4741
/**
4842
* Callback used for building deferred computations that takes a CompletableSubscriber.
4943
*/
@@ -146,7 +140,7 @@ public void onError(Throwable e) {
146140
set.unsubscribe();
147141
s.onError(e);
148142
} else {
149-
ERROR_HANDLER.handleError(e);
143+
RxJavaHooks.onError(e);
150144
}
151145
}
152146

@@ -167,7 +161,7 @@ public void onSubscribe(Subscription d) {
167161
set.unsubscribe();
168162
s.onError(npe);
169163
} else {
170-
ERROR_HANDLER.handleError(npe);
164+
RxJavaHooks.onError(npe);
171165
}
172166
return;
173167
}
@@ -215,7 +209,7 @@ public void onError(Throwable e) {
215209
set.unsubscribe();
216210
s.onError(e);
217211
} else {
218-
ERROR_HANDLER.handleError(e);
212+
RxJavaHooks.onError(e);
219213
}
220214
}
221215

@@ -256,7 +250,7 @@ public void onSubscribe(Subscription d) {
256250
set.unsubscribe();
257251
s.onError(e);
258252
} else {
259-
ERROR_HANDLER.handleError(e);
253+
RxJavaHooks.onError(e);
260254
}
261255
return;
262256
}
@@ -283,7 +277,7 @@ public void onSubscribe(Subscription d) {
283277
set.unsubscribe();
284278
s.onError(e);
285279
} else {
286-
ERROR_HANDLER.handleError(e);
280+
RxJavaHooks.onError(e);
287281
}
288282
return;
289283
}
@@ -294,7 +288,7 @@ public void onSubscribe(Subscription d) {
294288
set.unsubscribe();
295289
s.onError(npe);
296290
} else {
297-
ERROR_HANDLER.handleError(npe);
291+
RxJavaHooks.onError(npe);
298292
}
299293
return;
300294
}
@@ -386,7 +380,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) {
386380
} catch (NullPointerException ex) {
387381
throw ex;
388382
} catch (Throwable ex) {
389-
ERROR_HANDLER.handleError(ex);
383+
RxJavaHooks.onError(ex);
390384
throw toNpe(ex);
391385
}
392386
}
@@ -912,7 +906,7 @@ void dispose() {
912906
try {
913907
disposer.call(resource);
914908
} catch (Throwable ex) {
915-
ERROR_HANDLER.handleError(ex);
909+
RxJavaHooks.onError(ex);
916910
}
917911
}
918912
}
@@ -980,7 +974,7 @@ public void call() {
980974
* not null (not verified)
981975
*/
982976
protected Completable(CompletableOnSubscribe onSubscribe) {
983-
this.onSubscribe = HOOK.onCreate(onSubscribe);
977+
this.onSubscribe = RxJavaHooks.onCreate(onSubscribe);
984978
}
985979

986980
/**
@@ -1338,7 +1332,7 @@ public void onCompleted() {
13381332
try {
13391333
onAfterComplete.call();
13401334
} catch (Throwable e) {
1341-
ERROR_HANDLER.handleError(e);
1335+
RxJavaHooks.onError(e);
13421336
}
13431337
}
13441338

@@ -1371,7 +1365,7 @@ public void call() {
13711365
try {
13721366
onUnsubscribe.call();
13731367
} catch (Throwable e) {
1374-
ERROR_HANDLER.handleError(e);
1368+
RxJavaHooks.onError(e);
13751369
}
13761370
d.unsubscribe();
13771371
}
@@ -1552,7 +1546,7 @@ public final Completable lift(final CompletableOperator onLift) {
15521546
@Override
15531547
public void call(CompletableSubscriber s) {
15541548
try {
1555-
CompletableOperator onLiftDecorated = HOOK.onLift(onLift);
1549+
CompletableOperator onLiftDecorated = RxJavaHooks.onCompletableLift(onLift);
15561550
CompletableSubscriber sw = onLiftDecorated.call(s);
15571551

15581552
unsafeSubscribe(sw);
@@ -1879,7 +1873,7 @@ public void onCompleted() {
18791873

18801874
@Override
18811875
public void onError(Throwable e) {
1882-
ERROR_HANDLER.handleError(e);
1876+
RxJavaHooks.onError(e);
18831877
mad.unsubscribe();
18841878
deliverUncaughtException(e);
18851879
}
@@ -1896,7 +1890,7 @@ public void onSubscribe(Subscription d) {
18961890
* Subscribes to this Completable and calls the given Action0 when this Completable
18971891
* completes normally.
18981892
* <p>
1899-
* If this Completable emits an error, it is sent to ERROR_HANDLER.handleError and gets swallowed.
1893+
* If this Completable emits an error, it is sent to RxJavaHooks.onError and gets swallowed.
19001894
* @param onComplete the runnable called when this Completable completes normally
19011895
* @return the Subscription that allows cancelling the subscription
19021896
*/
@@ -1913,7 +1907,7 @@ public void onCompleted() {
19131907
try {
19141908
onComplete.call();
19151909
} catch (Throwable e) {
1916-
ERROR_HANDLER.handleError(e);
1910+
RxJavaHooks.onError(e);
19171911
deliverUncaughtException(e);
19181912
} finally {
19191913
mad.unsubscribe();
@@ -1923,7 +1917,7 @@ public void onCompleted() {
19231917

19241918
@Override
19251919
public void onError(Throwable e) {
1926-
ERROR_HANDLER.handleError(e);
1920+
RxJavaHooks.onError(e);
19271921
mad.unsubscribe();
19281922
deliverUncaughtException(e);
19291923
}
@@ -1972,7 +1966,7 @@ public void onError(Throwable e) {
19721966
done = true;
19731967
callOnError(e);
19741968
} else {
1975-
ERROR_HANDLER.handleError(e);
1969+
RxJavaHooks.onError(e);
19761970
deliverUncaughtException(e);
19771971
}
19781972
}
@@ -1982,7 +1976,7 @@ void callOnError(Throwable e) {
19821976
onError.call(e);
19831977
} catch (Throwable ex) {
19841978
e = new CompositeException(Arrays.asList(e, ex));
1985-
ERROR_HANDLER.handleError(e);
1979+
RxJavaHooks.onError(e);
19861980
deliverUncaughtException(e);
19871981
} finally {
19881982
mad.unsubscribe();
@@ -2011,15 +2005,15 @@ private static void deliverUncaughtException(Throwable e) {
20112005
public final void unsafeSubscribe(CompletableSubscriber s) {
20122006
requireNonNull(s);
20132007
try {
2014-
CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe);
2008+
CompletableOnSubscribe onSubscribeDecorated = RxJavaHooks.onCompletableStart(this, this.onSubscribe);
20152009

20162010
onSubscribeDecorated.call(s);
20172011
} catch (NullPointerException ex) {
20182012
throw ex;
20192013
} catch (Throwable ex) {
20202014
Exceptions.throwIfFatal(ex);
2021-
ex = HOOK.onSubscribeError(ex);
2022-
ERROR_HANDLER.handleError(ex);
2015+
ex = RxJavaHooks.onCompletableError(ex);
2016+
RxJavaHooks.onError(ex);
20232017
throw toNpe(ex);
20242018
}
20252019
}
@@ -2077,13 +2071,13 @@ public void onSubscribe(Subscription d) {
20772071
s.add(d);
20782072
}
20792073
});
2080-
RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s);
2074+
RxJavaHooks.onObservableReturn(s);
20812075
} catch (NullPointerException ex) {
20822076
throw ex;
20832077
} catch (Throwable ex) {
20842078
Exceptions.throwIfFatal(ex);
2085-
ex = HOOK.onSubscribeError(ex);
2086-
ERROR_HANDLER.handleError(ex);
2079+
ex = RxJavaHooks.onObservableError(ex);
2080+
RxJavaHooks.onError(ex);
20872081
throw toNpe(ex);
20882082
}
20892083
}

src/main/java/rx/Observable.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ protected Observable(OnSubscribe<T> f) {
5959
this.onSubscribe = f;
6060
}
6161

62-
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
63-
6462
/**
6563
* Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
6664
* it.
@@ -91,7 +89,7 @@ protected Observable(OnSubscribe<T> f) {
9189
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
9290
*/
9391
public static <T> Observable<T> create(OnSubscribe<T> f) {
94-
return new Observable<T>(hook.onCreate(f));
92+
return new Observable<T>(RxJavaHooks.onCreate(f));
9593
}
9694

9795
/**
@@ -133,7 +131,7 @@ public static <T> Observable<T> create(OnSubscribe<T> f) {
133131
*/
134132
@Beta
135133
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
136-
return new Observable<T>(hook.onCreate(syncOnSubscribe));
134+
return create((OnSubscribe<T>)syncOnSubscribe);
137135
}
138136

139137
/**
@@ -174,7 +172,7 @@ public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)
174172
*/
175173
@Experimental
176174
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
177-
return new Observable<T>(hook.onCreate(asyncOnSubscribe));
175+
return create((OnSubscribe<T>)asyncOnSubscribe);
178176
}
179177

180178
/**
@@ -250,7 +248,7 @@ public void call(Subscriber<? super T> subscriber) {
250248
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
251249
*/
252250
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
253-
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
251+
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
254252
}
255253

256254
/**
@@ -9001,21 +8999,21 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
90018999
// new Subscriber so onStart it
90029000
subscriber.onStart();
90039001
// allow the hook to intercept and/or decorate
9004-
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
9005-
return hook.onSubscribeReturn(subscriber);
9002+
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
9003+
return RxJavaHooks.onObservableReturn(subscriber);
90069004
} catch (Throwable e) {
90079005
// special handling for certain Throwable/Error/Exception types
90089006
Exceptions.throwIfFatal(e);
90099007
// if an unhandled error occurs executing the onSubscribe we will propagate it
90109008
try {
9011-
subscriber.onError(hook.onSubscribeError(e));
9009+
subscriber.onError(RxJavaHooks.onObservableError(e));
90129010
} catch (Throwable e2) {
90139011
Exceptions.throwIfFatal(e2);
90149012
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
90159013
// so we are unable to propagate the error correctly and will just throw
90169014
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
90179015
// TODO could the hook be the cause of the error in the on error handling.
9018-
hook.onSubscribeError(r);
9016+
RxJavaHooks.onObservableError(r);
90199017
// TODO why aren't we throwing the hook's return value.
90209018
throw r;
90219019
}
@@ -9094,25 +9092,25 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
90949092
// add a significant depth to already huge call stacks.
90959093
try {
90969094
// allow the hook to intercept and/or decorate
9097-
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
9098-
return hook.onSubscribeReturn(subscriber);
9095+
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
9096+
return RxJavaHooks.onObservableReturn(subscriber);
90999097
} catch (Throwable e) {
91009098
// special handling for certain Throwable/Error/Exception types
91019099
Exceptions.throwIfFatal(e);
91029100
// in case the subscriber can't listen to exceptions anymore
91039101
if (subscriber.isUnsubscribed()) {
9104-
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
9102+
RxJavaPluginUtils.handleException(RxJavaHooks.onObservableError(e));
91059103
} else {
91069104
// if an unhandled error occurs executing the onSubscribe we will propagate it
91079105
try {
9108-
subscriber.onError(hook.onSubscribeError(e));
9106+
subscriber.onError(RxJavaHooks.onObservableError(e));
91099107
} catch (Throwable e2) {
91109108
Exceptions.throwIfFatal(e2);
91119109
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
91129110
// so we are unable to propagate the error correctly and will just throw
91139111
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
91149112
// TODO could the hook be the cause of the error in the on error handling.
9115-
hook.onSubscribeError(r);
9113+
RxJavaHooks.onObservableError(r);
91169114
// TODO why aren't we throwing the hook's return value.
91179115
throw r;
91189116
}

0 commit comments

Comments
 (0)