Description
I believe the root cause is how the FutureLinkedList
is used in BulkheadImpl
.
When trying to acquire a permit from the BulkheadImpl
with a maxWaitTime
, if there are no permits immediately available, a new Future
will be added to the FutureLinkedList
. When that future is completed, the Future
is removed from the FutureLinkedList
. If a permit is immediately available, a completed future is returned.
The returned future is completed in BulkheadImpl#releasePermit
, so unless BulkheadImpl#releasePermit
is called, the future is never removed from the FutureLinkedList
. This causes a memory leak (The node is never removed from the list), and over time new permits cannot be issued.
This is a big problem when using a bulkhead in a FailsafeExecutor
. Permits are acquired with a maxWaitTime
in BulkheadExecutor#preExecute
. If you submit more work to the executor than there are permits, the permits rejected due to BulkheadFullException
are never released. When that happens, the bulkhead enters an irrecoverable, broken state. The permits are never released because in this case neither onSuccess
or onFailure
is called in BulkheadExecutor
.
I wrote a small example showcasing how this bug can be triggered.
public static void main(String[] args) {
Bulkhead<Object> bulkhead = Bulkhead.builder(1) // Max concurrency of 1.
.withMaxWaitTime(Duration.ZERO)
.build();
FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);
AtomicInteger numBulkheadFullErrors = new AtomicInteger();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; i++) { // Create 3 threads submitting work to the bulkhead
threads.add(new Thread(() -> {
for (int j = 0; j < 10; j++) {
try {
failsafe.get(() -> null); // Submit work to the bulkhead
} catch (BulkheadFullException e) {
numBulkheadFullErrors.incrementAndGet();
}
}
}));
}
// Start and join the threads
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Expected behaviour: Program outputs "Success" with a few errors.
// Actual behaviour: Program outputs "Error: BulkheadFullException" with many errors (29 on my local machine).
try {
System.out.println("Success: " + failsafe.get(() -> true) + ", total during looping: " + numBulkheadFullErrors.get());
} catch (BulkheadFullException e) {
System.out.println("Error: BulkheadFullException, total during looping: " + numBulkheadFullErrors.get());
}
}
Note that this bug can also be replicated using code found in the documentation, if a maxWaitTime
is provided to the tryAcquirePermit
function. Internally, tryAcquirePermit
may create a future, but this future is never released (because tryAcquirePermit
returned false
).
if (bulkhead.tryAcquirePermit()) {
try {
doSomething();
} finally {
bulkhead.releasePermit();
}
}