Skip to content

Commit f1ac778

Browse files
authored
revert(backPressure): add global listener. (PhantomThief#24)
1 parent dd530d8 commit f1ac778

File tree

5 files changed

+105
-0
lines changed

5 files changed

+105
-0
lines changed

src/main/java/com/github/phantomthief/collection/impl/BackPressureHandler.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.github.phantomthief.collection.impl;
22

3+
import static java.lang.System.nanoTime;
4+
35
import java.util.concurrent.locks.Condition;
46

57
import javax.annotation.Nullable;
@@ -15,13 +17,20 @@ class BackPressureHandler<T> implements RejectHandler<T> {
1517

1618
private static final Logger logger = LoggerFactory.getLogger(BackPressureHandler.class);
1719

20+
private static GlobalBackPressureListener globalBackPressureListener = null;
21+
1822
@Nullable
1923
private final BackPressureListener<T> listener;
24+
private String name;
2025

2126
BackPressureHandler(BackPressureListener<T> listener) {
2227
this.listener = listener;
2328
}
2429

30+
void setName(String name) {
31+
this.name = name;
32+
}
33+
2534
@Override
2635
public boolean onReject(T element, @Nullable Condition condition) {
2736
if (listener != null) {
@@ -31,8 +40,28 @@ public boolean onReject(T element, @Nullable Condition condition) {
3140
logger.error("", e);
3241
}
3342
}
43+
if (globalBackPressureListener != null) {
44+
try {
45+
globalBackPressureListener.onHandle(name, element);
46+
} catch (Throwable e) {
47+
logger.error("", e);
48+
}
49+
}
3450
assert condition != null;
51+
long startNano = nanoTime();
3552
condition.awaitUninterruptibly();
53+
long blockInNano = nanoTime() - startNano;
54+
if (globalBackPressureListener != null) {
55+
try {
56+
globalBackPressureListener.postHandle(name, element, blockInNano);
57+
} catch (Throwable e) {
58+
logger.error("", e);
59+
}
60+
}
3661
return true;
3762
}
63+
64+
static void setupGlobalBackPressureListener(GlobalBackPressureListener listener) {
65+
globalBackPressureListener = listener;
66+
}
3867
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.github.phantomthief.collection.impl;
2+
3+
import javax.annotation.Nullable;
4+
5+
/**
6+
* @author w.vela
7+
* Created on 2021-02-04.
8+
*/
9+
public interface GlobalBackPressureListener {
10+
11+
void onHandle(@Nullable String name, Object element);
12+
13+
void postHandle(@Nullable String name, Object element, long blockInNano);
14+
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTrigger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
4343

4444
private static final long DEFAULT_NEXT_TRIGGER_PERIOD = TimeUnit.SECONDS.toMillis(1);
4545

46+
private final String name;
4647
private final AtomicLong counter = new AtomicLong();
4748
private final ThrowableConsumer<C, Throwable> consumer;
4849
private final ToIntBiFunction<C, E> queueAdder;
@@ -63,6 +64,7 @@ public class SimpleBufferTrigger<E, C> implements BufferTrigger<E> {
6364
* 使用提供的构造器创建SimpleBufferTrigger实例
6465
*/
6566
SimpleBufferTrigger(SimpleBufferTriggerBuilder<E, C> builder) {
67+
this.name = builder.name;
6668
this.queueAdder = builder.queueAdder;
6769
this.bufferFactory = builder.bufferFactory;
6870
this.consumer = builder.consumer;
@@ -328,4 +330,8 @@ public void run() {
328330
}
329331
}
330332
}
333+
334+
public static void setupGlobalBackPressure(GlobalBackPressureListener listener) {
335+
BackPressureHandler.setupGlobalBackPressureListener(listener);
336+
}
331337
}

src/main/java/com/github/phantomthief/collection/impl/SimpleBufferTriggerBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,9 @@ private void ensure() {
352352
scheduledExecutorService = makeScheduleExecutor();
353353
usingInnerExecutor = true;
354354
}
355+
if (name != null && rejectHandler instanceof BackPressureHandler) {
356+
((BackPressureHandler<E>) rejectHandler).setName(name);
357+
}
355358
}
356359

357360
private ScheduledExecutorService makeScheduleExecutor() {

src/test/java/com/github/phantomthief/collection/impl/BackPressureTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import java.util.List;
1515
import java.util.concurrent.ExecutorService;
1616

17+
import javax.annotation.Nullable;
18+
1719
import org.junit.jupiter.api.Test;
1820
import org.slf4j.Logger;
1921
import org.slf4j.LoggerFactory;
2022

2123
import com.github.phantomthief.collection.BufferTrigger;
24+
import com.google.common.util.concurrent.AtomicLongMap;
2225

2326
/**
2427
* @author w.vela
@@ -89,4 +92,54 @@ void testNoBlock() {
8992
cost = System.currentTimeMillis() - cost;
9093
assertTrue(cost <= 1200);
9194
}
95+
96+
@Test
97+
void testGlobalHandler() {
98+
AtomicLongMap<String> onHandle = AtomicLongMap.create();
99+
AtomicLongMap<String> postHandle = AtomicLongMap.create();
100+
SimpleBufferTrigger.setupGlobalBackPressure(new GlobalBackPressureListener() {
101+
@Override
102+
public void onHandle(@Nullable String name, Object element) {
103+
onHandle.incrementAndGet(name);
104+
}
105+
106+
@Override
107+
public void postHandle(@Nullable String name, Object element, long blockInNano) {
108+
postHandle.addAndGet(name, blockInNano);
109+
}
110+
});
111+
List<String> consumed = new ArrayList<>();
112+
List<String> backPressured = Collections.synchronizedList(new ArrayList<>());
113+
String name = "test-1";
114+
BufferTrigger<String> buffer = BufferTrigger.<String, List<String>> simple()
115+
.maxBufferCount(10)
116+
.enableBackPressure(backPressured::add)
117+
.interval(1, SECONDS)
118+
.setContainer(() -> synchronizedList(new ArrayList<>()), List::add)
119+
.consumer(it -> {
120+
logger.info("do consuming...{}", it);
121+
sleepUninterruptibly(1, SECONDS);
122+
consumed.addAll(it);
123+
logger.info("consumer done.{}", it);
124+
})
125+
.name(name)
126+
.build();
127+
long cost = System.currentTimeMillis();
128+
ExecutorService executor = newFixedThreadPool(10);
129+
for (int i = 0; i < 30; i++) {
130+
int j = i;
131+
executor.execute(() -> {
132+
buffer.enqueue("" + j);
133+
logger.info("enqueued:{}", j);
134+
});
135+
}
136+
shutdownAndAwaitTermination(executor, 1, DAYS);
137+
assertTrue(backPressured.size() > 10);
138+
assertTrue(onHandle.get(name) > 10);
139+
buffer.manuallyDoTrigger();
140+
assertEquals(30, consumed.size());
141+
cost = System.currentTimeMillis() - cost;
142+
assertTrue(cost >= SECONDS.toMillis(3));
143+
assertTrue(postHandle.get(name) >= SECONDS.toNanos(3));
144+
}
92145
}

0 commit comments

Comments
 (0)