Open
Description
可以稳定复现的场景是这样的:
/**
 * Stress reproduction for a {@code BufferTrigger}: 100 pooled threads enqueue one million
 * elements while the batch consumer sleeps one second per batch, after which the number of
 * enqueue attempts is printed next to the sum of consumed and rejected elements.
 */
public class TestTrigger {

    /** Incremented once per producer task, just before it calls {@code enqueue}. */
    private final AtomicLong enqueueCount = new AtomicLong();

    /** Total number of elements that have been handed to {@link #doBatchReload}. */
    private final AtomicLong consumeCount = new AtomicLong();

    /** Number of times {@link #onTaskRejected} has been invoked. */
    private final AtomicLong rejectCount = new AtomicLong();

    // NOTE(review): maxBufferCount(1000) presumably caps the buffered elements and routes the
    // overflow to the reject handler -- confirm against the BufferTrigger documentation.
    private final BufferTrigger<String> buffer = BufferTrigger.<String, Queue<String>>simple()
            .name("test-trigger")
            .setContainer(ConcurrentLinkedQueue::new, Queue::add)
            .maxBufferCount(1000)
            .interval(1, TimeUnit.SECONDS)
            .consumer(this::doBatchReload)
            .rejectHandler(this::onTaskRejected)
            .build();

    /**
     * Batch consumer registered on the trigger: records the batch size, then sleeps for one
     * second to simulate slow downstream work.
     *
     * @param values the batch handed over by the trigger
     */
    private void doBatchReload(Iterable<String> values) {
        int batchSize = Iterables.size(values);
        consumeCount.addAndGet(batchSize);
        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1000));
    }

    /**
     * Reject handler registered on the trigger: counts one rejected element.
     *
     * @param value the rejected element (unused)
     */
    private void onTaskRejected(String value) {
        rejectCount.incrementAndGet();
    }

    /**
     * Submits one million enqueue tasks to a 100-thread pool, pausing 50 ms whenever the loop
     * index is a multiple of 353, waits up to 30 seconds for the pool to drain, triggers the
     * buffer manually and prints the counters.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    private void test() throws InterruptedException {
        ExecutorService producers = Executors.newFixedThreadPool(100);

        // Every submitted task does the same thing, so a single Runnable is reused.
        Runnable producer = () -> {
            enqueueCount.incrementAndGet();
            buffer.enqueue("test");
        };

        for (int submitted = 0; submitted < 1000000; submitted++) {
            producers.submit(producer);
            // Throttle submission so the run spreads over many trigger intervals.
            if (submitted % 353 == 0) {
                Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
            }
        }

        producers.shutdown();
        // Prints whether all producer tasks completed within the timeout.
        System.out.println(producers.awaitTermination(30, TimeUnit.SECONDS));

        // NOTE(review): presumably flushes whatever is still buffered -- confirm against
        // the BufferTrigger documentation.
        buffer.manuallyDoTrigger();

        System.out.printf("enqueued: %d\n", enqueueCount.get());
        System.out.printf(
                "handled: %d + %d = %d\n",
                consumeCount.get(),
                rejectCount.get(),
                consumeCount.get() + rejectCount.get());
    }

    /**
     * Runs the reproduction once.
     *
     * @param args ignored
     * @throws InterruptedException propagated from {@link #test()}
     */
    public static void main(String[] args) throws InterruptedException {
        new TestTrigger().test();
    }
}
结果是(入队 1000000 条,但消费数 + 拒绝数只有 999996 条,有 4 条既未被消费也未被拒绝):
true
enqueued: 1000000
handled: 150023 + 849973 = 999996
Metadata
Metadata
Assignees
Labels
No labels