Skip to content

Commit bc78281

Browse files
committed
Reorganize FallbackExecutor supplyAsync and unify cancelFn
1 parent 9444335 commit bc78281

File tree

1 file changed

+35
-30
lines changed

1 file changed

+35
-30
lines changed

src/main/java/net/jodah/failsafe/FallbackExecutor.java

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -66,45 +66,50 @@ protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, S
6666
protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
6767
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<Object> future) {
6868
return () -> supplier.get().thenCompose(result -> {
69-
if (result == null)
69+
if (result == null || future.isDone())
7070
return ExecutionResult.NULL_FUTURE;
71+
if (executionCancelled())
72+
return CompletableFuture.completedFuture(result);
73+
if (!isFailure(result))
74+
return postExecuteAsync(result, scheduler, future);
7175

7276
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
73-
if (executionCancelled()) {
74-
promise.complete(result);
75-
return promise;
76-
}
77-
78-
if (isFailure(result)) {
79-
Callable<Object> callable = () -> {
80-
try {
81-
CompletableFuture<Object> fallback = policy.applyStage(result.getResult(), result.getFailure(),
82-
execution.copy());
83-
fallback.whenComplete((innerResult, failure) -> {
84-
if (failure instanceof CompletionException)
85-
failure = failure.getCause();
86-
ExecutionResult r = failure == null ? result.withResult(innerResult) : ExecutionResult.failure(failure);
87-
promise.complete(r);
88-
});
89-
} catch (Throwable t) {
90-
promise.complete(ExecutionResult.failure(t));
91-
}
92-
return null;
93-
};
94-
77+
Callable<Object> callable = () -> {
9578
try {
96-
if (!policy.isAsync())
97-
callable.call();
98-
else
99-
future.injectPolicy(scheduler.schedule(callable, result.getWaitNanos(), TimeUnit.NANOSECONDS));
79+
CompletableFuture<Object> fallback = policy.applyStage(result.getResult(), result.getFailure(),
80+
execution.copy());
81+
fallback.whenComplete((innerResult, failure) -> {
82+
if (failure instanceof CompletionException)
83+
failure = failure.getCause();
84+
ExecutionResult r = failure == null ? result.withResult(innerResult) : ExecutionResult.failure(failure);
85+
promise.complete(r);
86+
});
10087
} catch (Throwable t) {
101-
promise.completeExceptionally(t);
88+
promise.complete(ExecutionResult.failure(t));
10289
}
90+
return null;
91+
};
92+
93+
try {
94+
if (!policy.isAsync())
95+
callable.call();
96+
else {
97+
Future<?> scheduledFallback = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS);
10398

104-
return promise.thenCompose(ss -> postExecuteAsync(ss, scheduler, future));
99+
// Propagate cancellation to the scheduled retry and promise
100+
future.injectCancelFn(() -> {
101+
System.out.println("cancelling scheduled fallback isdone: " + scheduledFallback.isDone());
102+
scheduledFallback.cancel(false);
103+
if (executionCancelled())
104+
promise.complete(null);
105+
});
106+
}
107+
} catch (Throwable t) {
108+
// Hard scheduling failure
109+
promise.completeExceptionally(t);
105110
}
106111

107-
return postExecuteAsync(result, scheduler, future);
112+
return promise.thenCompose(ss -> postExecuteAsync(ss, scheduler, future));
108113
});
109114
}
110115

0 commit comments

Comments
 (0)