2525import java .util .concurrent .ConcurrentHashMap ;
2626import java .util .concurrent .ConcurrentMap ;
2727
28+ import com .google .common .annotations .VisibleForTesting ;
2829import org .apache .commons .logging .Log ;
2930import org .apache .commons .logging .LogFactory ;
31+ import org .apache .hadoop .classification .InterfaceAudience ;
3032import org .apache .hadoop .conf .Configuration ;
3133import org .apache .hadoop .fs .FileContext ;
3234import org .apache .hadoop .fs .Path ;
33- import org .apache .hadoop .http . HttpConfig ;
35+ import org .apache .hadoop .ha . HAServiceProtocol ;
3436import org .apache .hadoop .metrics2 .lib .DefaultMetricsSystem ;
3537import org .apache .hadoop .service .AbstractService ;
3638import org .apache .hadoop .service .CompositeService ;
3739import org .apache .hadoop .util .Shell ;
3840import org .apache .hadoop .util .Shell .ShellCommandExecutor ;
3941import org .apache .hadoop .yarn .api .records .ApplicationAttemptId ;
42+ import org .apache .hadoop .yarn .conf .HAUtil ;
4043import org .apache .hadoop .yarn .conf .YarnConfiguration ;
4144import org .apache .hadoop .yarn .event .Dispatcher ;
4245import org .apache .hadoop .yarn .event .EventHandler ;
@@ -87,7 +90,7 @@ public class MiniYARNCluster extends CompositeService {
8790 }
8891
8992 private NodeManager [] nodeManagers ;
90- private ResourceManager resourceManager ;
93+ private ResourceManager [] resourceManagers ;
9194
9295 private ResourceManagerWrapper resourceManagerWrapper ;
9396
@@ -103,12 +106,14 @@ public class MiniYARNCluster extends CompositeService {
103106
104107 /**
105108 * @param testName name of the test
106- * @param noOfNodeManagers the number of node managers in the cluster
109+ * @param numResourceManagers the number of resource managers in the cluster
110+ * @param numNodeManagers the number of node managers in the cluster
107111 * @param numLocalDirs the number of nm-local-dirs per nodemanager
108112 * @param numLogDirs the number of nm-log-dirs per nodemanager
109113 */
110- public MiniYARNCluster (String testName , int noOfNodeManagers ,
111- int numLocalDirs , int numLogDirs ) {
114+ public MiniYARNCluster (
115+ String testName , int numResourceManagers , int numNodeManagers ,
116+ int numLocalDirs , int numLogDirs ) {
112117 super (testName .replace ("$" , "" ));
113118 this .numLocalDirs = numLocalDirs ;
114119 this .numLogDirs = numLogDirs ;
@@ -157,28 +162,103 @@ public MiniYARNCluster(String testName, int noOfNodeManagers,
157162 this .testWorkDir = targetWorkDir ;
158163 }
159164
160- resourceManagerWrapper = new ResourceManagerWrapper ();
161- addService (resourceManagerWrapper );
162- nodeManagers = new CustomNodeManager [noOfNodeManagers ];
163- for (int index = 0 ; index < noOfNodeManagers ; index ++) {
165+ resourceManagers = new ResourceManager [numResourceManagers ];
166+ for (int i = 0 ; i < numResourceManagers ; i ++) {
167+ resourceManagers [i ] = new ResourceManager ();
168+ addService (new ResourceManagerWrapper (i ));
169+ }
170+ nodeManagers = new CustomNodeManager [numNodeManagers ];
171+ for (int index = 0 ; index < numNodeManagers ; index ++) {
164172 addService (new NodeManagerWrapper (index ));
165173 nodeManagers [index ] = new CustomNodeManager ();
166174 }
167175 }
168-
169- @ Override
176+
177+ /**
178+ * @param testName name of the test
179+ * @param numNodeManagers the number of node managers in the cluster
180+ * @param numLocalDirs the number of nm-local-dirs per nodemanager
181+ * @param numLogDirs the number of nm-log-dirs per nodemanager
182+ */
183+ public MiniYARNCluster (String testName , int numNodeManagers ,
184+ int numLocalDirs , int numLogDirs ) {
185+ this (testName , 1 , numNodeManagers , numLocalDirs , numLogDirs );
186+ }
187+
188+ @ Override
170189 public void serviceInit (Configuration conf ) throws Exception {
171- super .serviceInit (conf instanceof YarnConfiguration ? conf
172- : new YarnConfiguration (
173- conf ));
190+ if (resourceManagers .length > 1 ) {
191+ conf .setBoolean (YarnConfiguration .RM_HA_ENABLED , true );
192+
193+ StringBuilder rmIds = new StringBuilder ();
194+ for (int i = 0 ; i < resourceManagers .length ; i ++) {
195+ if (i != 0 ) {
196+ rmIds .append ("," );
197+ }
198+ rmIds .append ("rm" + i );
199+ }
200+ conf .set (YarnConfiguration .RM_HA_IDS , rmIds .toString ());
201+ }
202+ super .serviceInit (
203+ conf instanceof YarnConfiguration ? conf : new YarnConfiguration (conf ));
174204 }
175205
176206 public File getTestWorkDir () {
177207 return testWorkDir ;
178208 }
179209
210+ /**
211+ * In a HA cluster, go through all the RMs and find the Active RM. If none
212+ * of them are active, wait upto 5 seconds for them to transition to Active.
213+ *
214+ * In an non-HA cluster, return the index of the only RM.
215+ *
216+ * @return index of the active RM
217+ */
218+ @ InterfaceAudience .Private
219+ @ VisibleForTesting
220+ int getActiveRMIndex () {
221+ if (resourceManagers .length == 1 ) {
222+ return 0 ;
223+ }
224+
225+ int numRetriesForRMBecomingActive = 5 ;
226+ while (numRetriesForRMBecomingActive -- > 0 ) {
227+ for (int i = 0 ; i < resourceManagers .length ; i ++) {
228+ try {
229+ if (HAServiceProtocol .HAServiceState .ACTIVE ==
230+ resourceManagers [i ].getRMContext ().getRMAdminService ()
231+ .getServiceStatus ().getState ()) {
232+ return i ;
233+ }
234+ } catch (IOException e ) {
235+ throw new YarnRuntimeException ("Couldn't read the status of " +
236+ "a ResourceManger in the HA ensemble." , e );
237+ }
238+ }
239+ try {
240+ Thread .sleep (1000 );
241+ } catch (InterruptedException e ) {
242+ throw new YarnRuntimeException ("Interrupted while waiting for one " +
243+ "of the ResourceManagers to become active" );
244+ }
245+ }
246+ return -1 ;
247+ }
248+
249+ /**
250+ * @return the active {@link ResourceManager} of the cluster,
251+ * null if none of them are active.
252+ */
180253 public ResourceManager getResourceManager () {
181- return this .resourceManager ;
254+ int activeRMIndex = getActiveRMIndex ();
255+ return activeRMIndex == -1
256+ ? null
257+ : this .resourceManagers [getActiveRMIndex ()];
258+ }
259+
260+ public ResourceManager getResourceManager (int i ) {
261+ return this .resourceManagers [i ];
182262 }
183263
184264 public NodeManager getNodeManager (int i ) {
@@ -195,8 +275,29 @@ public static String getHostname() {
195275 }
196276
197277 private class ResourceManagerWrapper extends AbstractService {
198- public ResourceManagerWrapper () {
199- super (ResourceManagerWrapper .class .getName ());
278+ private int index ;
279+
280+ public ResourceManagerWrapper (int i ) {
281+ super (ResourceManagerWrapper .class .getName () + "_" + i );
282+ index = i ;
283+ }
284+
285+ private void setNonHARMConfiguration (Configuration conf ) {
286+ String hostname = MiniYARNCluster .getHostname ();
287+ conf .set (YarnConfiguration .RM_ADDRESS , hostname + ":0" );
288+ conf .set (YarnConfiguration .RM_ADMIN_ADDRESS , hostname + ":0" );
289+ conf .set (YarnConfiguration .RM_SCHEDULER_ADDRESS , hostname + ":0" );
290+ conf .set (YarnConfiguration .RM_RESOURCE_TRACKER_ADDRESS , hostname + ":0" );
291+ WebAppUtils .setRMWebAppHostnameAndPort (conf , hostname , 0 );
292+ }
293+
294+ private void setHARMConfiguration (Configuration conf ) {
295+ String rmId = "rm" + index ;
296+ String hostname = MiniYARNCluster .getHostname ();
297+ conf .set (YarnConfiguration .RM_HA_ID , rmId );
298+ for (String confKey : YarnConfiguration .RM_RPC_ADDRESS_CONF_KEYS ) {
299+ conf .set (HAUtil .addSuffix (confKey , rmId ), hostname + ":0" );
300+ }
200301 }
201302
202303 @ Override
@@ -206,22 +307,15 @@ protected synchronized void serviceInit(Configuration conf)
206307 if (!conf .getBoolean (
207308 YarnConfiguration .YARN_MINICLUSTER_FIXED_PORTS ,
208309 YarnConfiguration .DEFAULT_YARN_MINICLUSTER_FIXED_PORTS )) {
209- // pick free random ports.
210- String hostname = MiniYARNCluster .getHostname ();
211- conf .set (YarnConfiguration .RM_ADDRESS , hostname + ":0" );
212- conf .set (YarnConfiguration .RM_ADMIN_ADDRESS , hostname + ":0" );
213- conf .set (YarnConfiguration .RM_SCHEDULER_ADDRESS , hostname + ":0" );
214- conf .set (YarnConfiguration .RM_RESOURCE_TRACKER_ADDRESS , hostname + ":0" );
215- WebAppUtils .setRMWebAppHostnameAndPort (conf , hostname , 0 );
310+ if (HAUtil .isHAEnabled (conf )) {
311+ setHARMConfiguration (conf );
312+ } else {
313+ setNonHARMConfiguration (conf );
314+ }
216315 }
217- resourceManager = new ResourceManager () {
218- @ Override
219- protected void doSecureLogin () throws IOException {
220- // Don't try to login using keytab in the testcase.
221- };
222- };
223- resourceManager .init (conf );
224- resourceManager .getRMContext ().getDispatcher ().register (RMAppAttemptEventType .class ,
316+ resourceManagers [index ].init (conf );
317+ resourceManagers [index ].getRMContext ().getDispatcher ().register
318+ (RMAppAttemptEventType .class ,
225319 new EventHandler <RMAppAttemptEvent >() {
226320 public void handle (RMAppAttemptEvent event ) {
227321 if (event instanceof RMAppAttemptRegistrationEvent ) {
@@ -239,20 +333,20 @@ protected synchronized void serviceStart() throws Exception {
239333 try {
240334 new Thread () {
241335 public void run () {
242- resourceManager .start ();
243- };
336+ resourceManagers [ index ] .start ();
337+ }
244338 }.start ();
245339 int waitCount = 0 ;
246- while (resourceManager .getServiceState () == STATE .INITED
340+ while (resourceManagers [ index ] .getServiceState () == STATE .INITED
247341 && waitCount ++ < 60 ) {
248342 LOG .info ("Waiting for RM to start..." );
249343 Thread .sleep (1500 );
250344 }
251- if (resourceManager .getServiceState () != STATE .STARTED ) {
345+ if (resourceManagers [ index ] .getServiceState () != STATE .STARTED ) {
252346 // RM could have failed.
253347 throw new IOException (
254348 "ResourceManager failed to start. Final state is "
255- + resourceManager .getServiceState ());
349+ + resourceManagers [ index ] .getServiceState ());
256350 }
257351 super .serviceStart ();
258352 } catch (Throwable t ) {
@@ -278,9 +372,9 @@ private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedExc
278372
279373 @ Override
280374 protected synchronized void serviceStop () throws Exception {
281- if (resourceManager != null ) {
375+ if (resourceManagers [ index ] != null ) {
282376 waitForAppMastersToFinish (5000 );
283- resourceManager .stop ();
377+ resourceManagers [ index ] .stop ();
284378 }
285379 super .serviceStop ();
286380
@@ -372,7 +466,7 @@ protected synchronized void serviceStart() throws Exception {
372466 new Thread () {
373467 public void run () {
374468 nodeManagers [index ].start ();
375- };
469+ }
376470 }.start ();
377471 int waitCount = 0 ;
378472 while (nodeManagers [index ].getServiceState () == STATE .INITED
@@ -398,12 +492,12 @@ protected synchronized void serviceStop() throws Exception {
398492 super .serviceStop ();
399493 }
400494 }
401-
495+
402496 private class CustomNodeManager extends NodeManager {
403497 @ Override
404498 protected void doSecureLogin () throws IOException {
405499 // Don't try to login using keytab in the testcase.
406- };
500+ }
407501
408502 @ Override
409503 protected NodeStatusUpdater createNodeStatusUpdater (Context context ,
@@ -412,8 +506,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
412506 healthChecker , metrics ) {
413507 @ Override
414508 protected ResourceTracker getRMClient () {
415- final ResourceTrackerService rt = resourceManager
416- .getResourceTrackerService ();
509+ final ResourceTrackerService rt =
510+ getResourceManager () .getResourceTrackerService ();
417511 final RecordFactory recordFactory =
418512 RecordFactoryProvider .getRecordFactory (null );
419513
@@ -424,8 +518,7 @@ protected ResourceTracker getRMClient() {
424518 public NodeHeartbeatResponse nodeHeartbeat (
425519 NodeHeartbeatRequest request ) throws YarnException ,
426520 IOException {
427- NodeHeartbeatResponse response = recordFactory .newRecordInstance (
428- NodeHeartbeatResponse .class );
521+ NodeHeartbeatResponse response ;
429522 try {
430523 response = rt .nodeHeartbeat (request );
431524 } catch (YarnException e ) {
@@ -440,8 +533,7 @@ public NodeHeartbeatResponse nodeHeartbeat(
440533 public RegisterNodeManagerResponse registerNodeManager (
441534 RegisterNodeManagerRequest request )
442535 throws YarnException , IOException {
443- RegisterNodeManagerResponse response = recordFactory .
444- newRecordInstance (RegisterNodeManagerResponse .class );
536+ RegisterNodeManagerResponse response ;
445537 try {
446538 response = rt .registerNodeManager (request );
447539 } catch (YarnException e ) {
@@ -452,13 +544,11 @@ public RegisterNodeManagerResponse registerNodeManager(
452544 return response ;
453545 }
454546 };
455- };
547+ }
456548
457549 @ Override
458- protected void stopRMProxy () {
459- return ;
460- }
550+ protected void stopRMProxy () { }
461551 };
462- };
552+ }
463553 }
464554}
0 commit comments