2222import java .io .IOException ;
2323import java .io .File ;
2424
25+ import org .apache .hadoop .mapred .MiniMRClientCluster ;
26+ import org .apache .hadoop .mapred .MiniMRClientClusterFactory ;
27+ import org .apache .hadoop .yarn .conf .YarnConfiguration ;
2528import org .junit .After ;
2629import org .junit .Before ;
2730import org .junit .Test ;
3235import org .apache .hadoop .fs .Path ;
3336import org .apache .hadoop .mapred .Counters ;
3437import org .apache .hadoop .mapred .JobConf ;
35- import org .apache .hadoop .mapred .MiniMRCluster ;
3638import org .apache .hadoop .mapred .TaskAttemptID ;
3739import org .apache .hadoop .mapred .TaskID ;
3840import org .apache .hadoop .mapred .TaskLog ;
@@ -90,7 +92,7 @@ public class TestStreamingStatus {
9092 "print STDERR \" my error msg after consuming input\\ n\" ;\n " +
9193 "print STDERR \" reporter:counter:myOwnCounterGroup,myOwnCounter,1\\ n\" ;\n " ;
9294
93- MiniMRCluster mr = null ;
95+ private MiniMRClientCluster mr ;
9496 FileSystem fs = null ;
9597 JobConf conf = null ;
9698
@@ -105,10 +107,10 @@ public void setUp() throws IOException {
105107 conf .setBoolean (JTConfig .JT_RETIREJOBS , false );
106108 conf .setBoolean (JTConfig .JT_PERSIST_JOBSTATUS , false );
107109
108- mr = new MiniMRCluster ( 1 , "file:///" , 3 , null , null , conf );
110+ mr = MiniMRClientClusterFactory . create ( this . getClass (), 3 , conf );
109111
110112 Path inFile = new Path (INPUT_FILE );
111- fs = inFile .getFileSystem (mr .createJobConf ());
113+ fs = inFile .getFileSystem (mr .getConfig ());
112114 clean (fs );
113115
114116 buildExpectedJobOutput ();
@@ -118,9 +120,13 @@ public void setUp() throws IOException {
118120 * Kill the cluster after the test is done.
119121 */
120122 @ After
121- public void tearDown () {
122- if (fs != null ) { clean (fs ); }
123- if (mr != null ) { mr .shutdown (); }
123+ public void tearDown () throws IOException {
124+ if (fs != null ) {
125+ clean (fs );
126+ }
127+ if (mr != null ) {
128+ mr .stop ();
129+ }
124130 }
125131
126132 // Updates expectedOutput to have the expected job output as a string
@@ -146,21 +152,24 @@ protected void createInputAndScript(boolean isEmptyInput,
146152 file .close ();
147153 }
148154
149- protected String [] genArgs (String jobtracker , String mapper , String reducer )
155+ protected String [] genArgs (String jobtracker , String rmAddress ,
156+ String mapper , String reducer )
150157 {
151158 return new String [] {
152- "-input" , INPUT_FILE ,
153- "-output" , OUTPUT_DIR ,
154- "-mapper" , mapper ,
155- "-reducer" , reducer ,
156- "-jobconf" , MRJobConfig .NUM_MAPS + "=1" ,
157- "-jobconf" , MRJobConfig .NUM_REDUCES + "=1" ,
158- "-jobconf" , MRJobConfig .PRESERVE_FAILED_TASK_FILES + "=true" ,
159- "-jobconf" , "stream.tmpdir=" + new Path (TEST_ROOT_DIR ).toUri ().getPath (),
160- "-jobconf" , JTConfig .JT_IPC_ADDRESS + "=" +jobtracker ,
161- "-jobconf" , "fs.default.name=file:///" ,
162- "-jobconf" , "mapred.jar=" + TestStreaming .STREAMING_JAR ,
163- "-jobconf" , "mapreduce.framework.name=yarn"
159+ "-input" , INPUT_FILE ,
160+ "-output" , OUTPUT_DIR ,
161+ "-mapper" , mapper ,
162+ "-reducer" , reducer ,
163+ "-jobconf" , MRJobConfig .NUM_MAPS + "=1" ,
164+ "-jobconf" , MRJobConfig .NUM_REDUCES + "=1" ,
165+ "-jobconf" , MRJobConfig .PRESERVE_FAILED_TASK_FILES + "=true" ,
166+ "-jobconf" , YarnConfiguration .RM_ADDRESS + "=" + rmAddress ,
167+ "-jobconf" , "stream.tmpdir=" +
168+ new Path (TEST_ROOT_DIR ).toUri ().getPath (),
169+ "-jobconf" , JTConfig .JT_IPC_ADDRESS + "=" +jobtracker ,
170+ "-jobconf" , "fs.default.name=file:///" ,
171+ "-jobconf" , "mapred.jar=" + TestStreaming .STREAMING_JAR ,
172+ "-jobconf" , "mapreduce.framework.name=yarn"
164173 };
165174 }
166175
@@ -218,10 +227,9 @@ public void testReporting() throws Exception {
218227 * Run another streaming job with the given script as reducer and validate.
219228 *
220229 * @param isEmptyInput Should the input to the script be empty ?
221- * @param script The content of the script that will run as the streaming task
222230 */
223231 private void testStreamJob (boolean isEmptyInput )
224- throws IOException {
232+ throws Exception {
225233
226234 createInputAndScript (isEmptyInput , script );
227235
@@ -249,11 +257,12 @@ private void testStreamJob(boolean isEmptyInput)
249257 // all "reporter:status" and "reporter:counter" lines.
250258 // (4) Validate stderr of task of given task type.
251259 // (5) Validate job output
252- void runStreamJob (TaskType type , boolean isEmptyInput ) throws IOException {
253- boolean mayExit = false ;
254- StreamJob job = new StreamJob (genArgs (
255- mr .createJobConf ().get (JTConfig .JT_IPC_ADDRESS ), map , reduce ), mayExit );
256- int returnValue = job .go ();
260+ private void runStreamJob (TaskType type , boolean isEmptyInput )
261+ throws Exception {
262+ StreamJob job = new StreamJob ();
263+ int returnValue = job .run (genArgs (
264+ mr .getConfig ().get (JTConfig .JT_IPC_ADDRESS ),
265+ mr .getConfig ().get (YarnConfiguration .RM_ADDRESS ), map , reduce ));
257266 assertEquals (0 , returnValue );
258267
259268 // If input to reducer is empty, dummy reporter(which ignores all
0 commit comments