Skip to content

Commit 29e217d

Browse files
authored
feat(simpleBufferTrigger): add listener for back-pressure. (PhantomThief#19)
1 parent f215fd3 commit 29e217d

File tree

5 files changed

+58
-2
lines changed

5 files changed

+58
-2
lines changed

src/main/java/com/github/phantomthief/collection/impl/BackPressureHandler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,33 @@
44

55
import javax.annotation.Nullable;
66

7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
710
/**
811
* @author w.vela
912
* Created on 2019-07-30.
1013
*/
1114
class BackPressureHandler<T> implements RejectHandler<T> {
1215

16+
private static final Logger logger = LoggerFactory.getLogger(BackPressureHandler.class);
17+
18+
@Nullable
19+
private final BackPressureListener<T> listener;
20+
21+
BackPressureHandler(BackPressureListener<T> listener) {
22+
this.listener = listener;
23+
}
24+
1325
@Override
1426
public boolean onReject(T element, @Nullable Condition condition) {
27+
if (listener != null) {
28+
try {
29+
listener.onHandle(element);
30+
} catch (Throwable e) {
31+
logger.error("", e);
32+
}
33+
}
1534
assert condition != null;
1635
condition.awaitUninterruptibly();
1736
return true;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.github.phantomthief.collection.impl;
2+
3+
/**
4+
* @author w.vela
5+
* Created on 2020-06-08.
6+
*/
7+
public interface BackPressureListener<T> {
8+
9+
void onHandle(T element);
10+
}

src/main/java/com/github/phantomthief/collection/impl/GenericSimpleBufferTriggerBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,19 @@ public GenericSimpleBufferTriggerBuilder<E, C> enableBackPressure() {
173173
return this;
174174
}
175175

176+
/**
177+
* 开启背压(back-pressure)能力
178+
* 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)}
179+
* 并且不要设置 {@link #rejectHandler}
180+
*
181+
* 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行
182+
*/
183+
@CheckReturnValue
184+
public GenericSimpleBufferTriggerBuilder<E, C> enableBackPressure(BackPressureListener<E> listener) {
185+
builder.enableBackPressure(listener);
186+
return this;
187+
}
188+
176189
/**
177190
* use for debug and stats, like trigger thread's name.
178191
*/

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,22 @@ public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> rejectHandler(Consumer<? supe
186186
* 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行
187187
*/
188188
public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> enableBackPressure() {
189+
return enableBackPressure(null);
190+
}
191+
192+
/**
193+
* 开启背压(back-pressure)能力
194+
* 注意,当开启背压时,需要配合 {@link #maxBufferCount(long)}
195+
* 并且不要设置 {@link #rejectHandler}
196+
*
197+
* 当buffer达到最大值时,会阻塞入队线程,直到消费完当前buffer后再继续执行
198+
*/
199+
public <E1, C1> SimpleBufferTriggerBuilder<E1, C1> enableBackPressure(BackPressureListener<E1> listener) {
189200
if (this.rejectHandler != null) {
190201
throw new IllegalStateException("cannot enable back-pressure while reject handler was set.");
191202
}
192203
SimpleBufferTriggerBuilder<E1, C1> thisBuilder = (SimpleBufferTriggerBuilder<E1, C1>) this;
193-
thisBuilder.rejectHandler = new BackPressureHandler<>();
204+
thisBuilder.rejectHandler = new BackPressureHandler<>(listener);
194205
return thisBuilder;
195206
}
196207

src/test/java/com/github/phantomthief/collection/impl/BackPressureTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.junit.jupiter.api.Assertions.assertTrue;
1111

1212
import java.util.ArrayList;
13+
import java.util.Collections;
1314
import java.util.List;
1415
import java.util.concurrent.ExecutorService;
1516

@@ -30,8 +31,9 @@ class BackPressureTest {
3031
@Test
3132
void test() {
3233
List<String> consumed = new ArrayList<>();
34+
List<String> backPressured = Collections.synchronizedList(new ArrayList<>());
3335
BufferTrigger<String> buffer = BufferTrigger.<String, List<String>> simple()
34-
.enableBackPressure()
36+
.enableBackPressure(backPressured::add)
3537
.maxBufferCount(10)
3638
.interval(1, SECONDS)
3739
.setContainer(() -> synchronizedList(new ArrayList<>()), List::add)
@@ -52,6 +54,7 @@ void test() {
5254
});
5355
}
5456
shutdownAndAwaitTermination(executor, 1, DAYS);
57+
assertTrue(backPressured.size() > 10);
5558
buffer.manuallyDoTrigger();
5659
assertEquals(30, consumed.size());
5760
cost = System.currentTimeMillis() - cost;

0 commit comments

Comments
 (0)