Skip to content

Commit 4410928

Browse files
authored
feat(common): add AutoCloseable for buffer trigger. (PhantomThief#20)
1 parent c8370a7 commit 4410928

File tree

9 files changed

+240
-3
lines changed

9 files changed

+240
-3
lines changed

src/main/java/com/github/phantomthief/collection/BufferTrigger.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
/**
1111
* @author w.vela
1212
*/
13-
public interface BufferTrigger<E> {
13+
public interface BufferTrigger<E> extends AutoCloseable {
1414

15+
/**
16+
* @throws IllegalStateException if the instance has been shutdown.
17+
*/
1518
void enqueue(E element);
1619

1720
void manuallyDoTrigger();
@@ -48,4 +51,7 @@ static <E> GenericBatchConsumerTriggerBuilder<E> batchBlocking() {
4851
static BatchConsumerTriggerBuilder<Object> batchBlockingTrigger() {
4952
return BatchConsumeBlockingQueueTrigger.newBuilder();
5053
}
54+
55+
@Override
56+
void close(); // override to remove throws Exception.
5157
}

src/main/java/com/github/phantomthief/collection/impl/BatchConsumeBlockingQueueTrigger.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
import static com.github.phantomthief.concurrent.MoreFutures.scheduleWithDynamicDelay;
44
import static com.github.phantomthief.util.MoreLocks.runWithLock;
55
import static com.github.phantomthief.util.MoreLocks.runWithTryLock;
6+
import static com.google.common.base.Preconditions.checkState;
7+
import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
68
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
79
import static java.lang.Integer.max;
810
import static java.lang.Math.min;
11+
import static java.util.concurrent.TimeUnit.DAYS;
912
import static org.slf4j.LoggerFactory.getLogger;
1013

1114
import java.time.Duration;
1215
import java.util.ArrayList;
1316
import java.util.List;
1417
import java.util.concurrent.BlockingQueue;
18+
import java.util.concurrent.Future;
1519
import java.util.concurrent.LinkedBlockingQueue;
1620
import java.util.concurrent.ScheduledExecutorService;
1721
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +42,9 @@ public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
3842
private final ScheduledExecutorService scheduledExecutorService;
3943
private final ReentrantLock lock = new ReentrantLock();
4044
private final AtomicBoolean running = new AtomicBoolean();
45+
private final Runnable shutdownExecutor;
46+
47+
private volatile boolean shutdown;
4148

4249
BatchConsumeBlockingQueueTrigger(BatchConsumerTriggerBuilder<E> builder) {
4350
Supplier<Duration> linger = builder.linger;
@@ -46,7 +53,13 @@ public class BatchConsumeBlockingQueueTrigger<E> implements BufferTrigger<E> {
4653
this.consumer = builder.consumer;
4754
this.exceptionHandler = builder.exceptionHandler;
4855
this.scheduledExecutorService = builder.scheduledExecutorService;
49-
scheduleWithDynamicDelay(scheduledExecutorService, linger, () -> doBatchConsumer(TriggerType.LINGER));
56+
Future<?> future = scheduleWithDynamicDelay(scheduledExecutorService, linger, () -> doBatchConsumer(TriggerType.LINGER));
57+
this.shutdownExecutor = () -> {
58+
future.cancel(false);
59+
if (builder.usingInnerExecutor) {
60+
shutdownAndAwaitTermination(builder.scheduledExecutorService, 1, DAYS);
61+
}
62+
};
5063
}
5164

5265
/**
@@ -60,6 +73,8 @@ public static BatchConsumerTriggerBuilder<Object> newBuilder() {
6073

6174
@Override
6275
public void enqueue(E element) {
76+
checkState(!shutdown, "buffer trigger was shutdown.");
77+
6378
putUninterruptibly(queue, element);
6479
tryTrigBatchConsume();
6580
}
@@ -135,6 +150,16 @@ public long getPendingChanges() {
135150
return queue.size();
136151
}
137152

153+
@Override
154+
public void close() {
155+
shutdown = true;
156+
try {
157+
manuallyDoTrigger();
158+
} finally {
159+
shutdownExecutor.run();
160+
}
161+
}
162+
138163
/**
139164
* just for log and debug
140165
*/

src/main/java/com/github/phantomthief/collection/impl/BatchConsumerTriggerBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ public final class BatchConsumerTriggerBuilder<E> {
2626
private static final Duration DEFAULT_LINGER = ofSeconds(1);
2727

2828
ScheduledExecutorService scheduledExecutorService;
29+
boolean usingInnerExecutor;
2930
Supplier<Duration> linger;
3031
int batchSize;
3132
int bufferSize;
3233
ThrowableConsumer<List<E>, Exception> consumer;
3334
BiConsumer<Throwable, List<E>> exceptionHandler;
3435

36+
/**
37+
* If you create own ScheduledExecutorService, then you have to shutdown it yourself.
38+
*/
3539
public BatchConsumerTriggerBuilder<E>
3640
setScheduleExecutorService(ScheduledExecutorService scheduledExecutorService) {
3741
this.scheduledExecutorService = scheduledExecutorService;
@@ -135,6 +139,7 @@ private void ensure() {
135139
}
136140
if (scheduledExecutorService == null) {
137141
scheduledExecutorService = makeScheduleExecutor();
142+
usingInnerExecutor = true;
138143
}
139144
}
140145

src/main/java/com/github/phantomthief/collection/impl/GenericBatchConsumerTriggerBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public GenericBatchConsumerTriggerBuilder<E> forceConsumeEveryTick() {
2929
return this;
3030
}
3131

32+
/**
33+
* If you create own ScheduledExecutorService, then you have to shutdown it yourself.
34+
*/
3235
public GenericBatchConsumerTriggerBuilder<E>
3336
setScheduleExecutorService(ScheduledExecutorService scheduledExecutorService) {
3437
builder.setScheduleExecutorService(scheduledExecutorService);

src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public GenericSimpleBufferTriggerBuilder<E, C> setContainerEx(Supplier<? extends
5454
return this;
5555
}
5656

57+
/**
58+
* If you create own ScheduledExecutorService, then you have to shutdown it yourself.
59+
*/
5760
@CheckReturnValue
5861
public GenericSimpleBufferTriggerBuilder<E, C>
5962
setScheduleExecutorService(ScheduledExecutorService scheduledExecutorService) {

src/main/java/com/github/phantomthief/collection/impl/LazyBufferTrigger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ public void manuallyDoTrigger() {
3232
public long getPendingChanges() {
3333
return this.factory.map(BufferTrigger::getPendingChanges).orElse(0L);
3434
}
35+
36+
@Override
37+
public void close() {
38+
this.factory.ifPresent(BufferTrigger::close);
39+
}
3540
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.github.phantomthief.collection.impl;
22

3+
import static com.google.common.base.Preconditions.checkState;
34
import static com.google.common.base.Throwables.throwIfUnchecked;
5+
import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
46
import static java.lang.System.currentTimeMillis;
57
import static java.util.concurrent.TimeUnit.DAYS;
68
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -46,7 +48,9 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
4648
private final ReadLock readLock;
4749
private final WriteLock writeLock;
4850
private final Condition writeCondition;
51+
private final Runnable shutdownExecutor;
4952

53+
private volatile boolean shutdown;
5054
private volatile long lastConsumeTimestamp = currentTimeMillis();
5155

5256
SimpleBufferTrigger(SimpleBufferTriggerBuilder<E, C> builder) {
@@ -70,6 +74,11 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
7074
builder.scheduledExecutorService.schedule(
7175
new TriggerRunnable(builder.scheduledExecutorService, builder.triggerStrategy),
7276
DEFAULT_NEXT_TRIGGER_PERIOD, MILLISECONDS);
77+
this.shutdownExecutor = () -> {
78+
if (builder.usingInnerExecutor) {
79+
shutdownAndAwaitTermination(builder.scheduledExecutorService, 1, DAYS);
80+
}
81+
};
7382
}
7483

7584
/**
@@ -99,6 +108,8 @@ public static SimpleBufferTriggerBuilder<Object, Map<Object, Integer>> newCounte
99108

100109
@Override
101110
public void enqueue(E element) {
111+
checkState(!shutdown, "buffer trigger was shutdown.");
112+
102113
long currentCount = counter.get();
103114
long thisMaxBufferCount = maxBufferCount.getAsLong();
104115
if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
@@ -195,6 +206,16 @@ public long getPendingChanges() {
195206
return counter.get();
196207
}
197208

209+
@Override
210+
public void close() {
211+
shutdown = true;
212+
try {
213+
manuallyDoTrigger();
214+
} finally {
215+
shutdownExecutor.run();
216+
}
217+
}
218+
198219
public interface TriggerStrategy {
199220

200221
TriggerResult canTrigger(long lastConsumeTimestamp, long changedCount);
@@ -249,7 +270,9 @@ public void run() {
249270
logger.error("", e);
250271
}
251272
nextTrigPeriod = Math.max(0, nextTrigPeriod);
252-
scheduledExecutorService.schedule(this, nextTrigPeriod, MILLISECONDS);
273+
if (!shutdown) {
274+
scheduledExecutorService.schedule(this, nextTrigPeriod, MILLISECONDS);
275+
}
253276
}
254277
}
255278
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class SimpleBufferTriggerBuilder<E, C> {
3838

3939
TriggerStrategy triggerStrategy;
4040
ScheduledExecutorService scheduledExecutorService;
41+
boolean usingInnerExecutor;
4142
Supplier<C> bufferFactory;
4243
ToIntBiFunction<C, E> queueAdder;
4344
ThrowableConsumer<C, Throwable> consumer;
@@ -80,6 +81,9 @@ public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> setContainerEx(
8081
return thisBuilder;
8182
}
8283

84+
/**
85+
* If you create own ScheduledExecutorService, then you have to shutdown it yourself.
86+
*/
8387
public SimpleBufferTriggerBuilder<E, C>
8488
setScheduleExecutorService(ScheduledExecutorService scheduledExecutorService) {
8589
this.scheduledExecutorService = scheduledExecutorService;
@@ -261,6 +265,7 @@ private void ensure() {
261265
}
262266
if (scheduledExecutorService == null) {
263267
scheduledExecutorService = makeScheduleExecutor();
268+
usingInnerExecutor = true;
264269
}
265270
}
266271

0 commit comments

Comments
 (0)