Bulkhead(Executor) does not always release permits #393

Open
@Techdaan


I believe the root cause is how the FutureLinkedList is used in BulkheadImpl.

When a permit is requested from the BulkheadImpl with a maxWaitTime and none is immediately available, a new Future is added to the FutureLinkedList. That Future is removed from the FutureLinkedList again once it completes. If a permit is immediately available, an already-completed future is returned instead.

The returned future is only completed in BulkheadImpl#releasePermit, so unless BulkheadImpl#releasePermit is called, the future is never removed from the FutureLinkedList. This causes a memory leak (the node is never removed from the list), and over time new permits can no longer be issued.
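To make the failure mode concrete, here is a minimal, standalone sketch of the pattern described above. It is not the Failsafe source: the class `LeakyBulkheadSketch` and its methods are hypothetical, and it models one plausible reading of the bug, in which a waiter that has already given up stays queued and later absorbs a released permit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model, NOT the real BulkheadImpl.
final class LeakyBulkheadSketch {
    private final int maxPermits;
    private int permits;
    // Stand-in for the FutureLinkedList of pending permit requests.
    private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

    LeakyBulkheadSketch(int maxPermits) {
        this.maxPermits = maxPermits;
        this.permits = maxPermits;
    }

    // Returns a completed future if a permit is free, otherwise queues a waiter.
    synchronized CompletableFuture<Void> acquireAsync() {
        if (permits > 0) {
            permits--;
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(future);
        return future;
    }

    // Models a zero wait time: the caller gives up immediately,
    // but the waiter it created is left in the queue (the leak).
    synchronized boolean tryAcquire() {
        return acquireAsync().isDone();
    }

    // Hands the permit to the oldest waiter, even one that was abandoned.
    synchronized void release() {
        CompletableFuture<Void> next = waiters.poll();
        if (next != null) {
            next.complete(null);
        } else if (permits < maxPermits) {
            permits++;
        }
    }

    synchronized int queued() {
        return waiters.size();
    }

    public static void main(String[] args) {
        LeakyBulkheadSketch bulkhead = new LeakyBulkheadSketch(1);
        boolean first = bulkhead.tryAcquire();   // takes the only permit
        boolean second = bulkhead.tryAcquire();  // rejected, waiter stays queued
        bulkhead.release();                      // permit goes to the abandoned waiter
        boolean third = bulkhead.tryAcquire();   // rejected although nothing is running
        System.out.println(first + " " + second + " " + third); // prints "true false false"
    }
}
```

In this model the single permit is lost after one rejected request, because the only holder that could release it no longer exists.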

This is a big problem when using a bulkhead in a FailsafeExecutor. Permits are acquired with a maxWaitTime in BulkheadExecutor#preExecute. If you submit more work to the executor than there are permits, the permit requests rejected with BulkheadFullException are never released, because in this case neither onSuccess nor onFailure is called in BulkheadExecutor. When that happens, the bulkhead enters an unrecoverable, broken state.

I wrote a small example showcasing how this bug can be triggered.

import dev.failsafe.Bulkhead;
import dev.failsafe.BulkheadFullException;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BulkheadPermitLeakExample {
    public static void main(String[] args) {
        Bulkhead<Object> bulkhead = Bulkhead.builder(1) // Max concurrency of 1.
                .withMaxWaitTime(Duration.ZERO)
                .build();
        FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);

        AtomicInteger numBulkheadFullErrors = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) { // Create 3 threads submitting work to the bulkhead
            threads.add(new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    try {
                        failsafe.get(() -> null); // Submit work to the bulkhead
                    } catch (BulkheadFullException e) {
                        numBulkheadFullErrors.incrementAndGet();
                    }
                }
            }));
        }

        // Start and join the threads
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
        try {
            Thread.sleep(250);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Expected behaviour: Program outputs "Success" with a few errors.
        // Actual behaviour: Program outputs "Error: BulkheadFullException" with many errors (29 on my local machine).
        try {
            System.out.println("Success: " + failsafe.get(() -> true) + ", total during looping: " + numBulkheadFullErrors.get());
        } catch (BulkheadFullException e) {
            System.out.println("Error: BulkheadFullException, total during looping: " + numBulkheadFullErrors.get());
        }
    }
}

Note that this bug can also be reproduced with the code found in the documentation (shown below as documented, without the argument), if a maxWaitTime is passed to tryAcquirePermit. Internally, tryAcquirePermit may create a future, but this future is never released, because tryAcquirePermit returned false and the caller therefore never calls releasePermit.

if (bulkhead.tryAcquirePermit()) { 
  try {
    doSomething();
  } finally {
    bulkhead.releasePermit();
  }
}
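For contrast, the sketch below shows one way a timed acquire could avoid leaving an abandoned waiter behind: when the wait times out, the caller removes its own future from the queue before returning false. This is an illustration under stated assumptions, not the library's implementation or its eventual fix; `CleanupOnTimeoutSketch` and all of its methods are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model, NOT the real BulkheadImpl.
final class CleanupOnTimeoutSketch {
    private final int maxPermits;
    private int permits;
    private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

    CleanupOnTimeoutSketch(int maxPermits) {
        this.maxPermits = maxPermits;
        this.permits = maxPermits;
    }

    private synchronized CompletableFuture<Void> acquireAsync() {
        if (permits > 0) {
            permits--;
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(future);
        return future;
    }

    // Removes a waiter that gave up; returns false if release() already took it.
    private synchronized boolean abandon(CompletableFuture<Void> future) {
        return waiters.remove(future);
    }

    boolean tryAcquire(Duration maxWaitTime) throws InterruptedException {
        CompletableFuture<Void> future = acquireAsync();
        try {
            future.get(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
            return true;
        } catch (TimeoutException e) {
            // If the waiter was still queued, we gave up cleanly. Otherwise
            // release() completed it concurrently and we do hold a permit.
            return !abandon(future);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e); // never completed exceptionally here
        } catch (InterruptedException e) {
            if (!abandon(future)) {
                release(); // a permit was handed over concurrently; give it back
            }
            throw e;
        }
    }

    synchronized void release() {
        CompletableFuture<Void> next = waiters.poll();
        if (next != null) {
            next.complete(null);
        } else if (permits < maxPermits) {
            permits++;
        }
    }

    synchronized int queued() {
        return waiters.size();
    }

    public static void main(String[] args) throws InterruptedException {
        CleanupOnTimeoutSketch bulkhead = new CleanupOnTimeoutSketch(1);
        System.out.println(bulkhead.tryAcquire(Duration.ZERO)); // true
        System.out.println(bulkhead.tryAcquire(Duration.ZERO)); // false
        System.out.println(bulkhead.queued());                  // 0
        bulkhead.release();
        System.out.println(bulkhead.tryAcquire(Duration.ZERO)); // true
    }
}
```

The design choice here is that the caller who stops waiting is responsible for dequeuing its own request, so a later release never hands a permit to a request nobody is waiting on.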
