Open
Description
version:
lastest
JVM version (java -version):
1.8
Description of the problem including expected versus actual behavior:
expected behavior
consumer function is called when the batchSize is reached or the linger time passed
actual behavior
consumer function is only callen when linger time passed
code to reproduce
public static void main(String[] args) {
ScheduledExecutorService threadPoolExecutorService = new ScheduledThreadPoolExecutor(1);
ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorService() {
@NotNull
@Override
public ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
return threadPoolExecutorService.schedule(command, delay, unit);
}
@NotNull
@Override
public <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit) {
return threadPoolExecutorService.schedule(callable, delay, unit);
}
@NotNull
@Override
public ScheduledFuture<?> scheduleAtFixedRate(@NotNull Runnable command, long initialDelay, long period, @NotNull TimeUnit unit) {
return threadPoolExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@NotNull
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(@NotNull Runnable command, long initialDelay, long delay, @NotNull TimeUnit unit) {
return null;
}
@Override
public void shutdown() {
}
@NotNull
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return false;
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
return null;
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
return null;
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
return null;
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return null;
}
@NotNull
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(@NotNull Runnable command) {
command.run();
}
};
BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.batchSize(20).linger(3000, TimeUnit.SECONDS)
.setConsumerEx(t -> {
System.out.println(Arrays.toString(t.toArray()));
}).setScheduleExecutorService(scheduledExecutorService).build();
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 10; i++) {
bufferTrigger.enqueue(String.valueOf(j * 100 + i));
}
System.out.println("pending:" + bufferTrigger.getPendingChanges());
}
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.HOURS);
}
Related code
change the running.set(true); before scheduledExecutorService.execute?
Metadata
Metadata
Assignees
Labels
No labels