2929import java .io .PipedOutputStream ;
3030import java .io .PrintStream ;
3131
32+ import junit .framework .Assert ;
33+
3234import org .apache .commons .logging .Log ;
3335import org .apache .commons .logging .LogFactory ;
3436import org .apache .hadoop .conf .Configuration ;
@@ -60,6 +62,22 @@ private Job runJob(Configuration conf) throws Exception {
6062 return job ;
6163 }
6264
65+ private Job runJobInBackGround (Configuration conf ) throws Exception {
66+ String input = "hello1\n hello2\n hello3\n " ;
67+
68+ Job job = MapReduceTestUtil .createJob (conf , getInputDir (), getOutputDir (),
69+ 1 , 1 , input );
70+ job .setJobName ("mr" );
71+ job .setPriority (JobPriority .NORMAL );
72+ job .submit ();
73+ int i = 0 ;
74+ while (i ++ < 200 && job .getJobID () == null ) {
75+ LOG .info ("waiting for jobId..." );
76+ Thread .sleep (100 );
77+ }
78+ return job ;
79+ }
80+
6381 public static int runTool (Configuration conf , Tool tool , String [] args ,
6482 OutputStream out ) throws Exception {
6583 PrintStream oldOut = System .out ;
@@ -108,8 +126,10 @@ public void testJobClient() throws Exception {
108126 Job job = runJob (conf );
109127
110128 String jobId = job .getJobID ().toString ();
111- // test jobs list
112- testJobList (jobId , conf );
129+ // test all jobs list
130+ testAllJobList (jobId , conf );
131+ // test only submitted jobs list
132+ testSubmittedJobList (conf );
113133 // test job counter
114134 testGetCounter (jobId , conf );
115135 // status
@@ -131,37 +151,37 @@ public void testJobClient() throws Exception {
131151 // submit job from file
132152 testSubmit (conf );
133153 // kill a task
134- testKillTask (job , conf );
154+ testKillTask (conf );
135155 // fail a task
136- testfailTask (job , conf );
156+ testfailTask (conf );
137157 // kill job
138- testKillJob (jobId , conf );
158+ testKillJob (conf );
139159 }
140160
141161 /**
142162 * test fail task
143163 */
144- private void testfailTask (Job job , Configuration conf ) throws Exception {
164+ private void testfailTask (Configuration conf ) throws Exception {
165+ Job job = runJobInBackGround (conf );
145166 CLI jc = createJobClient ();
146167 TaskID tid = new TaskID (job .getJobID (), TaskType .MAP , 0 );
147168 TaskAttemptID taid = new TaskAttemptID (tid , 1 );
148169 ByteArrayOutputStream out = new ByteArrayOutputStream ();
149- // TaskAttemptId is not set
170+ // TaskAttemptId is not set
150171 int exitCode = runTool (conf , jc , new String [] { "-fail-task" }, out );
151172 assertEquals ("Exit code" , -1 , exitCode );
152173
153- try {
154- runTool (conf , jc , new String [] { "-fail-task" , taid .toString () }, out );
155- fail (" this task should field" );
156- } catch (IOException e ) {
157- // task completed !
158- assertTrue (e .getMessage ().contains ("_0001_m_000000_1" ));
159- }
174+ runTool (conf , jc , new String [] { "-fail-task" , taid .toString () }, out );
175+ String answer = new String (out .toByteArray (), "UTF-8" );
176+ Assert
177+ .assertTrue (answer .contains ("Killed task " + taid + " by failing it" ));
160178 }
179+
161180 /**
162181 * test a kill task
163182 */
164- private void testKillTask (Job job , Configuration conf ) throws Exception {
183+ private void testKillTask (Configuration conf ) throws Exception {
184+ Job job = runJobInBackGround (conf );
165185 CLI jc = createJobClient ();
166186 TaskID tid = new TaskID (job .getJobID (), TaskType .MAP , 0 );
167187 TaskAttemptID taid = new TaskAttemptID (tid , 1 );
@@ -170,20 +190,17 @@ private void testKillTask(Job job, Configuration conf) throws Exception {
170190 int exitCode = runTool (conf , jc , new String [] { "-kill-task" }, out );
171191 assertEquals ("Exit code" , -1 , exitCode );
172192
173- try {
174- runTool (conf , jc , new String [] { "-kill-task" , taid .toString () }, out );
175- fail (" this task should be killed" );
176- } catch (IOException e ) {
177- System .out .println (e );
178- // task completed
179- assertTrue (e .getMessage ().contains ("_0001_m_000000_1" ));
180- }
193+ runTool (conf , jc , new String [] { "-kill-task" , taid .toString () }, out );
194+ String answer = new String (out .toByteArray (), "UTF-8" );
195+ Assert .assertTrue (answer .contains ("Killed task " + taid ));
181196 }
182197
183198 /**
184199 * test a kill job
185200 */
186- private void testKillJob (String jobId , Configuration conf ) throws Exception {
201+ private void testKillJob (Configuration conf ) throws Exception {
202+ Job job = runJobInBackGround (conf );
203+ String jobId = job .getJobID ().toString ();
187204 CLI jc = createJobClient ();
188205
189206 ByteArrayOutputStream out = new ByteArrayOutputStream ();
@@ -434,7 +451,8 @@ public void testGetCounter(String jobId, Configuration conf) throws Exception {
434451 /**
435452 * print a job list
436453 */
437- protected void testJobList (String jobId , Configuration conf ) throws Exception {
454+ protected void testAllJobList (String jobId , Configuration conf )
455+ throws Exception {
438456 ByteArrayOutputStream out = new ByteArrayOutputStream ();
439457 // bad options
440458
@@ -457,23 +475,31 @@ protected void testJobList(String jobId, Configuration conf) throws Exception {
457475 }
458476 assertEquals (1 , counter );
459477 out .reset ();
460- // only submitted
461- exitCode = runTool (conf , createJobClient (), new String [] { "-list" }, out );
478+ }
479+
480+ protected void testSubmittedJobList (Configuration conf ) throws Exception {
481+ Job job = runJobInBackGround (conf );
482+ ByteArrayOutputStream out = new ByteArrayOutputStream ();
483+ String line ;
484+ int counter = 0 ;
485+ // only submitted
486+ int exitCode =
487+ runTool (conf , createJobClient (), new String [] { "-list" }, out );
462488 assertEquals ("Exit code" , 0 , exitCode );
463- br = new BufferedReader (new InputStreamReader (new ByteArrayInputStream (
464- out .toByteArray ())));
489+ BufferedReader br =
490+ new BufferedReader (new InputStreamReader (new ByteArrayInputStream (
491+ out .toByteArray ())));
465492 counter = 0 ;
466493 while ((line = br .readLine ()) != null ) {
467494 LOG .info ("line = " + line );
468- if (line .contains (jobId )) {
495+ if (line .contains (job . getJobID (). toString () )) {
469496 counter ++;
470497 }
471498 }
472499 // all jobs submitted! no current
473500 assertEquals (1 , counter );
474-
475501 }
476-
502+
477503 protected void verifyJobPriority (String jobId , String priority ,
478504 Configuration conf , CLI jc ) throws Exception {
479505 PipedInputStream pis = new PipedInputStream ();
0 commit comments