Skip to content

Commit e5ec3b7

Browse files
committed
Shutting down a DataBus automatically shuts down any contained Sessions.
1 parent 11b8b3f commit e5ec3b7

16 files changed

+255
-37
lines changed

src/main/java/crud/core/DataBus.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,13 @@ public interface DataBus extends AsyncCloseable {
117117
*/
118118
public @Nonnull TransactedSession startTransactedSession();
119119

120+
/**
121+
* {@inheritDoc}
122+
* <p/>
123+
* Shutting down a {@link DataBus} implicitly shuts down all
124+
* {@link Session}s created in the context of that {@code DataBus}.
125+
*/
126+
@Override
127+
public Observable<Void> shutdown();
128+
120129
}

src/main/java/crud/file/FileSystem.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import crud.core.Session.Ordering;
2525
import crud.core.WritableResourceSet;
2626
import crud.implementer.AbstractDataBus;
27+
import crud.implementer.DataBusWorker;
2728
import crud.implementer.DefaultSession;
2829

2930

@@ -39,14 +40,18 @@ public final class FileSystem extends AbstractDataBus {
3940
Session.Ordering.ORDERED));
4041

4142

43+
public FileSystem() {
44+
super(DataBusWorker.create());
45+
}
46+
4247
@Override
4348
public Set<Session.Ordering> getSupportedSessionOrderings() {
4449
return supportedOrderings;
4550
}
4651

4752
@Override
4853
protected Session doStartOrderedSession() throws Exception {
49-
return new DefaultSession(Session.Ordering.ORDERED);
54+
return new DefaultSession(getWorker(), Session.Ordering.ORDERED);
5055
}
5156

5257
@Override

src/main/java/crud/implementer/AbstractDataBus.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515
package crud.implementer;
1616

17+
import java.util.Objects;
1718
import java.util.Set;
1819

1920
import javax.annotation.Nonnull;
@@ -28,7 +29,9 @@
2829
import crud.core.TransactedSession;
2930
import crud.core.UnsupportedSessionOrderingException;
3031
import crud.core.WritableResourceSet;
32+
import rx.Observable;
3133
import rx.Observer;
34+
import rx.Subscriber;
3235

3336

