Skip to content

Commit 1274b20

Browse files
committed
Schedule Session work using Rx Schedulers; don't reinvent the wheel.
Also improved error handling.
1 parent 6e671f9 commit 1274b20

11 files changed

+166
-240
lines changed

src/main/java/crud/implementer/AbstractDataSink.java

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
*/
1515
package crud.implementer;
1616

17-
import java.util.concurrent.Callable;
18-
1917
import javax.annotation.Nonnull;
2018

2119
import crud.core.DataSink;
@@ -39,13 +37,12 @@ public abstract class AbstractDataSink<E, R>
3937
*/
4038
@Override
4139
public Observable<R> write(final E value) {
42-
final Observable<R> result = Observable.create(new SubmitWriteOnSubscribe(value)).cache();
43-
/* Start a subscription now, so that the write is scheduled
44-
* immediately. The no-argument subscribe() does not handle errors,
45-
* so materialize() to prevent it from seeing any.
46-
*/
47-
result.materialize().subscribe();
48-
return result;
40+
return getWorker().scheduleHot(new SessionWorker.Task<R>() {
41+
@Override
42+
public void call(final Subscriber<? super R> sub) throws Exception {
43+
doWrite(value, sub);
44+
}
45+
});
4946
}
5047

5148
protected AbstractDataSink(@Nonnull final SessionWorker worker) {
@@ -68,29 +65,13 @@ protected AbstractDataSink(@Nonnull final SessionWorker worker) {
6865
* @param writeMe The value to write.
6966
* @param resultSub The subscriber to which the result of the write should
7067
* be reported.
68+
*
69+
* @throws Exception Subclasses may throw whatever they wish.
70+
* Exceptions will be passed to
71+
* {@link Observer#onError(Throwable)}.
7172
*/
72-
protected void doWrite(final E writeMe, final Subscriber<? super R> resultSub) {
73+
protected void doWrite(final E writeMe, final Subscriber<? super R> resultSub) throws Exception {
7374
throw new AssertionError("Must override this method if not overriding write()");
7475
}
7576

76-
77-
private final class SubmitWriteOnSubscribe implements Observable.OnSubscribe<R> {
78-
private final E writeMe;
79-
80-
public SubmitWriteOnSubscribe(final E writeMe) {
81-
this.writeMe = writeMe;
82-
}
83-
84-
@Override
85-
public void call(final Subscriber<? super R> sub) {
86-
submit(new Callable<Void>() {
87-
@Override
88-
public Void call() throws Exception {
89-
doWrite(SubmitWriteOnSubscribe.this.writeMe, sub);
90-
return null;
91-
}
92-
});
93-
}
94-
}
95-
9677
}

src/main/java/crud/implementer/AbstractDataSource.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
*/
1515
package crud.implementer;
1616

17-
import java.util.concurrent.Callable;
18-
1917
import javax.annotation.Nonnull;
2018

2119
import crud.core.DataSource;
@@ -28,16 +26,10 @@ public abstract class AbstractDataSource<E> extends AbstractSessionParticipant i
2826

2927
@Override
3028
public Observable<E> read() {
31-
return Observable.create(new Observable.OnSubscribe<E>() {
29+
return getWorker().scheduleCold(new SessionWorker.Task<E>() {
3230
@Override
33-
public void call(final Subscriber<? super E> sub) {
34-
submit(new Callable<Void>() {
35-
@Override
36-
public Void call() throws Exception {
37-
onReadSubscribe(sub);
38-
return null;
39-
}
40-
});
31+
public void call(final Subscriber<? super E> sub) throws Exception {
32+
onReadSubscribe(sub);
4133
}
4234
});
4335
}
@@ -58,8 +50,12 @@ protected AbstractDataSource(@Nonnull final SessionWorker worker) {
5850
* directly, then there is no need to override this method.
5951
*
6052
* @param sub The {@link Subscriber} to notify of the elements being read.
53+
*
54+
* @throws Exception Subclasses may throw whatever they wish.
55+
* Exceptions will be passed to
56+
* {@link Observer#onError(Throwable)}.
6157
*/
62-
protected void onReadSubscribe(final Subscriber<? super E> sub) {
58+
protected void onReadSubscribe(final Subscriber<? super E> sub) throws Exception {
6359
throw new AssertionError("Must override this method if not overriding read()");
6460
}
6561

src/main/java/crud/implementer/AbstractSession.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,25 @@
1515
package crud.implementer;
1616

1717
import java.util.Objects;
18-
import java.util.concurrent.Callable;
1918
import java.util.concurrent.TimeUnit;
2019

2120
import javax.annotation.Nonnull;
2221

2322
import crud.core.Session;
2423
import rx.Observable;
2524
import rx.Observer;
25+
import rx.Subscriber;
2626

2727

2828
public abstract class AbstractSession extends AbstractAsyncCloseable implements Session {
2929

3030
private @Nonnull final SessionWorker worker;
3131
private @Nonnull final Session.Ordering ordering;
3232

33-
private final Callable<Void> shutdownTask = new Callable<Void>() {
33+
private final SessionWorker.Task<Void> shutdownTask = new SessionWorker.Task<Void>() {
3434
@Override
35-
public Void call() throws Exception {
35+
public void call(final Subscriber<? super Void> sub) throws Exception {
3636
doShutdown();
37-
return null;
3837
}
3938
};
4039

src/main/java/crud/implementer/AbstractSessionParticipant.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package crud.implementer;
1616

1717
import java.util.Objects;
18-
import java.util.concurrent.Callable;
1918

2019
import javax.annotation.Nonnull;
2120

@@ -24,6 +23,7 @@
2423
import crud.core.DataSource;
2524
import rx.Observable;
2625
import rx.Observer;
26+
import rx.Subscriber;
2727

2828

2929
/**
@@ -42,11 +42,10 @@ public abstract class AbstractSessionParticipant implements AsyncCloseable {
4242
*/
4343
@Override
4444
public Observable<Void> shutdown() {
45-
return submit(new Callable<Void>() {
45+
return this.worker.scheduleHot(new SessionWorker.Task<Void>() {
4646
@Override
47-
public Void call() throws Exception {
47+
public void call(final Subscriber<? super Void> sub) throws Exception {
4848
doShutdown();
49-
return null;
5049
}
5150
});
5251
}
@@ -72,8 +71,8 @@ protected void doShutdown() throws Exception {
7271
// do nothing
7372
}
7473

75-
protected final Observable<Void> submit(final Callable<Void> task) {
76-
return this.worker.submit(task);
74+
public final @Nonnull SessionWorker getWorker() {
75+
return this.worker;
7776
}
7877

7978
}

src/main/java/crud/implementer/SessionWorker.java

Lines changed: 73 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
*/
1515
package crud.implementer;
1616

17-
import java.util.concurrent.Callable;
1817
import java.util.concurrent.ExecutorService;
1918
import java.util.concurrent.Executors;
20-
import java.util.concurrent.RejectedExecutionException;
2119
import java.util.concurrent.TimeUnit;
2220
import java.util.concurrent.TimeoutException;
2321
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,7 +26,10 @@
2826
import crud.core.Session;
2927
import rx.Observable;
3028
import rx.Observer;
29+
import rx.Scheduler;
3130
import rx.Subscriber;
31+
import rx.functions.Action1;
32+
import rx.schedulers.Schedulers;
3233

3334

3435
/**
@@ -41,43 +42,81 @@ public final class SessionWorker {
4142

4243
private final AtomicBoolean stopped = new AtomicBoolean(false);
4344
private final ExecutorService executor = Executors.newSingleThreadExecutor();
45+
private final Scheduler scheduler = Schedulers.from(this.executor);
4446

4547

4648
/**
47-
* Schedule the given task to run on this worker's asynchronous thread.
48-
* Return a {@link Observable#cache() cached} {@link Observable} that
49-
* will emit the success or error result of the task to all subscribers.
50-
* <p/>
51-
* If {@link #shutdown(Callable, long, TimeUnit)} was already called, the
52-
* resulting {@link Observable} will emit
53-
* {@link RejectedExecutionException}.
49+
* Wrap the given {@link Action1 action} in an {@link Observable} and
50+
* schedule its subscription to run on the {@link Scheduler} encapsulated
51+
* by this {@link SessionWorker worker}. This method only creates the
52+
* {@link Observable}; it does not subscribe to it.
53+
*
54+
* @see #subscribeHot(Observable)
5455
*/
55-
public Observable<Void> submit(@Nonnull final Callable<Void> task) {
56-
if (!this.stopped.get()) {
57-
return doSubmit(task);
58-
} else {
59-
return Observable.error(new RejectedExecutionException("Already stopped"));
60-
}
56+
public <T> Observable<T> scheduleCold(@Nonnull final Task<T> task) {
57+
final Observable.OnSubscribe<T> onSubscribe = new Observable.OnSubscribe<T>() {
58+
@Override
59+
public void call(final Subscriber<? super T> sub) {
60+
try {
61+
task.call(sub);
62+
sub.onCompleted();
63+
} catch (final MiddlewareException mx) {
64+
sub.onError(mx);
65+
} catch (final Exception ex) {
66+
sub.onError(new MiddlewareException(ex.getMessage(), ex));
67+
}
68+
}
69+
};
70+
return Observable.create(onSubscribe).subscribeOn(this.scheduler);
71+
}
72+
73+
/**
74+
* Wrap the given {@link Action1 action} in an {@link Observable} and
75+
* schedule its subscription to run on the {@link Scheduler} encapsulated
76+
* by this {@link SessionWorker worker}. Then
77+
* {@link Observable#cache() cache} the result, and begin the
78+
* subscription. This allows the subscription action to begin immediately,
79+
* but allows the caller to see every resulting value. Note that the use
80+
* of cache() assumes that the number of those results is relatively
81+
* small.
82+
*
83+
* @see #scheduleCold(Task)
84+
* @see #subscribeHot(Observable)
85+
*/
86+
public <T> Observable<T> scheduleHot(@Nonnull final Task<T> task) {
87+
final Observable<T> result = scheduleCold(task).cache();
88+
doSubscribeHot(result);
89+
return result;
90+
}
91+
92+
/**
93+
* Start subscribing to the given {@link Observable} on the
94+
* {@link Scheduler} encapsulated by this {@link SessionWorker worker}.
95+
* Note that the caller may miss some results if it does not e.g.
96+
* {@link Observable#share() share} the input Observable before calling
97+
* this method.
98+
*/
99+
public <T> void subscribeHot(final Observable<T> obs) {
100+
doSubscribeHot(obs.subscribeOn(this.scheduler));
61101
}
62102

63103
/**
64-
* Stop accepting new tasks via {@link #submit(Callable)}, and initiate
104+
* {@link #scheduleHot(Task) Schedule} the given
105+
* task, then stop accepting any new tasks, and initiate
65106
* the termination of this worker's background thread. The resulting
66107
* {@link Observable} will emit {@link Observer#onCompleted()} once the
67108
* termination is complete, a {@link TimeoutException} if it fails to
68109
* complete within the given duration, or {@link InterruptedException} if
69110
* the shutdown is interrupted.
70111
*
71112
* @param finalTask The caller should perform any of its own cleanup in
72-
* this task, scheduled here as opposed to in
73-
* {@link #submit(Callable)} to avoid race conditions on
74-
* shutdown.
113+
* this task, scheduled here to avoid race conditions.
75114
*/
76115
public Observable<Void> shutdown(
77-
final @Nonnull Callable<Void> finalTask,
116+
@Nonnull final Task<Void> finalTask,
78117
final long waitDuration, @Nonnull final TimeUnit waitUnit) {
79118
if (!this.stopped.getAndSet(true)) {
80-
final Observable<Void> taskResult = doSubmit(finalTask);
119+
final Observable<Void> taskResult = scheduleHot(finalTask);
81120
this.executor.shutdown(); // non-blocking
82121
final Observable<Void> await = Observable.create(new Observable.OnSubscribe<Void>() {
83122
@Override
@@ -112,53 +151,23 @@ public void call(final Subscriber<? super Void> sub) {
112151
}
113152
}
114153

115-
private Observable<Void> doSubmit(final Callable<Void> task) {
116-
final Observable<Void> result = Observable.create(actionOf(task)).cache();
117-
/* Start a subscription now, so that the given task is immediately
118-
* scheduled. The no-argument subscribe() does not handle errors, so
119-
* materialize so that it won't see any.
154+
private static <T> void doSubscribeHot(final Observable<T> obs) {
155+
/* The no-argument subscribe() does not handle errors, so
156+
* materialize() so that it won't see any.
120157
*/
121-
result.materialize().subscribe();
122-
return result;
158+
obs.materialize().subscribe();
123159
}
124160

125-
private Observable.OnSubscribe<Void> actionOf(final Callable<Void> task) {
126-
return new Observable.OnSubscribe<Void>() {
127-
@Override
128-
public void call(final Subscriber<? super Void> sub) {
129-
SessionWorker.this.executor.submit(runAndNotify(task, sub));
130-
}
131-
};
132-
}
133161

134-
private static Runnable runAndNotify(
135-
final Callable<Void> runMe,
136-
final Subscriber<? super Void> notifyMe) {
137-
return new Runnable() {
138-
@Override
139-
public void run() {
140-
try {
141-
runMe.call();
142-
notifyMe.onCompleted();
143-
} catch (final RuntimeException rex) {
144-
/* RuntimeExceptions are assumed to indicate program bugs.
145-
* Report them to the subscriber as-is. This clause will
146-
* also include MiddlewareExceptions as-is, which is
147-
* desirable.
148-
*/
149-
notifyMe.onError(rex);
150-
} catch (final Exception ex) {
151-
/* Any checked exception is assumed to represent a
152-
* middleware-specific failure condition, and so is mapped
153-
* to MiddlewareException.
154-
*
155-
* TODO: Provide a pluggable exception-translation
156-
* capability.
157-
*/
158-
notifyMe.onError(new MiddlewareException(ex.getMessage(), ex));
159-
}
160-
}
161-
};
162+
/**
163+
* An asynchronous task to be scheduled by a {@link SessionWorker}.
164+
* It will be wrapped by an instance of {@link rx.Observable.OnSubscribe},
165+
* and execution of {@link Observer#onCompleted()} and
166+
* {@link Observer#onError(Throwable)} will be taken care of on behalf of
167+
* the task; it need only invoke {@link Observer#onNext(Object)}.
168+
*/
169+
public static interface Task<T> {
170+
void call(Subscriber<? super T> sub) throws Exception;
162171
}
163172

164173
}

0 commit comments

Comments
 (0)