Skip to content

Commit ee63aff

Browse files
authored
1.x: add RxJavaHooks tests, fix small bugs (ReactiveX#4142)
* 1.x: add RxJavaHooks tests, fix small bugs * Add onSchedule hook test * Use static assert, update Conpletable param name
1 parent d66d931 commit ee63aff

File tree

4 files changed

+428
-20
lines changed

4 files changed

+428
-20
lines changed

src/main/java/rx/Completable.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,21 +87,21 @@ public interface CompletableTransformer extends Func1<Completable, Completable>
8787
}
8888

8989
/** Single instance of a complete Completable. */
90-
static final Completable COMPLETE = create(new CompletableOnSubscribe() {
90+
static final Completable COMPLETE = new Completable(new CompletableOnSubscribe() {
9191
@Override
9292
public void call(CompletableSubscriber s) {
9393
s.onSubscribe(Subscriptions.unsubscribed());
9494
s.onCompleted();
9595
}
96-
});
96+
}, true); // hook is handled in complete()
9797

9898
/** Single instance of a never Completable. */
99-
static final Completable NEVER = create(new CompletableOnSubscribe() {
99+
static final Completable NEVER = new Completable(new CompletableOnSubscribe() {
100100
@Override
101101
public void call(CompletableSubscriber s) {
102102
s.onSubscribe(Subscriptions.unsubscribed());
103103
}
104-
});
104+
}, true); // hook is handled in never()
105105

106106
/**
107107
* Returns a Completable which terminates as soon as one of the source Completables
@@ -311,7 +311,11 @@ public void onSubscribe(Subscription d) {
311311
* @return a Completable instance that completes immediately
312312
*/
313313
public static Completable complete() {
314-
return COMPLETE;
314+
CompletableOnSubscribe cos = RxJavaHooks.onCreate(COMPLETE.onSubscribe);
315+
if (cos == COMPLETE.onSubscribe) {
316+
return COMPLETE;
317+
}
318+
return new Completable(cos, true);
315319
}
316320

317321
/**
@@ -734,7 +738,11 @@ public static Completable mergeDelayError(Observable<? extends Completable> sour
734738
* @return the singleton instance that never calls onError or onComplete
735739
*/
736740
public static Completable never() {
737-
return NEVER;
741+
CompletableOnSubscribe cos = RxJavaHooks.onCreate(NEVER.onSubscribe);
742+
if (cos == NEVER.onSubscribe) {
743+
return NEVER;
744+
}
745+
return new Completable(cos, true);
738746
}
739747

740748
/**
@@ -975,7 +983,18 @@ public void call() {
975983
protected Completable(CompletableOnSubscribe onSubscribe) {
976984
this.onSubscribe = RxJavaHooks.onCreate(onSubscribe);
977985
}
978-
986+
987+
/**
988+
* Constructs a Completable instance with the given onSubscribe callback without calling the onCreate
989+
* hook.
990+
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
991+
* not null (not verified)
992+
* @param useHook if false, RxJavaHooks.onCreate won't be called
993+
*/
994+
private Completable(CompletableOnSubscribe onSubscribe, boolean useHook) {
995+
this.onSubscribe = useHook ? RxJavaHooks.onCreate(onSubscribe) : onSubscribe;
996+
}
997+
979998
/**
980999
* Returns a Completable that emits the a terminated event of either this Completable
9811000
* or the other Completable whichever fires first.

src/main/java/rx/plugins/RxJavaHooks.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ public static void reset() {
171171
return;
172172
}
173173
init();
174+
175+
onComputationScheduler = null;
176+
onIOScheduler = null;
177+
onNewThreadScheduler = null;
174178
}
175179

176180
/**
@@ -224,7 +228,7 @@ public static void onError(Throwable ex) {
224228
Action1<Throwable> f = onError;
225229
if (f != null) {
226230
try {
227-
f.call(ex);
231+
f.call(ex);
228232
return;
229233
} catch (Throwable pluginException) {
230234
/*

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public static Scheduler computation() {
137137
* @return a {@link Scheduler} meant for IO-bound work
138138
*/
139139
public static Scheduler io() {
140-
return RxJavaHooks.onComputationScheduler(getInstance().ioScheduler);
140+
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
141141
}
142142

143143
/**

0 commit comments

Comments
 (0)