1+ package com .iluwatar .promise ;
2+
3+ import java .util .concurrent .Callable ;
4+ import java .util .concurrent .ExecutionException ;
5+ import java .util .concurrent .atomic .AtomicInteger ;
6+ import java .util .function .Consumer ;
7+ import java .util .function .Function ;
8+
9+ public class ThreadAsyncExecutor implements PromiseAsyncExecutor {
10+
11+ /** Index for thread naming */
12+ private final AtomicInteger idx = new AtomicInteger (0 );
13+
14+ @ Override
15+ public <T > ListenableAsyncResult <T > execute (Callable <T > task ) {
16+ Promise <T > promise = new Promise <>();
17+ new Thread (() -> {
18+ try {
19+ promise .setValue (task .call ());
20+ promise .postComplete ();
21+ } catch (Exception ex ) {
22+ promise .setException (ex );
23+ }
24+ } , "executor-" + idx .incrementAndGet ()).start ();
25+ return promise ;
26+ }
27+
28+ // TODO there is scope of extending the completable future from async method invocation project. Do that.
29+ private class Promise <T > implements ListenableAsyncResult <T > {
30+
31+ static final int RUNNING = 1 ;
32+ static final int FAILED = 2 ;
33+ static final int COMPLETED = 3 ;
34+
35+ final Object lock ;
36+ volatile int state = RUNNING ;
37+ T value ;
38+ Exception exception ;
39+ Runnable fulfilmentAction ;
40+
41+ public Promise () {
42+ this .lock = new Object ();
43+ }
44+
45+ void postComplete () {
46+ fulfilmentAction .run ();
47+ }
48+
49+ /**
50+ * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
51+ * completion.
52+ *
53+ * @param value
54+ * value of the evaluated task
55+ */
56+ public void setValue (T value ) {
57+ this .value = value ;
58+ this .state = COMPLETED ;
59+ synchronized (lock ) {
60+ lock .notifyAll ();
61+ }
62+ }
63+
64+ /**
65+ * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
66+ * completion.
67+ *
68+ * @param exception
69+ * exception of the failed task
70+ */
71+ public void setException (Exception exception ) {
72+ this .exception = exception ;
73+ this .state = FAILED ;
74+ synchronized (lock ) {
75+ lock .notifyAll ();
76+ }
77+ }
78+
79+ @ Override
80+ public boolean isCompleted () {
81+ return state > RUNNING ;
82+ }
83+
84+ @ Override
85+ public T getValue () throws ExecutionException {
86+ if (state == COMPLETED ) {
87+ return value ;
88+ } else if (state == FAILED ) {
89+ throw new ExecutionException (exception );
90+ } else {
91+ throw new IllegalStateException ("Execution not completed yet" );
92+ }
93+ }
94+
95+ @ Override
96+ public void await () throws InterruptedException {
97+ synchronized (lock ) {
98+ if (!isCompleted ()) {
99+ lock .wait ();
100+ }
101+ }
102+ }
103+
104+ @ Override
105+ public ListenableAsyncResult <Void > then (Consumer <? super T > action ) {
106+ Promise <Void > dest = new Promise <>();
107+ fulfilmentAction = new ConsumeAction (this , dest , action );
108+ return dest ;
109+ }
110+
111+ @ Override
112+ public <V > ListenableAsyncResult <V > then (Function <? super T , V > func ) {
113+ Promise <V > dest = new Promise <>();
114+ fulfilmentAction = new FunctionAction <V >(this , dest , func );
115+ return dest ;
116+ }
117+
118+ @ Override
119+ public ListenableAsyncResult <T > error (Consumer <? extends Throwable > action ) {
120+ return null ;
121+ }
122+
123+ private class ConsumeAction implements Runnable {
124+
125+ private Promise <T > current ;
126+ private Promise <Void > dest ;
127+ private Consumer <? super T > action ;
128+
129+ public ConsumeAction (Promise <T > current , Promise <Void > dest , Consumer <? super T > action ) {
130+ this .current = current ;
131+ this .dest = dest ;
132+ this .action = action ;
133+ }
134+
135+ @ Override
136+ public void run () {
137+ try {
138+ action .accept (current .getValue ());
139+ dest .setValue (null );
140+ } catch (ExecutionException e ) {
141+ // TODO Auto-generated catch block
142+ e .printStackTrace ();
143+ }
144+ dest .postComplete ();
145+ }
146+ }
147+
148+ private class FunctionAction <V > implements Runnable {
149+
150+ private Promise <T > current ;
151+ private Promise <V > dest ;
152+ private Function <? super T , V > func ;
153+
154+ public FunctionAction (Promise <T > current , Promise <V > dest , Function <? super T , V > func ) {
155+ this .current = current ;
156+ this .dest = dest ;
157+ this .func = func ;
158+ }
159+
160+ @ Override
161+ public void run () {
162+ try {
163+ V result = func .apply (current .getValue ());
164+ dest .setValue (result );
165+ } catch (ExecutionException e ) {
166+ // TODO Auto-generated catch block
167+ e .printStackTrace ();
168+ }
169+ dest .postComplete ();
170+ }
171+ }
172+ }
173+ }
0 commit comments