Skip to content

Commit f3777df

Browse files
committed
Ability to create custom schedulers with behavior based on composing operators.
• made it private and added a instance method to Scheduler. • rewrote the test to get a little more coverage. • wrapping each of the onNext/onCompleted to ensure no overlapping calls • Break up the Worker as Completable into Worker as Observable. Now the schedules actions are the indivisible elements that are subscribed to. The user has additional choice at the cost of making the API more complicated.
1 parent 97c4e53 commit f3777df

File tree

3 files changed

+586
-0
lines changed

3 files changed

+586
-0
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20+
import rx.annotations.Experimental;
2021
import rx.functions.Action0;
22+
import rx.functions.Func1;
23+
import rx.internal.schedulers.SchedulerWhen;
2124
import rx.schedulers.Schedulers;
2225
import rx.subscriptions.MultipleAssignmentSubscription;
2326

@@ -182,4 +185,79 @@ public long now() {
182185
return System.currentTimeMillis();
183186
}
184187

188+
/**
189+
* Allows the use of operators for controlling the timing around when
190+
* actions scheduled on workers are actually done. This makes it possible to
191+
* layer additional behavior on this {@link Scheduler}. The only parameter
192+
* is a function that flattens an {@link Observable} of {@link Observable}
193+
* of {@link Completable}s into just one {@link Completable}. There must be
194+
* a chain of operators connecting the returned value to the source
195+
* {@link Observable} otherwise any work scheduled on the returned
196+
* {@link Scheduler} will not be executed.
197+
* <p>
198+
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
199+
* {@link Completable}s is onNext'd to the combinator to be flattened. If
200+
* the inner {@link Observable} is not immediately subscribed to an calls to
201+
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
202+
* subscribed to actions are then onNext'd as {@link Completable}s.
203+
* <p>
204+
* Finally the actions scheduled on the parent {@link Scheduler} when the
205+
* inner most {@link Completable}s are subscribed to.
206+
* <p>
207+
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
208+
* onComplete and triggers any behavior in the flattening operator. The
209+
* {@link Observable} and all {@link Completable}s give to the flattening
210+
* function never onError.
211+
* <p>
212+
* Limit the amount concurrency two at a time without creating a new fix
213+
* size thread pool:
214+
*
215+
* <pre>
216+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
217+
* // use merge max concurrent to limit the number of concurrent
218+
* // callbacks two at a time
219+
* return Completable.merge(Observable.merge(workers), 2);
220+
* });
221+
* </pre>
222+
* <p>
223+
* This is a slightly different way to limit the concurrency but it has some
224+
* interesting benefits and drawbacks to the method above. It works by
225+
* limited the number of concurrent {@link Worker}s rather than individual
226+
* actions. Generally each {@link Observable} uses its own {@link Worker}.
227+
* This means that this will essentially limit the number of concurrent
228+
* subscribes. The danger comes from using operators like
229+
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
230+
* subscribing to the first {@link Observable} could deadlock the
231+
* subscription to the second.
232+
*
233+
* <pre>
234+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
235+
* // use merge max concurrent to limit the number of concurrent
236+
* // Observables two at a time
237+
* return Completable.merge(Observable.merge(workers, 2));
238+
* });
239+
* </pre>
240+
*
241+
* Slowing down the rate to no more than than 1 a second. This suffers from
242+
* the same problem as the one above I could find an {@link Observable}
243+
* operator that limits the rate without dropping the values (aka leaky
244+
* bucket algorithm).
245+
*
246+
* <pre>
247+
* Scheduler slowSched = Schedulers.computation().when(workers -> {
248+
* // use concatenate to make each worker happen one at a time.
249+
* return Completable.concat(workers.map(actions -> {
250+
* // delay the starting of the next worker by 1 second.
251+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
252+
* }));
253+
* });
254+
* </pre>
255+
*
256+
* @param combine
257+
* @return
258+
*/
259+
@Experimental
260+
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
261+
return (S) new SchedulerWhen(combine, this);
262+
}
185263
}
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Completable;
23+
import rx.Completable.CompletableOnSubscribe;
24+
import rx.Completable.CompletableSubscriber;
25+
import rx.Scheduler.Worker;
26+
import rx.Observable;
27+
import rx.Observer;
28+
import rx.Scheduler;
29+
import rx.Subscription;
30+
import rx.annotations.Experimental;
31+
import rx.functions.Action0;
32+
import rx.functions.Func1;
33+
import rx.internal.operators.BufferUntilSubscriber;
34+
import rx.observers.SerializedObserver;
35+
import rx.subjects.PublishSubject;
36+
import rx.subscriptions.Subscriptions;
37+
38+
/**
39+
* Allows the use of operators for controlling the timing around when actions
40+
* scheduled on workers are actually done. This makes it possible to layer
41+
* additional behavior on this {@link Scheduler}. The only parameter is a
42+
* function that flattens an {@link Observable} of {@link Observable} of
43+
* {@link Completable}s into just one {@link Completable}. There must be a chain
44+
* of operators connecting the returned value to the source {@link Observable}
45+
* otherwise any work scheduled on the returned {@link Scheduler} will not be
46+
* executed.
47+
* <p>
48+
* When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
49+
* {@link Completable}s is onNext'd to the combinator to be flattened. If the
50+
* inner {@link Observable} is not immediately subscribed to an calls to
51+
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
52+
* subscribed to actions are then onNext'd as {@link Completable}s.
53+
* <p>
54+
* Finally the actions scheduled on the parent {@link Scheduler} when the inner
55+
* most {@link Completable}s are subscribed to.
56+
* <p>
57+
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
58+
* onComplete and triggers any behavior in the flattening operator. The
59+
* {@link Observable} and all {@link Completable}s give to the flattening
60+
* function never onError.
61+
* <p>
62+
* Limit the amount concurrency two at a time without creating a new fix size
63+
* thread pool:
64+
*
65+
* <pre>
66+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
67+
* // use merge max concurrent to limit the number of concurrent
68+
* // callbacks two at a time
69+
* return Completable.merge(Observable.merge(workers), 2);
70+
* });
71+
* </pre>
72+
* <p>
73+
* This is a slightly different way to limit the concurrency but it has some
74+
* interesting benefits and drawbacks to the method above. It works by limited
75+
* the number of concurrent {@link Worker}s rather than individual actions.
76+
* Generally each {@link Observable} uses its own {@link Worker}. This means
77+
* that this will essentially limit the number of concurrent subscribes. The
78+
* danger comes from using operators like
79+
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
80+
* subscribing to the first {@link Observable} could deadlock the subscription
81+
* to the second.
82+
*
83+
* <pre>
84+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
85+
* // use merge max concurrent to limit the number of concurrent
86+
* // Observables two at a time
87+
* return Completable.merge(Observable.merge(workers, 2));
88+
* });
89+
* </pre>
90+
*
91+
* Slowing down the rate to no more than than 1 a second. This suffers from the
92+
* same problem as the one above I could find an {@link Observable} operator
93+
* that limits the rate without dropping the values (aka leaky bucket
94+
* algorithm).
95+
*
96+
* <pre>
97+
* Scheduler slowSched = Schedulers.computation().when(workers -> {
98+
* // use concatenate to make each worker happen one at a time.
99+
* return Completable.concat(workers.map(actions -> {
100+
* // delay the starting of the next worker by 1 second.
101+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
102+
* }));
103+
* });
104+
* </pre>
105+
*
106+
* @param combine
107+
* @return
108+
*/
109+
@Experimental
110+
public class SchedulerWhen extends Scheduler implements Subscription {
111+
private final Scheduler actualScheduler;
112+
private final Observer<Observable<Completable>> workerObserver;
113+
private final Subscription subscription;
114+
115+
public SchedulerWhen(Func1<Observable<Observable<Completable>>, Completable> combine, Scheduler actualScheduler) {
116+
this.actualScheduler = actualScheduler;
117+
// workers are converted into completables and put in this queue.
118+
PublishSubject<Observable<Completable>> workerSubject = PublishSubject.create();
119+
this.workerObserver = new SerializedObserver<Observable<Completable>>(workerSubject);
120+
// send it to a custom combinator to pick the order and rate at which
121+
// workers are processed.
122+
this.subscription = combine.call(workerSubject.onBackpressureBuffer()).subscribe();
123+
}
124+
125+
@Override
126+
public void unsubscribe() {
127+
subscription.unsubscribe();
128+
}
129+
130+
@Override
131+
public boolean isUnsubscribed() {
132+
return subscription.isUnsubscribed();
133+
}
134+
135+
@Override
136+
public Worker createWorker() {
137+
final Worker actualWorker = actualScheduler.createWorker();
138+
// a queue for the actions submitted while worker is waiting to get to
139+
// the subscribe to off the workerQueue.
140+
BufferUntilSubscriber<ScheduledAction> actionSubject = BufferUntilSubscriber.<ScheduledAction>create();
141+
final Observer<ScheduledAction> actionObserver = new SerializedObserver<ScheduledAction>(actionSubject);
142+
// convert the work of scheduling all the actions into a completable
143+
Observable<Completable> actions = actionSubject.map(new Func1<ScheduledAction, Completable>() {
144+
@Override
145+
public Completable call(final ScheduledAction action) {
146+
return Completable.create(new CompletableOnSubscribe() {
147+
@Override
148+
public void call(CompletableSubscriber actionCompletable) {
149+
actionCompletable.onSubscribe(action);
150+
action.call(actualWorker);
151+
actionCompletable.onCompleted();
152+
}
153+
});
154+
}
155+
});
156+
157+
// a worker that queues the action to the actionQueue subject.
158+
Worker worker = new Worker() {
159+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
160+
161+
@Override
162+
public void unsubscribe() {
163+
// complete the actionQueue when worker is unsubscribed to make
164+
// room for the next worker in the workerQueue.
165+
if (unsubscribed.compareAndSet(false, true)) {
166+
actualWorker.unsubscribe();
167+
actionObserver.onCompleted();
168+
}
169+
}
170+
171+
@Override
172+
public boolean isUnsubscribed() {
173+
return unsubscribed.get();
174+
}
175+
176+
@Override
177+
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
178+
// send a scheduled action to the actionQueue
179+
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
180+
actionObserver.onNext(delayedAction);
181+
return delayedAction;
182+
}
183+
184+
@Override
185+
public Subscription schedule(final Action0 action) {
186+
// send a scheduled action to the actionQueue
187+
ImmediateAction immediateAction = new ImmediateAction(action);
188+
actionObserver.onNext(immediateAction);
189+
return immediateAction;
190+
}
191+
};
192+
193+
// enqueue the completable that process actions put in reply subject
194+
workerObserver.onNext(actions);
195+
196+
// return the worker that adds actions to the reply subject
197+
return worker;
198+
}
199+
200+
private static final Subscription SUBSCRIBED = new Subscription() {
201+
@Override
202+
public void unsubscribe() {
203+
}
204+
205+
@Override
206+
public boolean isUnsubscribed() {
207+
return false;
208+
}
209+
};
210+
211+
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
212+
213+
@SuppressWarnings("serial")
214+
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {
215+
public ScheduledAction() {
216+
super(SUBSCRIBED);
217+
}
218+
219+
private final void call(Worker actualWorker) {
220+
Subscription oldState = get();
221+
// either SUBSCRIBED or UNSUBSCRIBED
222+
if (oldState == UNSUBSCRIBED) {
223+
// no need to schedule return
224+
return;
225+
}
226+
if (oldState != SUBSCRIBED) {
227+
// has already been scheduled return
228+
// should not be able to get here but handle it anyway by not
229+
// rescheduling.
230+
return;
231+
}
232+
233+
Subscription newState = callActual(actualWorker);
234+
235+
if (!compareAndSet(SUBSCRIBED, newState)) {
236+
// set would only fail if the new current state is some other
237+
// subscription from a concurrent call to this method.
238+
// Unsubscribe from the action just scheduled because it lost
239+
// the race.
240+
newState.unsubscribe();
241+
}
242+
}
243+
244+
protected abstract Subscription callActual(Worker actualWorker);
245+
246+
@Override
247+
public boolean isUnsubscribed() {
248+
return get().isUnsubscribed();
249+
}
250+
251+
@Override
252+
public void unsubscribe() {
253+
Subscription oldState;
254+
// no matter what the current state is the new state is going to be
255+
Subscription newState = UNSUBSCRIBED;
256+
do {
257+
oldState = get();
258+
if (oldState == UNSUBSCRIBED) {
259+
// the action has already been unsubscribed
260+
return;
261+
}
262+
} while (!compareAndSet(oldState, newState));
263+
264+
if (oldState != SUBSCRIBED) {
265+
// the action was scheduled. stop it.
266+
oldState.unsubscribe();
267+
}
268+
}
269+
}
270+
271+
@SuppressWarnings("serial")
272+
private static class ImmediateAction extends ScheduledAction {
273+
private final Action0 action;
274+
275+
public ImmediateAction(Action0 action) {
276+
this.action = action;
277+
}
278+
279+
@Override
280+
protected Subscription callActual(Worker actualWorker) {
281+
return actualWorker.schedule(action);
282+
}
283+
}
284+
285+
@SuppressWarnings("serial")
286+
private static class DelayedAction extends ScheduledAction {
287+
private final Action0 action;
288+
private final long delayTime;
289+
private final TimeUnit unit;
290+
291+
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
292+
this.action = action;
293+
this.delayTime = delayTime;
294+
this.unit = unit;
295+
}
296+
297+
@Override
298+
protected Subscription callActual(Worker actualWorker) {
299+
return actualWorker.schedule(action, delayTime, unit);
300+
}
301+
}
302+
}

0 commit comments

Comments
 (0)