3437
/**
@@ -41,7 +44,28 @@
4144
*
4245
* @author Rick Warren
4346
*/
44-
public abstract class AbstractDataBus extends AbstractAsyncCloseable implements DataBus {
47+
public abstract class AbstractDataBus implements DataBus {
48+
49+
private @Nonnull final DataBusWorker worker;
50+
51+
private final Task<Void> shutdownTask = new Task<Void>() {
52+
@Override
53+
public void call(final Subscriber<? super Void> sub) throws Exception {
54+
doShutdown();
55+
}
56+
};
57+
58+
59+
/**
60+
* Provided for use by subclasses and their clients.
61+
* <p/>
62+
* <em>ATTN</em>: This method is not declared {@code final} in order to
63+
* support mocking in unit tests. Nevertheless, it is not intended for
64+
* overriding, and the behavior in that case is unspecified.
65+
*/
66+
public @Nonnull DataBusWorker getWorker() {
67+
return this.worker;
68+
}
4569

4670
/**
4771
* Do nothing. Subclasses can override this method to do more.
@@ -161,8 +185,16 @@ public TransactedSession startTransactedSession() {
161185
}
162186
}
163187

164-
protected AbstractDataBus() {
165-
// nothing to do
188+
/**
189+
* Subclasses should override {@link #doShutdown()} instead.
190+
*/
191+
@Override
192+
public final Observable<Void> shutdown() {
193+
return this.worker.shutdown(this.shutdownTask);
194+
}
195+
196+
protected AbstractDataBus(@Nonnull final DataBusWorker worker) {
197+
this.worker = Objects.requireNonNull(worker);
166198
}
167199

168200
/**
@@ -301,4 +333,20 @@ protected boolean isResourceSetAvailable(final WritableResourceSet.Id<?, ?, ?> i
301333
throw new AssertionError("isResourceSetAvailable() indicated ResourceSet available, but this method was not overridden");
302334
}
303335

336+
/**
337+
* Subclasses should override this method to perform any shutdown task
338+
* they have to do.
339+
*
340+
* By default, this method does nothing.
341+
*
342+
* @throws Exception Subclasses may throw whatever they wish.
343+
* Exceptions will be passed to
344+
* {@link Observer#onError(Throwable)}.
345+
*
346+
* @see #shutdown()
347+
*/
348+
protected void doShutdown() throws Exception {
349+
// do nothing
350+
}
351+
304352
}

src/main/java/crud/implementer/AbstractSession.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ protected AbstractSession(
7070
this.ordering = Objects.requireNonNull(ordering);
7171
}
7272

73+
protected AbstractSession(
74+
@Nonnull final DataBusWorker dataBusWorker,
75+
@Nonnull final SessionWorker myWorker,
76+
@Nonnull final Session.Ordering ordering) {
77+
this(myWorker, ordering);
78+
dataBusWorker.addPreShutdownHook(this);
79+
}
80+
7381
/**
7482
* Subclasses should override this method to perform any shutdown task
7583
* they have to do. It will be run in the context of the

src/main/java/crud/implementer/AbstractTransactedSession.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,16 @@ public final Observable<Void> rollback() {
4545
return this.tx.rollback();
4646
}
4747

48-
protected AbstractTransactedSession(@Nonnull final SessionWorker worker) {
49-
super(worker, Session.Ordering.TRANSACTED);
50-
51-
this.tx = new TransactionLifecycle(worker) {
52-
@Override
53-
protected void doCommit() throws Exception {
54-
AbstractTransactedSession.this.doCommit();
55-
}
48+
protected AbstractTransactedSession(@Nonnull final SessionWorker myWorker) {
49+
super(myWorker, Session.Ordering.TRANSACTED);
50+
this.tx = initTransactionLifecycle(myWorker);
51+
}
5652

57-
@Override
58-
protected void doRollback() throws Exception {
59-
AbstractTransactedSession.this.doRollback();
60-
}
61-
};
53+
protected AbstractTransactedSession(
54+
@Nonnull final DataBusWorker dataBusWorker,
55+
@Nonnull final SessionWorker myWorker) {
56+
super(dataBusWorker, myWorker, Session.Ordering.TRANSACTED);
57+
this.tx = initTransactionLifecycle(myWorker);
6258
}
6359

6460
/**
@@ -79,4 +75,18 @@ protected void doRollback() throws Exception {
7975
*/
8076
protected abstract void doRollback() throws Exception;
8177

78+
private TransactionLifecycle initTransactionLifecycle(final SessionWorker myWorker) {
79+
return new TransactionLifecycle(myWorker) {
80+
@Override
81+
protected void doCommit() throws Exception {
82+
AbstractTransactedSession.this.doCommit();
83+
}
84+
85+
@Override
86+
protected void doRollback() throws Exception {
87+
AbstractTransactedSession.this.doRollback();
88+
}
89+
};
90+
}
91+
8292
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/* Copyright 2015 Rick Warren
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
4+
* use this file except in compliance with the License. You may obtain a copy of
5+
* the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
* License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
package crud.implementer;
16+
17+
import javax.annotation.Nonnull;
18+
import javax.annotation.concurrent.ThreadSafe;
19+
20+
import crud.core.DataBus;
21+
import crud.core.Session;
22+
import rx.Observable;
23+
import rx.Observer;
24+
import rx.schedulers.Schedulers;
25+
26+
27+
/**
28+
* Manages lifecycle concerns on behalf of {@link DataBus} implementations.
29+
*
30+
* @author Rick Warren
31+
*/
32+
@ThreadSafe
33+
public class DataBusWorker {
34+
35+
private final WorkerDelegate delegate = new WorkerDelegate(Schedulers.immediate());
36+
37+
38+
public static DataBusWorker create() {
39+
return new DataBusWorker();
40+
}
41+
42+
/**
43+
* Add a listener that will be called prior to this {@link DataBusWorker}
44+
* being {@link #shutdown(Task) shut down}. The listener will only be
45+
* called once, just before shutting down.
46+
*
47+
* @throws IllegalStateException If this method is called after
48+
* {@link #shutdown(Task)} itself.
49+
*/
50+
public void addPreShutdownHook(@Nonnull final Session shutMeDown) {
51+
this.delegate.addPreShutdownHook(shutMeDown);
52+
}
53+
54+
/**
55+
* Schedule the given task after shutting down the
56+
* {@link #addPreShutdownHook(Session) pre-shutdown hooks}. The
57+
* resulting {@link Observable} will emit one of the following:
58+
* <ol>
59+
* <li>Any {@link Observer#onError(Throwable) error} emitted by the given
60+
* final task. (This task will be run after all
61+
* {@link #addPreShutdownHook(Session) pre-shutdown hooks}
62+
* have been shut down, but any error from it will receive precedence
63+
* with respect to reporting, because it is likely to be the most
64+
* relevant to the caller.)</li>
65+
* <li>The first {@link Observer#onError(Throwable) error}, if any,
66+
* emitted by any of the
67+
* {@link #addPreShutdownHook(Session) pre-shutdown hooks}.
68+
* (These hooks are shut down before the final task runs, but their
69+
* errors are not allowed to hide any errors from that task.)</li>
70+
* <li>{@link Observer#onCompleted()} once the termination is complete,
71+
* if no errors occurred.</li>
72+
* </ol>
73+
* <p/>
74+
* This method only operates once. Calling it additional times has no
75+
* effect, and will return an {@link Observable} that emits
76+
* {@link Observer#onCompleted()}.
77+
*
78+
* @param finalTask The caller should perform any of its own cleanup in
79+
* this task, scheduled here to avoid race conditions.
80+
*/
81+
public Observable<Void> shutdown(final Task<Void> finalTask) {
82+
return this.delegate.shutdown(finalTask);
83+
}
84+
85+
private DataBusWorker() {
86+
/* Private to prevent subclassing. We could just make the class final,
87+
* but that would prevent mocking as well. Fortunately, Mockito can
88+
* call a private constructor reflectively.
89+
*/
90+
}
91+
92+
}

src/main/java/crud/implementer/DefaultSession.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package crud.implementer;
1616

17+
import javax.annotation.Nonnull;
18+
1719
import com.google.common.base.Preconditions;
1820

1921
import crud.core.Session;
@@ -28,8 +30,10 @@
2830
*/
2931
public class DefaultSession extends AbstractSession {
3032

31-
public DefaultSession(final Session.Ordering ordering) {
32-
super(SessionWorker.create(), ordering);
33+
public DefaultSession(
34+
@Nonnull final DataBusWorker dataBusWorker,
35+
@Nonnull final Session.Ordering ordering) {
36+
super(dataBusWorker, SessionWorker.create(), ordering);
3337
Preconditions.checkArgument(
3438
getOrdering() != Session.Ordering.TRANSACTED,
3539
"Not a TransactedSession");

src/main/java/crud/jdbc/JdbcDataBus.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,23 @@
3030
import com.google.common.base.Optional;
3131
import com.google.common.base.Preconditions;
3232

33+
import crud.core.DataBus;
3334
import crud.core.ReadableResourceSet;
3435
import crud.core.Session;
3536
import crud.core.TransactedSession;
3637
import crud.core.WritableResourceSet;
3738
import crud.implementer.AbstractDataBus;
39+
import crud.implementer.DataBusWorker;
3840

3941

42+
/**
43+
* A {@link DataBus} implementation that wraps a JDBC {@link DataSource}.
44+
* It supports {@link TransactedSession}s by means of the usual JDBC
45+
* transactions. It also supports ordered {@link Session}s by means of
46+
* {@link Connection#setAutoCommit(boolean) auto-commit}.
47+
*
48+
* @author Rick Warren
49+
*/
4050
public class JdbcDataBus extends AbstractDataBus {
4151

4252
private static final Logger log = LoggerFactory.getLogger(JdbcDataBus.class);
@@ -55,15 +65,16 @@ public JdbcDataBus(@Nonnull final DataSource dataSource) {
5565

5666
public JdbcDataBus(
5767
@Nonnull final DataSource dataSource,
58-
final String username,
59-
final String password) {
68+
@Nonnull final String username,
69+
@Nonnull final String password) {
6070
this(dataSource, Optional.of(username), Optional.of(password));
6171
}
6272

6373
private JdbcDataBus(
6474
@Nonnull final DataSource dataSource,
6575
final Optional<String> username,
6676
final Optional<String> password) {
77+
super(DataBusWorker.create());
6778
this.dataSource = Objects.requireNonNull(dataSource);
6879
this.username = Objects.requireNonNull(username);
6980
this.password = Objects.requireNonNull(password);
@@ -82,15 +93,15 @@ public Set<Session.Ordering> getSupportedSessionOrderings() {
8293
protected Session doStartOrderedSession() throws SQLException {
8394
final Connection connection = getConnection();
8495
connection.setAutoCommit(true);
85-
return new JdbcSession(connection);
96+
return new JdbcSession(getWorker(), connection);
8697
}
8798

8899
@Override
89100
@SuppressWarnings("resource")
90101
protected TransactedSession doStartTransactedSession() throws SQLException {
91102
final Connection connection = getConnection();
92103
connection.setAutoCommit(false);
93-
return new JdbcTransactedSession(connection);
104+
return new JdbcTransactedSession(getWorker(), connection);
94105
}
95106

96107
@Override

src/main/java/crud/jdbc/JdbcSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import crud.core.Session;
2525
import crud.core.WritableResource;
2626
import crud.implementer.AbstractSession;
27+
import crud.implementer.DataBusWorker;
2728
import crud.implementer.SessionWorker;
2829

2930

@@ -32,8 +33,10 @@
3233
private @Nonnull final Connection connection;
3334

3435

35-
public JdbcSession(@Nonnull final Connection connection) {
36-
super(SessionWorker.create(), Session.Ordering.ORDERED);
36+
public JdbcSession(
37+
@Nonnull final DataBusWorker dataBusWorker,
38+
@Nonnull final Connection connection) {
39+
super(dataBusWorker, SessionWorker.create(), Session.Ordering.ORDERED);
3740
this.connection = Objects.requireNonNull(connection);
3841
}
3942

0 commit comments

Comments
 (0)