8
8
import java .io .IOException ;
9
9
import java .nio .channels .SelectionKey ;
10
10
import java .nio .channels .Selector ;
11
- import java .util .Arrays ;
12
11
import java .util .ArrayList ;
12
+ import java .util .Arrays ;
13
13
import java .util .Collection ;
14
14
import java .util .HashMap ;
15
15
import java .util .Iterator ;
16
16
import java .util .List ;
17
+ import java .util .Map ;
17
18
import java .util .Set ;
18
19
import java .util .Timer ;
19
20
import java .util .TimerTask ;
20
- import java .util .logging .Level ;
21
- import java .util .logging .Logger ;
22
21
import java .util .concurrent .Callable ;
23
22
import java .util .concurrent .ExecutionException ;
24
23
import java .util .concurrent .Future ;
25
24
import java .util .concurrent .RejectedExecutionException ;
26
25
import java .util .concurrent .TimeUnit ;
27
26
import java .util .concurrent .TimeoutException ;
28
27
import java .util .concurrent .atomic .AtomicBoolean ;
28
+ import java .util .logging .Level ;
29
+ import java .util .logging .Logger ;
29
30
import org .gearman .common .Constants ;
30
31
import org .gearman .common .GearmanException ;
31
32
import org .gearman .common .GearmanJobServerConnection ;
32
- import org .gearman .common .GearmanPacket ;
33
33
import org .gearman .common .GearmanJobServerSession ;
34
34
import org .gearman .common .GearmanNIOJobServerConnection ;
35
- import org .gearman .common .GearmanTask ;
36
- import org .gearman .common .GearmanServerResponseHandler ;
37
- import org .gearman .common .GearmanSessionEvent ;
38
- import org .gearman .common .GearmanSessionEventHandler ;
35
+ import org .gearman .common .GearmanPacket ;
39
36
import org .gearman .common .GearmanPacketImpl ;
40
37
import org .gearman .common .GearmanPacketMagic ;
41
38
import org .gearman .common .GearmanPacketType ;
39
+ import org .gearman .common .GearmanServerResponseHandler ;
40
+ import org .gearman .common .GearmanSessionEvent ;
41
+ import org .gearman .common .GearmanSessionEventHandler ;
42
+ import org .gearman .common .GearmanTask ;
42
43
import org .gearman .util .ByteUtils ;
43
44
44
45
//TODO change the server selection to use open connection
@@ -66,19 +67,21 @@ private static enum state {
66
67
}
67
68
68
69
private static final String DESCRIPION_PREFIX = "GearmanClient" ;
69
- private final String DESCRIPTION ;
70
- private HashMap <SelectionKey , GearmanJobServerSession > sessionsMap = null ;
71
- private Selector ioAvailable = null ;
72
70
private static final Logger LOG = Logger .getLogger (
73
71
Constants .GEARMAN_CLIENT_LOGGER_NAME );
72
+ private static final String CLIENT_NOT_ACTIVE = "Client is not active" ;
73
+ private final String DESCRIPTION ;
74
+ private Map <SelectionKey , GearmanJobServerSession > sessionsMap = null ;
75
+ private Selector ioAvailable = null ;
74
76
private state runState = state .RUNNING ;
75
- private HashMap <JobHandle , GearmanJobImpl > jobsMaps = null ;
76
- private HashMap <GearmanJobServerSession , GearmanJobImpl > submitJobMap = null ;
77
- private Timer timer = new Timer ();
77
+ private final Map <JobHandle , GearmanJobImpl > jobsMaps ;
78
+ private final Map <GearmanJobServerSession , GearmanJobImpl > submitJobMap ;
79
+ private final Timer timer = new Timer ();
80
+
78
81
79
- private class Alarm extends TimerTask {
82
+ private static class Alarm extends TimerTask {
80
83
81
- private AtomicBoolean timesUp = new AtomicBoolean (false );
84
+ private final AtomicBoolean timesUp = new AtomicBoolean (false );
82
85
83
86
@ Override
84
87
public void run () {
@@ -90,11 +93,11 @@ public boolean hasFired() {
90
93
}
91
94
}
92
95
93
- private class JobHandle {
96
+ private static class JobHandle {
94
97
95
98
private final byte [] handle ;
96
99
97
- private JobHandle (byte [] handle ) {
100
+ JobHandle (byte [] handle ) {
98
101
this .handle = new byte [handle .length ];
99
102
System .arraycopy (handle , 0 , this .handle , 0 , handle .length );
100
103
}
@@ -125,13 +128,11 @@ public GearmanClientImpl() {
125
128
sessionsMap = new HashMap <SelectionKey , GearmanJobServerSession >();
126
129
jobsMaps = new HashMap <JobHandle , GearmanJobImpl >();
127
130
submitJobMap = new HashMap <GearmanJobServerSession , GearmanJobImpl >();
128
- DESCRIPTION = new String (DESCRIPION_PREFIX + ":" +
129
- Thread .currentThread ().getId ());
131
+ DESCRIPTION = DESCRIPION_PREFIX + ":" + Thread .currentThread ().getId ();
130
132
}
131
133
132
- public void addJobServer (GearmanJobServerConnection newconn )
133
- throws IllegalArgumentException ,
134
- IllegalStateException {
134
+ public boolean addJobServer (GearmanJobServerConnection newconn )
135
+ throws IllegalArgumentException ,IllegalStateException {
135
136
136
137
//TODO remove this restriction
137
138
if (!(newconn instanceof GearmanNIOJobServerConnection )) {
@@ -150,7 +151,9 @@ public void addJobServer(GearmanJobServerConnection newconn)
150
151
151
152
GearmanJobServerSession session = new GearmanJobServerSession (conn );
152
153
if (sessionsMap .values ().contains (session )) {
153
- return ;
154
+ LOG .log (Level .FINE ,"The server " + newconn + " was previously " +
155
+ "added to the client. Ignoring add request." );
156
+ return true ;
154
157
}
155
158
156
159
try {
@@ -162,10 +165,13 @@ public void addJobServer(GearmanJobServerConnection newconn)
162
165
SelectionKey key = session .getSelectionKey ();
163
166
sessionsMap .put (key , session );
164
167
} catch (IOException ioe ) {
165
- throw new RuntimeException (ioe );
168
+ LOG .log (Level .WARNING ,"Failed to connect to job server "
169
+ + newconn + "." ,ioe );
170
+ return false ;
166
171
}
167
172
168
173
LOG .log (Level .FINE , "Added connection " + conn + " to client " + this );
174
+ return true ;
169
175
}
170
176
171
177
public boolean hasConnection (GearmanJobServerConnection conn ) {
@@ -180,7 +186,7 @@ public boolean hasConnection(GearmanJobServerConnection conn) {
180
186
public List <GearmanJobServerConnection > getSetOfJobServers ()
181
187
throws IllegalStateException {
182
188
if (!runState .equals (state .RUNNING )) {
183
- throw new IllegalStateException ("Client is not active" );
189
+ throw new IllegalStateException (CLIENT_NOT_ACTIVE );
184
190
}
185
191
186
192
ArrayList <GearmanJobServerConnection > retSet =
@@ -222,7 +228,7 @@ public void removeJobServer(GearmanJobServerConnection conn)
222
228
public <T > Future <T > submit (Callable <T > task ) {
223
229
224
230
if (task == null ) {
225
- throw new NullPointerException ("Null task was submitted to " +
231
+ throw new IllegalStateException ("Null task was submitted to " +
226
232
"gearman client" );
227
233
}
228
234
@@ -255,19 +261,19 @@ public <T> Future<T> submit(Callable<T> task) {
255
261
GearmanTask submittedJob =
256
262
new GearmanTask (handler , submitRequest );
257
263
session .submitTask (submittedJob );
258
- LOG .log (Level .FINE , "Client " + this + " has submitted job " + job +
259
- " to session " + session + ". Job has been added to the " +
264
+ LOG .log (Level .FINE , "Client " + this + " has submitted job " + job + //NOPMD
265
+ " to session " + session + ". Job has been added to the " + //NOPMD
260
266
"active job queue" );
261
267
try {
262
268
submitJobMap .put (session , job );
263
- if (!(driveRequestTillState (session ,
264
- submittedJob , GearmanTask .State .RUNNING ))) {
265
- throw new RuntimeException ("Timed out waiting for submission " +
266
- " of " + job + " to complete" );
269
+ if (!(driveRequestTillState (submittedJob ,
270
+ GearmanTask .State .RUNNING ))) {
271
+ throw new RejectedExecutionException ("Timed out waiting for" +
272
+ " submission of " + job + " to complete" );
267
273
}
268
274
} catch (IOException ioe ) {
269
275
LOG .log (Level .WARNING , "Client " + this + " encounted an " +
270
- "IOException while drivingIO" );
276
+ "IOException while drivingIO" , ioe );
271
277
} finally {
272
278
submitJobMap .remove (session );
273
279
}
@@ -296,11 +302,9 @@ public void execute(Runnable command) {
296
302
// specified generic types), otherwise we will not be able to compile this
297
303
// class using both compilers.
298
304
299
- @ SuppressWarnings ("unchecked" )
300
- public List invokeAll (Collection tasks )
301
- throws InterruptedException {
305
+ @ SuppressWarnings ("unchecked" ) //NOPMD
306
+ public List invokeAll (Collection tasks ) throws InterruptedException {
302
307
ArrayList <Future > futures = new ArrayList <Future >();
303
-
304
308
Iterator <Callable <Future >> iter = tasks .iterator ();
305
309
while (iter .hasNext ()) {
306
310
Callable <Future > curTask = iter .next ();
@@ -310,7 +314,8 @@ public List invokeAll(Collection tasks)
310
314
try {
311
315
results .get ();
312
316
} catch (ExecutionException ee ) {
313
- //TODO
317
+ LOG .log (Level .WARNING ,"Failed to execute task " +
318
+ results + "." ,ee );
314
319
}
315
320
}
316
321
return futures ;
@@ -349,7 +354,7 @@ public GearmanJobStatus getJobStatus(GearmanJob job) throws IOException,
349
354
350
355
public byte [] echo (byte [] data ) throws IOException , GearmanException {
351
356
if (!runState .equals (state .RUNNING )) {
352
- throw new IllegalStateException ("Client is not active" );
357
+ throw new IllegalStateException (CLIENT_NOT_ACTIVE );
353
358
}
354
359
GearmanPacket echoRequest = new GearmanPacketImpl (GearmanPacketMagic .REQ ,
355
360
GearmanPacketType .ECHO_REQ ,data );
@@ -360,7 +365,7 @@ public byte[] echo(byte[] data) throws IOException, GearmanException {
360
365
LOG .log (Level .FINE , "Client " + this + " has submitted echo request " +
361
366
"(payload = " + ByteUtils .toHex (data ) + " to session " +
362
367
session );
363
- if (!driveRequestTillState (session , t , GearmanTask .State .FINISHED )) {
368
+ if (!driveRequestTillState (t , GearmanTask .State .FINISHED )) {
364
369
throw new GearmanException ("Failed to execute echo request " + t +
365
370
" to session " + session );
366
371
}
@@ -371,7 +376,7 @@ public byte[] echo(byte[] data) throws IOException, GearmanException {
371
376
372
377
public int getNumberofActiveJobs () throws IllegalStateException {
373
378
if (runState .equals (state .TERMINATED )) {
374
- throw new IllegalStateException ("Client is not active" );
379
+ throw new IllegalStateException (CLIENT_NOT_ACTIVE );
375
380
}
376
381
return jobsMaps == null ? 0 : jobsMaps .size ();
377
382
}
@@ -397,11 +402,15 @@ public void handleSessionEvent(GearmanSessionEvent event)
397
402
JobHandle handle = new JobHandle (p .getDataComponentValue (
398
403
GearmanPacket .DataComponentName .JOB_HANDLE ));
399
404
GearmanJobImpl job = jobsMaps .get (handle );
400
- if (job != null ) {
405
+ if (job == null ) {
406
+ LOG .log (Level .WARNING ,"Client received packet from server" +
407
+ " for unknown job ( job_handle = " + handle +
408
+ " packet = " + t +" )" );
409
+ } else {
401
410
job .handleEvent (p );
402
- }
403
- if ( job . isDone ()) {
404
- jobsMaps . remove ( handle );
411
+ if ( job . isDone ()) {
412
+ jobsMaps . remove ( handle );
413
+ }
405
414
}
406
415
break ;
407
416
case ERROR :
@@ -520,6 +529,13 @@ public String toString() {
520
529
}
521
530
522
531
private void driveClientIO () throws IOException , GearmanException {
532
+ for (GearmanJobServerSession sess : sessionsMap .values ()) {
533
+ int interestOps = SelectionKey .OP_READ ;
534
+ if (sess .sessionHasDataToWrite ()) {
535
+ interestOps |= SelectionKey .OP_WRITE ;
536
+ }
537
+ sess .getSelectionKey ().interestOps (interestOps );
538
+ }
523
539
ioAvailable .selectNow ();
524
540
Set <SelectionKey > keys = ioAvailable .selectedKeys ();
525
541
LOG .log (Level .FINEST , "Driving IO for client " + this + ". " +
@@ -536,7 +552,7 @@ private GearmanJobStatus updateJobStatus(byte[] jobhandle,
536
552
GearmanJobServerSession session ) throws IOException ,
537
553
IllegalStateException , GearmanException {
538
554
if (!runState .equals (state .RUNNING )) {
539
- throw new IllegalStateException ("Client is not active" );
555
+ throw new IllegalStateException (CLIENT_NOT_ACTIVE );
540
556
}
541
557
542
558
if (jobhandle == null || jobhandle .length == 0 ) {
@@ -550,7 +566,7 @@ private GearmanJobStatus updateJobStatus(byte[] jobhandle,
550
566
GearmanTask t = new GearmanTask (
551
567
handler , statusRequest );
552
568
session .submitTask (t );
553
- if (!driveRequestTillState (session , t ,GearmanTask .State .FINISHED )) {
569
+ if (!driveRequestTillState (t ,GearmanTask .State .FINISHED )) {
554
570
throw new GearmanException ("Failed to execute jobstatus request " +
555
571
t + " to session " + session );
556
572
}
@@ -575,13 +591,12 @@ private GearmanJobServerSession getSessionForTask() throws IOException {
575
591
return session ;
576
592
}
577
593
578
- private boolean driveRequestTillState (GearmanJobServerSession session ,
579
- GearmanTask r , GearmanTask .State state )
594
+ private boolean driveRequestTillState (GearmanTask r , GearmanTask .State state )
580
595
throws IOException , GearmanException {
581
596
Alarm alarm = new Alarm ();
582
597
timer .schedule (alarm , 2000 );
583
598
while (r .getState ().compareTo (state ) < 0 && !(alarm .hasFired ())) {
584
- session . driveSessionIO ();
599
+ driveClientIO ();
585
600
}
586
601
return r .getState ().compareTo (state ) >= 0 ;
587
602
}
0 commit comments