Skip to content

Commit d3979b4

Browse files
committed
Merge pull request Beh01der#1 from maximdim/master
Refactoring synchronized/notify constructs to CountdownLatch
2 parents 9f882b7 + daf30b4 commit d3979b4

File tree

4 files changed

+120
-21
lines changed

4 files changed

+120
-21
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/target
2+
/.classpath
3+
/.project
4+
/.settings

src/main/java/au/com/ds/ef/EasyFlow.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010

1111
public class EasyFlow<C extends StatefulContext> {
12-
private static Logger log = LoggerFactory.getLogger(EasyFlow.class);
12+
static Logger log = LoggerFactory.getLogger(EasyFlow.class);
1313
protected State<C> startState;
1414
private C context;
1515
private Executor executor;
@@ -63,7 +63,6 @@ public void start(final C context) {
6363
}
6464
}
6565

66-
@SuppressWarnings("unchecked")
6766
protected void setCurrentState(final State<C> state, final C context) {
6867
execute(new Runnable() {
6968
@Override
@@ -176,11 +175,7 @@ protected void callOnFinalState(final State<C> state, final C context) {
176175
log.debug("when final state {} for {} >>>", state, context);
177176
}
178177

179-
synchronized (context) {
180-
callOnTerminate(context);
181-
context.notifyAll();
182-
}
183-
178+
callOnTerminate(context);
184179
} catch (Exception e) {
185180
callOnError(new ExecutionError(state, null, e,
186181
"Execution Error in [EasyFlow.whenFinalState] handler", context));
@@ -202,13 +197,7 @@ public void waitForCompletion() {
202197
}
203198

204199
public void waitForCompletion(C context) {
205-
try {
206-
synchronized (context) {
207-
context.wait();
208-
}
209-
} catch (InterruptedException e) {
210-
log.error("Error", e);
211-
}
200+
context.awaitTermination();
212201
}
213202

214203
public EasyFlow<C> executor(Executor executor) {
@@ -229,11 +218,7 @@ protected void callOnError(final ExecutionError error) {
229218
if (onError != null) {
230219
onError.call(error);
231220
}
232-
233-
synchronized (error.getContext()) {
234-
callOnTerminate((C) error.getContext());
235-
error.getContext().notifyAll();
236-
}
221+
callOnTerminate((C) error.getContext());
237222
}
238223

239224
protected void callOnTerminate(final C context) {

src/main/java/au/com/ds/ef/StatefulContext.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
package au.com.ds.ef;
22

33
import java.io.Serializable;
4+
import java.util.concurrent.CountDownLatch;
45
import java.util.concurrent.atomic.AtomicBoolean;
56

67
@SuppressWarnings("rawtypes")
78
public class StatefulContext implements Serializable {
89
private static final long serialVersionUID = 2324535129909715649L;
910
private static long idCounter = 1;
1011

11-
private String id;
12+
private final String id;
1213
private State state;
13-
private AtomicBoolean terminated = new AtomicBoolean(false);
14+
private final AtomicBoolean terminated = new AtomicBoolean(false);
15+
private final CountDownLatch completionLatch = new CountDownLatch(1);
1416

1517
public StatefulContext() {
1618
id = newId() + ":" + getClass().getSimpleName();
@@ -69,8 +71,21 @@ public boolean isStarted() {
6971

7072
protected void setTerminated() {
7173
this.terminated.set(true);
74+
this.completionLatch.countDown();
7275
}
7376

77+
/**
78+
* Block current thread until Context terminated
79+
*/
80+
protected void awaitTermination() {
81+
try {
82+
this.completionLatch.await();
83+
}
84+
catch (InterruptedException e) {
85+
Thread.interrupted();
86+
}
87+
}
88+
7489
@Override
7590
public String toString() {
7691
return id;
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package au.com.ds.ef;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
6+
import au.com.ds.ef.call.StateHandler;
7+
8+
/**
9+
* With original synchronize()/notifyAll() approach this test never finishes
10+
* (thread(s) still waiting on notification)
11+
*/
12+
public class SynchronizationTest {
13+
void doRun() {
14+
ExecutorService threadPool = Executors.newCachedThreadPool();
15+
for(int i = 0; i < 5; i++) {
16+
Node n = new Node();
17+
threadPool.execute(n);
18+
}
19+
threadPool.shutdown();
20+
}
21+
22+
public static void main(String[] args) {
23+
new SynchronizationTest().doRun();
24+
}
25+
26+
static class Node implements Runnable {
27+
final Event<StatefulContext> onInitialize = FlowBuilder.event("onInitialize");
28+
final Event<StatefulContext> onTerminate = FlowBuilder.event("onTerminate");
29+
30+
final EasyFlow<StatefulContext> flow;
31+
32+
Node() {
33+
final State<StatefulContext> uninitialized = FlowBuilder.state("unitialized");
34+
final State<StatefulContext> running = FlowBuilder.state("running");
35+
final State<StatefulContext> done = FlowBuilder.state("done");
36+
37+
uninitialized.whenEnter(new StateHandler<StatefulContext>() {
38+
@Override
39+
public void call(State<StatefulContext> state, StatefulContext context) throws Exception {
40+
System.out.println(getThreadName()+" unitialized:Enter");
41+
unitializedEnter(state, context);
42+
}
43+
});
44+
running.whenEnter(new StateHandler<StatefulContext>() {
45+
@Override
46+
public void call(State<StatefulContext> state, StatefulContext context) throws Exception {
47+
System.out.println(getThreadName()+" runnig:Enter");
48+
runningEnter(state, context);
49+
}
50+
});
51+
52+
this.flow = FlowBuilder.from(uninitialized)
53+
.transit(onInitialize.to(running).transit(onTerminate.finish(done)));
54+
this.flow.executor(new SyncExecutor());
55+
56+
flow.whenFinalState(new StateHandler<StatefulContext>() {
57+
@Override
58+
public void call(State<StatefulContext> state, StatefulContext context) throws Exception {
59+
System.out.println(getThreadName()+" final");
60+
}
61+
});
62+
}
63+
64+
void unitializedEnter(State<StatefulContext> state, StatefulContext context) {
65+
onInitialize.trigger(context);
66+
}
67+
68+
void runningEnter(State<StatefulContext> state, StatefulContext context) {
69+
for(int i = 0; i < 10; i++) {
70+
System.out.println(getThreadName()+" running");
71+
sleep(1000);
72+
}
73+
onTerminate.trigger(context);
74+
}
75+
76+
@Override
77+
public void run() {
78+
flow.validate().trace().start(new StatefulContext());
79+
flow.waitForCompletion();
80+
System.out.println("Run method completed");
81+
}
82+
}
83+
84+
static String getThreadName() {
85+
return Thread.currentThread().getName();
86+
}
87+
88+
static void sleep(long ms) {
89+
try {
90+
Thread.sleep(ms);
91+
} catch (InterruptedException e) {
92+
Thread.interrupted();
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)