Skip to content

Commit e41381b

Browse files
committed
Change TimeoutExecutor to use internal scheduler
Changes the TimeoutExecutor to use the internal scheduler, so that when a user-provided ScheduledExecutorService is shutdown, Timeout tasks will still fire.
1 parent 5deaac0 commit e41381b

File tree

3 files changed

+134
-41
lines changed

3 files changed

+134
-41
lines changed

src/main/java/net/jodah/failsafe/TimeoutExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, S
6464

6565
try {
6666
// Schedule timeout check
67-
timeoutFuture = scheduler.schedule(() -> {
67+
timeoutFuture = Scheduler.DEFAULT.schedule(() -> {
6868
// Guard against race with execution completion
6969
if (result.compareAndSet(null, ExecutionResult.failure(new TimeoutExceededException(policy)))) {
7070
// Cancel and interrupt
@@ -114,7 +114,7 @@ protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
114114
if (!future.isDone()) {
115115
try {
116116
// Schedule timeout check
117-
timeoutFuture.set(scheduler.schedule(() -> {
117+
timeoutFuture.set(Scheduler.DEFAULT.schedule(() -> {
118118
// Guard against race with execution completion
119119
if (executionResult.compareAndSet(null,
120120
ExecutionResult.failure(new TimeoutExceededException(policy)))) {

src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -539,45 +539,6 @@ public void shouldSkipExecutionWhenCircuitOpen() {
539539
assertThrows(future::get, ExecutionException.class, CircuitBreakerOpenException.class);
540540
}
541541

542-
/**
543-
* Asserts that Failsafe handles an initial scheduling failure.
544-
*/
545-
public void shouldHandleInitialSchedulingFailure() {
546-
// Given
547-
ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
548-
executor.shutdownNow();
549-
550-
// When
551-
Future future = Failsafe.with(Fallback.of(false), new RetryPolicy<>(), new CircuitBreaker<>())
552-
.with(executor)
553-
.runAsync(() -> waiter.fail("Should not execute supplier since executor has been shutdown"));
554-
555-
assertThrows(future::get, ExecutionException.class, RejectedExecutionException.class);
556-
}
557-
558-
/**
559-
* Asserts that Failsafe handles a rpRetry scheduling failure.
560-
*/
561-
public void shouldHandleRejectedRetryExecution() throws Throwable {
562-
// Given
563-
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
564-
AtomicInteger counter = new AtomicInteger();
565-
566-
// When
567-
Future future = Failsafe.with(new RetryPolicy<>().handleResult(null).handle(Exception.class))
568-
.with(executor)
569-
.getAsync(() -> {
570-
counter.incrementAndGet();
571-
Thread.sleep(200);
572-
return null;
573-
});
574-
575-
Thread.sleep(150);
576-
executor.shutdownNow();
577-
assertThrows(future::get, ExecutionException.class, RejectedExecutionException.class);
578-
assertEquals(counter.get(), 1, "Supplier should have been executed before executor was shutdown");
579-
}
580-
581542
private void assertInterruptedExceptionOnCancel(FailsafeExecutor<Boolean> failsafe) throws Throwable {
582543
CompletableFuture<Void> future = failsafe.runAsync(() -> {
583544
try {
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package net.jodah.failsafe.functional;
2+
3+
import net.jodah.concurrentunit.Waiter;
4+
import net.jodah.failsafe.*;
5+
import org.testng.annotations.BeforeMethod;
6+
import org.testng.annotations.Test;
7+
8+
import java.time.Duration;
9+
import java.util.concurrent.*;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
import static net.jodah.failsafe.Asserts.assertThrows;
13+
import static net.jodah.failsafe.Testing.runAsync;
14+
import static org.testng.Assert.assertEquals;
15+
16+
/**
17+
* Tests the handling of an executor service that is shutdown.
18+
*/
19+
@Test
20+
public class ShutdownExecutorTest {
21+
Waiter waiter;
22+
23+
@BeforeMethod
24+
protected void beforeMethod() {
25+
waiter = new Waiter();
26+
}
27+
28+
/**
29+
* Asserts that Failsafe handles an initial scheduling failure due to an executor being shutdown.
30+
*/
31+
public void shouldHandleInitialSchedulingFailure() {
32+
// Given
33+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
34+
executor.shutdownNow();
35+
36+
// When
37+
Future future = Failsafe.with(Fallback.of(false), new RetryPolicy<>(), new CircuitBreaker<>())
38+
.with(executor)
39+
.runAsync(() -> waiter.fail("Should not execute supplier since executor has been shutdown"));
40+
41+
assertThrows(future::get, ExecutionException.class, RejectedExecutionException.class);
42+
}
43+
44+
/**
45+
* Asserts that an ExecutorService shutdown() will leave current tasks running while preventing new tasks.
46+
*/
47+
public void shouldHandleShutdown() throws Throwable {
48+
// Given
49+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
50+
AtomicInteger counter = new AtomicInteger();
51+
52+
// When
53+
Future future = Failsafe.with(new RetryPolicy<>()).with(executor).getAsync(() -> {
54+
Thread.sleep(200);
55+
counter.incrementAndGet();
56+
return "success";
57+
});
58+
59+
Thread.sleep(100);
60+
executor.shutdown();
61+
assertEquals("success", future.get());
62+
assertEquals(counter.get(), 1, "Supplier should have completed execution before executor was shutdown");
63+
64+
future = Failsafe.with(new RetryPolicy<>()).with(executor).getAsync(() -> "test");
65+
assertThrows(future::get, ExecutionException.class, RejectedExecutionException.class);
66+
}
67+
68+
/**
69+
* Asserts that an ExecutorService shutdown() will interrupt current tasks running and prevent new tasks.
70+
*/
71+
public void shouldHandleShutdownNow() throws Throwable {
72+
// Given
73+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
74+
AtomicInteger counter = new AtomicInteger();
75+
76+
// When
77+
Future future = Failsafe.with(new RetryPolicy<>()).with(executor).runAsync(() -> {
78+
Thread.sleep(200);
79+
counter.incrementAndGet();
80+
});
81+
82+
Thread.sleep(100);
83+
executor.shutdownNow();
84+
assertThrows(future::get, ExecutionException.class, RejectedExecutionException.class);
85+
assertEquals(counter.get(), 0, "Supplier should have been interrupted after executor shutdownNow");
86+
}
87+
88+
/**
89+
* Asserts that an ExecutorService shutdown() will not prevent internally scheduled Timeout tasks from cancelling a
90+
* sync execution.
91+
*/
92+
public void testShutdownDoesNotPreventTimeoutSync() throws Throwable {
93+
// Given
94+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
95+
Timeout<Object> timeout = Timeout.of(Duration.ofMillis(200)).withInterrupt(true);
96+
AtomicInteger counter = new AtomicInteger();
97+
98+
// When / then
99+
assertThrows(() -> Failsafe.with(timeout).with(executor).run(() -> {
100+
Thread.sleep(500);
101+
counter.incrementAndGet();
102+
}), TimeoutExceededException.class);
103+
runAsync(() -> {
104+
Thread.sleep(100);
105+
executor.shutdown();
106+
});
107+
assertEquals(counter.get(), 0, "Supplier should have been interrupted after Timeout");
108+
}
109+
110+
/**
111+
* Asserts that an ExecutorService shutdown() will not prevent internally scheduled Timeout tasks from cancelling an
112+
* async execution.
113+
*/
114+
public void testShutdownDoesNotPreventTimeoutAsync() throws Throwable {
115+
// Given
116+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
117+
Timeout<Object> timeout = Timeout.of(Duration.ofMillis(200)).withInterrupt(true);
118+
AtomicInteger counter = new AtomicInteger();
119+
120+
// When
121+
Future future = Failsafe.with(timeout).with(executor).runAsync(() -> {
122+
Thread.sleep(500);
123+
counter.incrementAndGet();
124+
});
125+
Thread.sleep(100);
126+
executor.shutdown();
127+
128+
// Then
129+
assertThrows(future::get, ExecutionException.class, TimeoutExceededException.class);
130+
assertEquals(counter.get(), 0, "Supplier should have been interrupted after Timeout");
131+
}
132+
}

0 commit comments

Comments
 (0)