Skip to content

Commit 96c848c

Browse files
committed
Take advantage of abstract base classes in JMS implementation
1 parent daf5838 commit 96c848c

7 files changed

+29
-47
lines changed

src/main/java/crud/jms/MessageConsumerDataSource.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,24 @@
2525

2626
import crud.core.DataSource;
2727
import crud.core.MiddlewareException;
28+
import crud.implementer.SessionWorker;
2829
import rx.Observable;
2930
import rx.Subscriber;
3031
import rx.Subscription;
3132

3233

3334
/*package*/ final class MessageConsumerDataSource<M extends Message> implements DataSource<M> {
3435

35-
private @Nonnull final SessionWrapper session;
36+
private @Nonnull final SessionWorker worker;
3637
private @Nonnull final MessageConsumer consumer;
3738
private @Nonnull final Observable<M> hotObservable;
3839

3940

4041
public MessageConsumerDataSource(
41-
@Nonnull final SessionWrapper session,
42+
@Nonnull final SessionWorker worker,
4243
@Nonnull final MessageConsumer consumer,
4344
@Nonnull final Class<M> messageType) {
44-
this.session = Objects.requireNonNull(session);
45+
this.worker = Objects.requireNonNull(worker);
4546
this.consumer = Objects.requireNonNull(consumer);
4647

4748
this.hotObservable = Observable.create(new MessageListenerToSubscriberHandoff(messageType)).share();
@@ -59,7 +60,7 @@ public Observable<M> read() {
5960

6061
@Override
6162
public Observable<Void> shutdown() {
62-
return this.session.submit(new Callable<Void>() {
63+
return this.worker.submit(new Callable<Void>() {
6364
@Override
6465
public Void call() throws JMSException {
6566
/* TODO: Should this result in an onCompleted() to the
@@ -84,6 +85,7 @@ public MessageListenerToSubscriberHandoff(final Class<M> messageType) {
8485
public void call(final Subscriber<? super M> sub) {
8586
try {
8687
sub.add(new MessageListenerRemover());
88+
// FIXME: Adding listener in wrong thread?
8789
MessageConsumerDataSource.this.consumer.setMessageListener(new MessageListener() {
8890
@Override
8991
public void onMessage(final Message message) {

src/main/java/crud/jms/MessageConsumingDataSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public DataSource<M> dataSource(final String key, final Session session) {
5555
final MessageConsumer messageConsumer = key.isEmpty()
5656
? realSession.createConsumer(this.destination)
5757
: realSession.createConsumer(this.destination, key);
58-
return new MessageConsumerDataSource<>(sessionImpl, messageConsumer, this.id.getElementType());
58+
return new MessageConsumerDataSource<>(sessionImpl.worker(), messageConsumer, this.id.getElementType());
5959
} catch (final JMSException jx) {
6060
throw new MiddlewareException(jx.getMessage(), jx);
6161
}

src/main/java/crud/jms/MessageProducerDataSink.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,21 @@
2525

2626
import crud.core.DataSink;
2727
import crud.core.MiddlewareException;
28+
import crud.implementer.SessionWorker;
2829
import rx.Observable;
2930
import rx.Subscriber;
3031

3132

3233
/*package*/ final class MessageProducerDataSink<M extends Message> implements DataSink<M, Void> {
3334

34-
private @Nonnull final SessionWrapper session;
35+
private @Nonnull final SessionWorker worker;
3536
private @Nonnull final MessageProducer producer;
3637

3738

3839
public MessageProducerDataSink(
39-
@Nonnull final SessionWrapper session,
40+
@Nonnull final SessionWorker worker,
4041
@Nonnull final MessageProducer producer) {
41-
this.session = Objects.requireNonNull(session);
42+
this.worker = Objects.requireNonNull(worker);
4243
this.producer = Objects.requireNonNull(producer);
4344
}
4445

@@ -55,7 +56,7 @@ public Observable<Void> write(final M message) {
5556

5657
@Override
5758
public Observable<Void> shutdown() {
58-
return this.session.submit(new Callable<Void>() {
59+
return this.worker.submit(new Callable<Void>() {
5960
@Override
6061
public Void call() throws Exception {
6162
MessageProducerDataSink.this.producer.close();

src/main/java/crud/jms/MessageProducingDataSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public DataSink<M, Void> dataSink(final String key, final Session session) {
6262
final javax.jms.Session realSession = sessionImpl.getDelegate();
6363
try {
6464
final MessageProducer messageProducer = realSession.createProducer(this.destination);
65-
return new MessageProducerDataSink<>(sessionImpl, messageProducer);
65+
return new MessageProducerDataSink<>(sessionImpl.worker(), messageProducer);
6666
} catch (final JMSException jx) {
6767
throw new MiddlewareException(jx.getMessage(), jx);
6868
}

src/main/java/crud/jms/NonTransactedJmsSession.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,9 @@
2020
/*package*/ final class NonTransactedJmsSession extends SessionWrapper implements Session {
2121

2222
public NonTransactedJmsSession(final javax.jms.Session delegate) {
23-
super(delegate);
23+
super(Session.Ordering.ORDERED, delegate);
2424
// Assumed, but illegal to check in this thread:
2525
//assert !getDelegate().getTransacted();
2626
}
2727

28-
@Override
29-
public Session.Ordering getOrdering() {
30-
return Session.Ordering.ORDERED;
31-
}
32-
3328
}

src/main/java/crud/jms/SessionWrapper.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,36 @@
1515
package crud.jms;
1616

1717
import java.util.Objects;
18-
import java.util.concurrent.Callable;
19-
import java.util.concurrent.TimeUnit;
2018

2119
import javax.annotation.Nonnull;
2220
import javax.jms.JMSException;
2321

22+
import crud.core.Session;
23+
import crud.implementer.AbstractSession;
2424
import crud.implementer.SessionWorker;
2525

26-
import crud.core.AsyncCloseable;
27-
import rx.Observable;
2826

27+
/*package*/ abstract class SessionWrapper extends AbstractSession {
2928

30-
/*package*/ abstract class SessionWrapper implements AsyncCloseable {
31-
32-
private final SessionWorker worker = new SessionWorker();
3329
private @Nonnull final javax.jms.Session delegate;
3430

35-
private final Callable<Void> closeTask = new Callable<Void>() {
36-
@Override
37-
public Void call() throws JMSException {
38-
getDelegate().close();
39-
return null;
40-
}
41-
};
42-
43-
44-
@Override
45-
public final Observable<Void> shutdown() {
46-
return this.worker.shutdown(this.closeTask, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
47-
}
4831

49-
protected SessionWrapper(@Nonnull final javax.jms.Session delegate) {
32+
protected SessionWrapper(@Nonnull final Session.Ordering ordering, @Nonnull final javax.jms.Session delegate) {
33+
super(ordering);
5034
this.delegate = Objects.requireNonNull(delegate);
5135
}
5236

53-
protected final Observable<Void> submit(final Callable<Void> task) {
54-
return this.worker.submit(task);
37+
protected final @Nonnull SessionWorker worker() {
38+
return getWorker();
5539
}
5640

5741
protected final @Nonnull javax.jms.Session getDelegate() {
5842
return this.delegate;
5943
}
6044

45+
@Override
46+
protected void doShutdown() throws JMSException {
47+
getDelegate().close();
48+
}
49+
6150
}

src/main/java/crud/jms/TransactedJmsSession.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,14 @@
2626
/*package*/ final class TransactedJmsSession extends SessionWrapper implements TransactedSession {
2727

2828
public TransactedJmsSession(final javax.jms.Session delegate) {
29-
super(delegate);
29+
super(Session.Ordering.TRANSACTED, delegate);
3030
// Assumed, but illegal to check in this thread:
3131
//assert getDelegate().getTransacted();
3232
}
3333

34-
@Override
35-
public Session.Ordering getOrdering() {
36-
return Session.Ordering.TRANSACTED;
37-
}
38-
3934
@Override
4035
public Observable<Void> commit() {
41-
return submit(new Callable<Void>() {
36+
return getWorker().submit(new Callable<Void>() {
4237
@Override
4338
public Void call() throws JMSException {
4439
getDelegate().commit();
@@ -49,7 +44,7 @@ public Void call() throws JMSException {
4944

5045
@Override
5146
public Observable<Void> rollback() {
52-
return submit(new Callable<Void>() {
47+
return getWorker().submit(new Callable<Void>() {
5348
@Override
5449
public Void call() throws JMSException {
5550
getDelegate().rollback();

0 commit comments

Comments
 (0)