Skip to content

Commit e267e02

Browse files
authored
Merge pull request ReactiveX#4154 from abersnaze/sched
Ability to create custom schedulers with behavior based on composing operators.
2 parents 1354c34 + f3777df commit e267e02

File tree

3 files changed

+586
-0
lines changed

3 files changed

+586
-0
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20+
import rx.annotations.Experimental;
2021
import rx.functions.Action0;
22+
import rx.functions.Func1;
23+
import rx.internal.schedulers.SchedulerWhen;
2124
import rx.schedulers.Schedulers;
2225
import rx.subscriptions.MultipleAssignmentSubscription;
2326

@@ -182,4 +185,79 @@ public long now() {
182185
return System.currentTimeMillis();
183186
}
184187

188+
/**
189+
* Allows the use of operators for controlling the timing around when
190+
* actions scheduled on workers are actually done. This makes it possible to
191+
* layer additional behavior on this {@link Scheduler}. The only parameter
192+
* is a function that flattens an {@link Observable} of {@link Observable}
193+
* of {@link Completable}s into just one {@link Completable}. There must be
194+
* a chain of operators connecting the returned value to the source
195+
* {@link Observable} otherwise any work scheduled on the returned
196+
* {@link Scheduler} will not be executed.
197+
* <p>
198+
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
199+
* {@link Completable}s is onNext'd to the combinator to be flattened. If
200+
* the inner {@link Observable} is not immediately subscribed to an calls to
201+
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
202+
* subscribed to actions are then onNext'd as {@link Completable}s.
203+
* <p>
204+
* Finally the actions scheduled on the parent {@link Scheduler} when the
205+
* inner most {@link Completable}s are subscribed to.
206+
* <p>
207+
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
208+
* onComplete and triggers any behavior in the flattening operator. The
209+
* {@link Observable} and all {@link Completable}s give to the flattening
210+
* function never onError.
211+
* <p>
212+
* Limit the amount concurrency two at a time without creating a new fix
213+
* size thread pool:
214+
*
215+
* <pre>
216+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
217+
* // use merge max concurrent to limit the number of concurrent
218+
* // callbacks two at a time
219+
* return Completable.merge(Observable.merge(workers), 2);
220+
* });
221+
* </pre>
222+
* <p>
223+
* This is a slightly different way to limit the concurrency but it has some
224+
* interesting benefits and drawbacks to the method above. It works by
225+
* limited the number of concurrent {@link Worker}s rather than individual
226+
* actions. Generally each {@link Observable} uses its own {@link Worker}.
227+
* This means that this will essentially limit the number of concurrent
228+
* subscribes. The danger comes from using operators like
229+
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
230+
* subscribing to the first {@link Observable} could deadlock the
231+
* subscription to the second.
232+
*
233+
* <pre>
234+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
235+
* // use merge max concurrent to limit the number of concurrent
236+
* // Observables two at a time
237+
* return Completable.merge(Observable.merge(workers, 2));
238+
* });
239+
* </pre>
240+
*
241+
* Slowing down the rate to no more than than 1 a second. This suffers from
242+
* the same problem as the one above I could find an {@link Observable}
243+
* operator that limits the rate without dropping the values (aka leaky
244+
* bucket algorithm).
245+
*
246+
* <pre>
247+
* Scheduler slowSched = Schedulers.computation().when(workers -> {
248+
* // use concatenate to make each worker happen one at a time.
249+
* return Completable.concat(workers.map(actions -> {
250+
* // delay the starting of the next worker by 1 second.
251+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
252+
* }));
253+
* });
254+
* </pre>
255+
*
256+
* @param combine
257+
* @return
258+
*/
259+
@Experimental
260+
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
261+
return (S) new SchedulerWhen(combine, this);
262+
}
185263
}
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Completable;
23+
import rx.Completable.CompletableOnSubscribe;
24+
import rx.Completable.CompletableSubscriber;
25+
import rx.Scheduler.Worker;
26+
import rx.Observable;
27+
import rx.Observer;
28+
import rx.Scheduler;
29+
import rx.Subscription;
30+
import rx.annotations.Experimental;
31+
import rx.functions.Action0;
32+
import rx.functions.Func1;
33+
import rx.internal.operators.BufferUntilSubscriber;
34+
import rx.observers.SerializedObserver;
35+
import rx.subjects.PublishSubject;
36+
import rx.subscriptions.Subscriptions;
37+
38+
/**
39+
* Allows the use of operators for controlling the timing around when actions
40+
* scheduled on workers are actually done. This makes it possible to layer
41+
* additional behavior on this {@link Scheduler}. The only parameter is a
42+
* function that flattens an {@link Observable} of {@link Observable} of
43+
* {@link Completable}s into just one {@link Completable}. There must be a chain
44+
* of operators connecting the returned value to the source {@link Observable}
45+
* otherwise any work scheduled on the returned {@link Scheduler} will not be
46+
* executed.
47+
* <p>
48+
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
49+
* {@link Completable}s is onNext'd to the combinator to be flattened. If the
50+
* inner {@link Observable} is not immediately subscribed to an calls to
51+
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
52+
* subscribed to actions are then onNext'd as {@link Completable}s.
53+
* <p>
54+
* Finally the actions scheduled on the parent {@link Scheduler} when the inner
55+
* most {@link Completable}s are subscribed to.
56+
* <p>
57+
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
58+
* onComplete and triggers any behavior in the flattening operator. The
59+
* {@link Observable} and all {@link Completable}s give to the flattening
60+
* function never onError.
61+
* <p>
62+
* Limit the amount concurrency two at a time without creating a new fix size
63+
* thread pool:
64+
*
65+
* <pre>
66+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
67+
* // use merge max concurrent to limit the number of concurrent
68+
* // callbacks two at a time
69+
* return Completable.merge(Observable.merge(workers), 2);
70+
* });
71+
* </pre>
72+
* <p>
73+
* This is a slightly different way to limit the concurrency but it has some
74+
* interesting benefits and drawbacks to the method above. It works by limited
75+
* the number of concurrent {@link Worker}s rather than individual actions.
76+
* Generally each {@link Observable} uses its own {@link Worker}. This means
77+
* that this will essentially limit the number of concurrent subscribes. The
78+
* danger comes from using operators like
79+
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
80+
* subscribing to the first {@link Observable} could deadlock the subscription
81+
* to the second.
82+
*
83+
* <pre>
84+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
85+
* // use merge max concurrent to limit the number of concurrent
86+
* // Observables two at a time
87+
* return Completable.merge(Observable.merge(workers, 2));
88+
* });
89+
* </pre>
90+
*
91+
* Slowing down the rate to no more than than 1 a second. This suffers from the
92+
* same problem as the one above I could find an {@link Observable} operator
93+
* that limits the rate without dropping the values (aka leaky bucket
94+
* algorithm).
95+
*
96+
* <pre>
97+
* Scheduler slowSched = Schedulers.computation().when(workers -> {
98+
* // use concatenate to make each worker happen one at a time.
99+
* return Completable.concat(workers.map(actions -> {
100+
* // delay the starting of the next worker by 1 second.
101+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
102+
* }));
103+
* });
104+
* </pre>
105+
*
106+
* @param combine
107+
* @return
108+
*/
109+
@Experimental
110+
public class SchedulerWhen extends Scheduler implements Subscription {
111+
private final Scheduler actualScheduler;
112+
private final Observer<Observable<Completable>> workerObserver;
113+
private final Subscription subscription;
114+
115+
public SchedulerWhen(Func1<Observable<Observable<Completable>>, Completable> combine, Scheduler actualScheduler) {
116+
this.actualScheduler = actualScheduler;
117+
// workers are converted into completables and put in this queue.
118+
PublishSubject<Observable<Completable>> workerSubject = PublishSubject.create();
119+
this.workerObserver = new SerializedObserver<Observable<Completable>>(workerSubject);
120+
// send it to a custom combinator to pick the order and rate at which
121+
// workers are processed.
122+
this.subscription = combine.call(workerSubject.onBackpressureBuffer()).subscribe();
123+
}
124+
125+
@Override
126+
public void unsubscribe() {
127+
subscription.unsubscribe();
128+
}
129+
130+
@Override
131+
public boolean isUnsubscribed() {
132+
return subscription.isUnsubscribed();
133+
}
134+
135+
@Override
136+
public Worker createWorker() {
137+
final Worker actualWorker = actualScheduler.createWorker();
138+
// a queue for the actions submitted while worker is waiting to get to
139+
// the subscribe to off the workerQueue.
140+
BufferUntilSubscriber<ScheduledAction> actionSubject = BufferUntilSubscriber.<ScheduledAction>create();
141+
final Observer<ScheduledAction> actionObserver = new SerializedObserver<ScheduledAction>(actionSubject);
142+
// convert the work of scheduling all the actions into a completable
143+
Observable<Completable> actions = actionSubject.map(new Func1<ScheduledAction, Completable>() {
144+
@Override
145+
public Completable call(final ScheduledAction action) {
146+
return Completable.create(new CompletableOnSubscribe() {
147+
@Override
148+
public void call(CompletableSubscriber actionCompletable) {
149+
actionCompletable.onSubscribe(action);
150+
action.call(actualWorker);
151+
actionCompletable.onCompleted();
152+
}
153+
});
154+
}
155+
});
156+
157+
// a worker that queues the action to the actionQueue subject.
158+
Worker worker = new Worker() {
159+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
160+
161+
@Override
162+
public void unsubscribe() {
163+
// complete the actionQueue when worker is unsubscribed to make
164+
// room for the next worker in the workerQueue.
165+
if (unsubscribed.compareAndSet(false, true)) {
166+
actualWorker.unsubscribe();
167+
actionObserver.onCompleted();
168+
}
169+
}
170+
171+
@Override
172+
public boolean isUnsubscribed() {
173+
return unsubscribed.get();
174+
}
175+
176+
@Override
177+
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
178+
// send a scheduled action to the actionQueue
179+
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
180+
actionObserver.onNext(delayedAction);
181+
return delayedAction;
182+
}
183+
184+
@Override
185+
public Subscription schedule(final Action0 action) {
186+
// send a scheduled action to the actionQueue
187+
ImmediateAction immediateAction = new ImmediateAction(action);
188+
actionObserver.onNext(immediateAction);
189+
return immediateAction;
190+
}
191+
};
192+
193+
// enqueue the completable that process actions put in reply subject
194+
workerObserver.onNext(actions);
195+
196+
// return the worker that adds actions to the reply subject
197+
return worker;
198+
}
199+
200+
private static final Subscription SUBSCRIBED = new Subscription() {
201+
@Override
202+
public void unsubscribe() {
203+
}
204+
205+
@Override
206+
public boolean isUnsubscribed() {
207+
return false;
208+
}
209+
};
210+
211+
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
212+
213+
@SuppressWarnings("serial")
214+
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {
215+
public ScheduledAction() {
216+
super(SUBSCRIBED);
217+
}
218+
219+
private final void call(Worker actualWorker) {
220+
Subscription oldState = get();
221+
// either SUBSCRIBED or UNSUBSCRIBED
222+
if (oldState == UNSUBSCRIBED) {
223+
// no need to schedule return
224+
return;
225+
}
226+
if (oldState != SUBSCRIBED) {
227+
// has already been scheduled return
228+
// should not be able to get here but handle it anyway by not
229+
// rescheduling.
230+
return;
231+
}
232+
233+
Subscription newState = callActual(actualWorker);
234+
235+
if (!compareAndSet(SUBSCRIBED, newState)) {
236+
// set would only fail if the new current state is some other
237+
// subscription from a concurrent call to this method.
238+
// Unsubscribe from the action just scheduled because it lost
239+
// the race.
240+
newState.unsubscribe();
241+
}
242+
}
243+
244+
protected abstract Subscription callActual(Worker actualWorker);
245+
246+
@Override
247+
public boolean isUnsubscribed() {
248+
return get().isUnsubscribed();
249+
}
250+
251+
@Override
252+
public void unsubscribe() {
253+
Subscription oldState;
254+
// no matter what the current state is the new state is going to be
255+
Subscription newState = UNSUBSCRIBED;
256+
do {
257+
oldState = get();
258+
if (oldState == UNSUBSCRIBED) {
259+
// the action has already been unsubscribed
260+
return;
261+
}
262+
} while (!compareAndSet(oldState, newState));
263+
264+
if (oldState != SUBSCRIBED) {
265+
// the action was scheduled. stop it.
266+
oldState.unsubscribe();
267+
}
268+
}
269+
}
270+
271+
@SuppressWarnings("serial")
272+
private static class ImmediateAction extends ScheduledAction {
273+
private final Action0 action;
274+
275+
public ImmediateAction(Action0 action) {
276+
this.action = action;
277+
}
278+
279+
@Override
280+
protected Subscription callActual(Worker actualWorker) {
281+
return actualWorker.schedule(action);
282+
}
283+
}
284+
285+
@SuppressWarnings("serial")
286+
private static class DelayedAction extends ScheduledAction {
287+
private final Action0 action;
288+
private final long delayTime;
289+
private final TimeUnit unit;
290+
291+
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
292+
this.action = action;
293+
this.delayTime = delayTime;
294+
this.unit = unit;
295+
}
296+
297+
@Override
298+
protected Subscription callActual(Worker actualWorker) {
299+
return actualWorker.schedule(action, delayTime, unit);
300+
}
301+
}
302+
}

0 commit comments

Comments
 (0)