Skip to content

BatchConsumeBlockingQueueTrigger may block consumption when a special executorService is passed and lingerMs is long #10

Open
@liudunxu

Description

@liudunxu

version:
latest
JVM version (java -version):
1.8
Description of the problem including expected versus actual behavior:
expected behavior
the consumer function is called when the batchSize is reached or the linger time has passed
actual behavior
the consumer function is only called when the linger time has passed
code to reproduce

    /**
     * Reproduction driver for the reported behavior.
     *
     * <p>Builds a batch-blocking {@code BufferTrigger<String>} with a batch size of 20 and a
     * linger of 3000 seconds, backed by a hand-written {@code ScheduledExecutorService} whose
     * {@code execute} runs the task inline on the calling thread. It then enqueues
     * 100 rounds of 10 elements, printing the pending count after each round, and finally
     * blocks the main thread for 3 hours.
     *
     * @param args unused
     */
    public static void main(String[] args) {

// Real single-threaded scheduler; the wrapper below forwards only some of its methods to it.
ScheduledExecutorService threadPoolExecutorService = new ScheduledThreadPoolExecutor(1);

        // The "special" executor from the issue title: the two schedule(...) overloads and
        // scheduleAtFixedRate delegate to the real pool, execute(...) runs inline, and every
        // other method is a stub returning null/false or doing nothing.
        ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorService() {
            // Delegates to the real pool.
            @NotNull
            @Override
            public ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.schedule(command, delay, unit);
            }

            // Delegates to the real pool.
            @NotNull
            @Override
            public <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.schedule(callable, delay, unit);
            }

            // Delegates to the real pool.
            @NotNull
            @Override
            public ScheduledFuture<?> scheduleAtFixedRate(@NotNull Runnable command, long initialDelay, long period, @NotNull TimeUnit unit) {
                return threadPoolExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
            }

            // Stub: NOT delegated; returns null despite the @NotNull annotation.
            @NotNull
            @Override
            public ScheduledFuture<?> scheduleWithFixedDelay(@NotNull Runnable command, long initialDelay, long delay, @NotNull TimeUnit unit) {
                return null;
            }

            // Stub: no-op; the underlying pool is never shut down through this wrapper.
            @Override
            public void shutdown() {

            }

            // Stub: returns null despite the @NotNull annotation.
            @NotNull
            @Override
            public List<Runnable> shutdownNow() {
                return null;
            }

            // Stub: always reports "not shut down".
            @Override
            public boolean isShutdown() {
                return false;
            }

            // Stub: always reports "not terminated".
            @Override
            public boolean isTerminated() {
                return false;
            }

            // Stub: returns false immediately without waiting.
            @Override
            public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
                return false;
            }

            // Stub: the task is never run; returns null.
            @NotNull
            @Override
            public <T> Future<T> submit(@NotNull Callable<T> task) {
                return null;
            }

            // Stub: the task is never run; returns null.
            @NotNull
            @Override
            public <T> Future<T> submit(@NotNull Runnable task, T result) {
                return null;
            }

            // Stub: the task is never run; returns null.
            @NotNull
            @Override
            public Future<?> submit(@NotNull Runnable task) {
                return null;
            }

            // Stub: the tasks are never run; returns null.
            @NotNull
            @Override
            public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
                return null;
            }

            // Stub: the tasks are never run; returns null.
            @NotNull
            @Override
            public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException {
                return null;
            }

            // Stub: the tasks are never run; returns null.
            @NotNull
            @Override
            public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
                return null;
            }

            // Stub: the tasks are never run; returns null.
            @Override
            public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }

            // Key to the reproduction: the command runs synchronously on the calling thread,
            // so execute(...) returns only after the command has completed.
            @Override
            public void execute(@NotNull Runnable command) {
                command.run();
            }
        };

        // batchSize 20 with a 3000-second linger; the consumer prints each batch it receives.
        // NOTE(review): the very long linger presumably makes a missing size-based flush
        // easy to observe -- confirm against the library's linger semantics.
        BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
                .batchSize(20).linger(3000, TimeUnit.SECONDS)
                .setConsumerEx(t -> {
                    System.out.println(Arrays.toString(t.toArray()));
                }).setScheduleExecutorService(scheduledExecutorService).build();


        // 100 rounds of 10 elements each (values j * 100 + i); the pending count is printed
        // after every round.
        for (int j = 0; j < 100; j++) {
            for (int i = 0; i < 10; i++) {
                bufferTrigger.enqueue(String.valueOf(j * 100 + i));
            }
            System.out.println("pending:" + bufferTrigger.getPendingChanges());
        }

        // Blocks the main thread for 3 hours after enqueueing has finished.
        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.HOURS);

    }

Related code
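Should running.set(true) be moved before scheduledExecutorService.execute(...)? With an executor that runs the task inline (as in the reproduction above), the flag is currently set to true only after doBatchConsumer has already returned: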

if (!running.get()) { // prevent repeat enqueue
this.scheduledExecutorService.execute(this::doBatchConsumer);
running.set(true);

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions