Skip to content

Commit 2ac1d32

Browse files
authored
auto ticker (PhantomThief#4)
* remove const ticker. make interval align to start consume time.
1 parent 29be6b2 commit 2ac1d32

File tree

6 files changed

+190
-43
lines changed

6 files changed

+190
-43
lines changed

src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ public GenericSimpleBufferTriggerBuilder<E, C> setContainerEx(Supplier<? extends
5959
return this;
6060
}
6161

62-
public GenericSimpleBufferTriggerBuilder<E, C> tickTime(long time, TimeUnit unit) {
63-
builder.tickTime(time, unit);
64-
return this;
65-
}
66-
6762
public GenericSimpleBufferTriggerBuilder<E, C>
6863
triggerStrategy(TriggerStrategy triggerStrategy) {
6964
builder.triggerStrategy(triggerStrategy);

src/main/java/com/github/phantomthief/collection/impl/MultiIntervalTriggerStrategy.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package com.github.phantomthief.collection.impl;
22

3+
import static com.github.phantomthief.collection.impl.SimpleBufferTrigger.TriggerResult.trig;
34
import static com.google.common.base.Preconditions.checkArgument;
5+
import static java.lang.Math.abs;
6+
import static java.lang.Math.min;
47

58
import java.util.Map.Entry;
69
import java.util.SortedMap;
710
import java.util.TreeMap;
811
import java.util.concurrent.TimeUnit;
912

13+
import com.github.phantomthief.collection.impl.SimpleBufferTrigger.TriggerResult;
1014
import com.github.phantomthief.collection.impl.SimpleBufferTrigger.TriggerStrategy;
1115

1216
/**
@@ -21,17 +25,32 @@
2125
*/
2226
public class MultiIntervalTriggerStrategy implements TriggerStrategy {
2327

28+
private long minTriggerPeriod = Long.MAX_VALUE;
2429
private final SortedMap<Long, Long> triggerMap = new TreeMap<>();
2530

2631
public MultiIntervalTriggerStrategy on(long interval, TimeUnit unit, long count) {
27-
triggerMap.put(unit.toMillis(interval), count);
28-
checkTriggerMap();
32+
long intervalInMs = unit.toMillis(interval);
33+
triggerMap.put(intervalInMs, count);
34+
minTriggerPeriod = checkAndCalcMinPeriod();
2935
return this;
3036
}
3137

32-
private void checkTriggerMap() {
38+
long minTriggerPeriod() { // for test case
39+
return minTriggerPeriod;
40+
}
41+
42+
private long checkAndCalcMinPeriod() {
43+
long minPeriod = Long.MAX_VALUE;
3344
Long maxTrigChangeCount = null;
34-
for (Long trigChangedCount : triggerMap.values()) {
45+
long lastPeriod = 0;
46+
47+
for (Entry<Long, Long> entry : triggerMap.entrySet()) {
48+
long period = entry.getKey();
49+
minPeriod = min(minPeriod, period);
50+
if (lastPeriod > 0) {
51+
minPeriod = min(minPeriod, abs(lastPeriod - period));
52+
}
53+
long trigChangedCount = entry.getValue();
3554
if (maxTrigChangeCount == null) {
3655
maxTrigChangeCount = trigChangedCount;
3756
} else {
@@ -40,23 +59,28 @@ private void checkTriggerMap() {
4059
"found invalid trigger setting:" + triggerMap);
4160
}
4261
}
62+
lastPeriod = period;
4363
}
64+
return minPeriod;
4465
}
4566

4667
@Override
47-
public boolean canTrigger(long lastConsumeTimestamp, long changedCount) {
68+
public TriggerResult canTrigger(long lastConsumeTimestamp, long changedCount) {
4869
checkArgument(!triggerMap.isEmpty());
4970

71+
boolean doConsumer = false;
72+
5073
long now = System.currentTimeMillis();
5174

5275
for (Entry<Long, Long> entry : triggerMap.entrySet()) {
5376
if (now - lastConsumeTimestamp < entry.getKey()) {
5477
continue;
5578
}
5679
if (changedCount >= entry.getValue()) {
57-
return true;
80+
doConsumer = true;
81+
break;
5882
}
5983
}
60-
return false;
84+
return trig(doConsumer, minTriggerPeriod);
6185
}
6286
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.github.phantomthief.collection.impl;
55

66
import static java.lang.System.currentTimeMillis;
7+
import static java.util.concurrent.TimeUnit.DAYS;
78
import static java.util.concurrent.TimeUnit.MILLISECONDS;
89
import static java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
910
import static java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -12,6 +13,7 @@
1213
import java.util.Map;
1314
import java.util.concurrent.ConcurrentHashMap;
1415
import java.util.concurrent.ScheduledExecutorService;
16+
import java.util.concurrent.TimeUnit;
1517
import java.util.concurrent.atomic.AtomicLong;
1618
import java.util.concurrent.atomic.AtomicReference;
1719
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -32,6 +34,8 @@ public class SimpleBufferTrigger<E> implements BufferTrigger<E> {
3234

3335
private static final Logger logger = getLogger(SimpleBufferTrigger.class);
3436

37+
private static final long DEFAULT_NEXT_TRIGGER_PERIOD = TimeUnit.SECONDS.toMillis(1);
38+
3539
private final AtomicLong counter = new AtomicLong();
3640
private final ThrowableConsumer<Object, Throwable> consumer;
3741
private final ToIntBiFunction<Object, E> queueAdder;
@@ -43,11 +47,11 @@ public class SimpleBufferTrigger<E> implements BufferTrigger<E> {
4347
private final ReadLock readLock;
4448
private final WriteLock writeLock;
4549

46-
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
50+
private volatile long lastConsumeTimestamp = currentTimeMillis();
4751

4852
SimpleBufferTrigger(Supplier<Object> bufferFactory, ToIntBiFunction<Object, E> queueAdder,
4953
ScheduledExecutorService scheduledExecutorService,
50-
ThrowableConsumer<Object, Throwable> consumer, long tickTime,
54+
ThrowableConsumer<Object, Throwable> consumer,
5155
TriggerStrategy triggerStrategy, BiConsumer<Throwable, Object> exceptionHandler,
5256
long maxBufferCount, Consumer<E> rejectHandler) {
5357
this.queueAdder = queueAdder;
@@ -60,18 +64,9 @@ public class SimpleBufferTrigger<E> implements BufferTrigger<E> {
6064
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
6165
readLock = lock.readLock();
6266
writeLock = lock.writeLock();
63-
scheduledExecutorService.scheduleWithFixedDelay(() -> {
64-
synchronized (SimpleBufferTrigger.this) {
65-
try {
66-
if (triggerStrategy.canTrigger(lastConsumeTimestamp, counter.get())) {
67-
lastConsumeTimestamp = currentTimeMillis();
68-
doConsume();
69-
}
70-
} catch (Throwable e) {
71-
logger.error("", e);
72-
}
73-
}
74-
}, tickTime, tickTime, MILLISECONDS);
67+
scheduledExecutorService.schedule(
68+
new TriggerRunnable(scheduledExecutorService, triggerStrategy),
69+
DEFAULT_NEXT_TRIGGER_PERIOD, MILLISECONDS);
7570
}
7671

7772
public static SimpleBufferTriggerBuilder<Object, Object> newBuilder() {
@@ -160,6 +155,60 @@ public long getPendingChanges() {
160155

161156
public interface TriggerStrategy {
162157

163-
boolean canTrigger(long lastConsumeTimestamp, long changedCount);
158+
TriggerResult canTrigger(long lastConsumeTimestamp, long changedCount);
159+
}
160+
161+
public static class TriggerResult {
162+
163+
private static final TriggerResult EMPTY = new TriggerResult(false, DAYS.toMillis(1));
164+
private final boolean doConsumer;
165+
private final long nextPeriod;
166+
167+
private TriggerResult(boolean doConsumer, long nextPeriod) {
168+
this.doConsumer = doConsumer;
169+
this.nextPeriod = nextPeriod;
170+
}
171+
172+
public static TriggerResult trig(boolean doConsumer, long nextPeriod) {
173+
return new TriggerResult(doConsumer, nextPeriod);
174+
}
175+
176+
public static TriggerResult empty() {
177+
return EMPTY;
178+
}
179+
}
180+
181+
private class TriggerRunnable implements Runnable {
182+
183+
private final ScheduledExecutorService scheduledExecutorService;
184+
private final TriggerStrategy triggerStrategy;
185+
186+
TriggerRunnable(ScheduledExecutorService scheduledExecutorService,
187+
TriggerStrategy triggerStrategy) {
188+
this.scheduledExecutorService = scheduledExecutorService;
189+
this.triggerStrategy = triggerStrategy;
190+
}
191+
192+
@Override
193+
public void run() {
194+
synchronized (SimpleBufferTrigger.this) {
195+
long nextTrigPeriod = DEFAULT_NEXT_TRIGGER_PERIOD;
196+
try {
197+
TriggerResult triggerResult = triggerStrategy.canTrigger(lastConsumeTimestamp,
198+
counter.get());
199+
nextTrigPeriod = triggerResult.nextPeriod;
200+
long beforeConsume = currentTimeMillis();
201+
if (triggerResult.doConsumer) {
202+
lastConsumeTimestamp = beforeConsume;
203+
doConsume();
204+
}
205+
nextTrigPeriod = nextTrigPeriod - (currentTimeMillis() - beforeConsume);
206+
} catch (Throwable e) {
207+
logger.error("", e);
208+
}
209+
nextTrigPeriod = Math.max(0, nextTrigPeriod);
210+
scheduledExecutorService.schedule(this, nextTrigPeriod, MILLISECONDS);
211+
}
212+
}
164213
}
165214
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
*/
44
package com.github.phantomthief.collection.impl;
55

6+
import static com.github.phantomthief.collection.impl.SimpleBufferTrigger.TriggerResult.empty;
7+
import static com.github.phantomthief.collection.impl.SimpleBufferTrigger.TriggerResult.trig;
68
import static com.google.common.base.Preconditions.checkArgument;
79
import static com.google.common.base.Preconditions.checkNotNull;
810
import static java.lang.System.currentTimeMillis;
911
import static java.util.Collections.newSetFromMap;
1012
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
11-
import static java.util.concurrent.TimeUnit.SECONDS;
1213

1314
import java.util.Set;
1415
import java.util.concurrent.ConcurrentHashMap;
@@ -33,9 +34,7 @@
3334
public class SimpleBufferTriggerBuilder<E, C> {
3435

3536
private static final Logger logger = LoggerFactory.getLogger(SimpleBufferTriggerBuilder.class);
36-
private static final long DEFAULT_TICK_TIME = SECONDS.toMillis(1);
3737

38-
private long tickTime = DEFAULT_TICK_TIME;
3938
private TriggerStrategy triggerStrategy;
4039
private ScheduledExecutorService scheduledExecutorService;
4140
private Supplier<C> bufferFactory;
@@ -78,11 +77,6 @@ public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> setContainerEx(
7877
return thisBuilder;
7978
}
8079

81-
public SimpleBufferTriggerBuilder<E, C> tickTime(long time, TimeUnit unit) {
82-
this.tickTime = unit.toMillis(time);
83-
return this;
84-
}
85-
8680
public SimpleBufferTriggerBuilder<E, C>
8781
setScheduleExecutorService(ScheduledExecutorService scheduledExecutorService) {
8882
this.scheduledExecutorService = scheduledExecutorService;
@@ -120,15 +114,14 @@ public SimpleBufferTriggerBuilder<E, C> on(long interval, TimeUnit unit, long co
120114
}
121115

122116
public SimpleBufferTriggerBuilder<E, C> interval(long interval, TimeUnit unit) {
123-
if (unit.toMillis(interval) < tickTime) {
124-
tickTime(interval, unit);
125-
}
126117
return interval(() -> interval, unit);
127118
}
128119

129120
public SimpleBufferTriggerBuilder<E, C> interval(LongSupplier interval, TimeUnit unit) {
130-
this.triggerStrategy = (last, change) -> change > 0
131-
&& currentTimeMillis() - last >= unit.toMillis(interval.getAsLong());
121+
this.triggerStrategy = (last, change) -> {
122+
long intervalInMs = unit.toMillis(interval.getAsLong());
123+
return trig(change > 0 && currentTimeMillis() - last >= intervalInMs, intervalInMs);
124+
};
132125
return this;
133126
}
134127

@@ -175,19 +168,18 @@ public <E1> BufferTrigger<E1> build() {
175168
ensure();
176169
return new SimpleBufferTrigger<>((Supplier<Object>) bufferFactory,
177170
(ToIntBiFunction<Object, E1>) queueAdder, scheduledExecutorService,
178-
(ThrowableConsumer<Object, Throwable>) consumer, tickTime, triggerStrategy,
171+
(ThrowableConsumer<Object, Throwable>) consumer, triggerStrategy,
179172
(BiConsumer<Throwable, Object>) exceptionHandler, maxBufferCount,
180173
(Consumer<E1>) rejectHandler);
181174
});
182175
}
183176

184177
private void ensure() {
185178
checkNotNull(consumer);
186-
checkArgument(tickTime > 0);
187179

188180
if (triggerStrategy == null) {
189181
logger.warn("no trigger strategy found. using NO-OP trigger");
190-
triggerStrategy = (t, n) -> false;
182+
triggerStrategy = (t, n) -> empty();
191183
}
192184

193185
if (bufferFactory == null && queueAdder == null) {
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.github.phantomthief.collection.impl;
2+
3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
5+
import static org.junit.Assert.assertEquals;
6+
7+
import org.junit.Test;
8+
9+
/**
10+
* @author w.vela
11+
* Created on 05/08/2016.
12+
*/
13+
public class MultiIntervalTriggerStrategyTest {
14+
15+
@Test
16+
public void test() {
17+
MultiIntervalTriggerStrategy multiIntervalTriggerStrategy = new MultiIntervalTriggerStrategy();
18+
multiIntervalTriggerStrategy.on(5, SECONDS, 10);
19+
multiIntervalTriggerStrategy.on(6, SECONDS, 8);
20+
multiIntervalTriggerStrategy.on(10, SECONDS, 3);
21+
assertEquals(multiIntervalTriggerStrategy.minTriggerPeriod(), SECONDS.toMillis(1));
22+
multiIntervalTriggerStrategy.on(500, MILLISECONDS, 999);
23+
assertEquals(multiIntervalTriggerStrategy.minTriggerPeriod(), 500);
24+
}
25+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.github.phantomthief.test;
2+
3+
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
4+
import static java.lang.System.currentTimeMillis;
5+
import static java.lang.System.out;
6+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
7+
import static java.util.concurrent.TimeUnit.SECONDS;
8+
import static org.junit.Assert.assertTrue;
9+
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
import org.junit.Test;
13+
14+
import com.github.phantomthief.collection.BufferTrigger;
15+
import com.github.phantomthief.collection.impl.SimpleBufferTrigger;
16+
17+
/**
18+
* @author w.vela
19+
* Created on 09/08/2016.
20+
*/
21+
public class LongConsumerTest {
22+
23+
@Test
24+
public void testShort() {
25+
AtomicInteger counter = new AtomicInteger();
26+
BufferTrigger<Long> bufferTrigger = SimpleBufferTrigger.newBuilder() //
27+
.interval(1, SECONDS) //
28+
.consumer(set -> {
29+
sleepUninterruptibly(700, MILLISECONDS);
30+
counter.incrementAndGet();
31+
}) //
32+
.build();
33+
long now = currentTimeMillis();
34+
while (currentTimeMillis() - now < SECONDS.toMillis(20)) {
35+
bufferTrigger.enqueue(1L);
36+
sleepUninterruptibly(10, MILLISECONDS);
37+
}
38+
int count = counter.get();
39+
out.println("short consumer count:" + count);
40+
assertTrue(count >= 19 && count <= 21);
41+
}
42+
43+
@Test
44+
public void testLong() {
45+
AtomicInteger counter = new AtomicInteger();
46+
BufferTrigger<Long> bufferTrigger = SimpleBufferTrigger.newBuilder() //
47+
.interval(1, SECONDS) //
48+
.consumer(set -> {
49+
sleepUninterruptibly(2000, MILLISECONDS);
50+
counter.incrementAndGet();
51+
}) //
52+
.build();
53+
long now = currentTimeMillis();
54+
while (currentTimeMillis() - now < SECONDS.toMillis(20)) {
55+
bufferTrigger.enqueue(1L);
56+
sleepUninterruptibly(10, MILLISECONDS);
57+
}
58+
int count = counter.get();
59+
out.println("long consumer count:" + count);
60+
assertTrue(count >= 9 && count <= 11);
61+
}
62+
}

0 commit comments

Comments
 (0)