Skip to content

Commit 038b46d

Browse files
committed
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1530542 13f79535-47bb-0310-9956-ffa450edef68
1 parent 9066531 commit 038b46d

File tree

3 files changed

+124
-48
lines changed
  • hadoop-yarn-project
    • hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src

3 files changed

+124
-48
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ Release 2.3.0 - UNRELEASED
5959

6060
YARN-976. Document the meaning of a virtual core. (Sandy Ryza)
6161

62+
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
63+
6264
OPTIMIZATIONS
6365

6466
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -302,55 +302,70 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
302302
throw new AllocationConfigurationException("Bad fair scheduler config " +
303303
"file: top-level element not <allocations>");
304304
NodeList elements = root.getChildNodes();
305+
List<Element> queueElements = new ArrayList<Element>();
305306
for (int i = 0; i < elements.getLength(); i++) {
306307
Node node = elements.item(i);
307-
if (!(node instanceof Element))
308-
continue;
309-
Element element = (Element)node;
310-
if ("queue".equals(element.getTagName()) ||
311-
"pool".equals(element.getTagName())) {
312-
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
313-
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
314-
queueAcls, queueNamesInAllocFile);
315-
} else if ("user".equals(element.getTagName())) {
316-
String userName = element.getAttribute("name");
317-
NodeList fields = element.getChildNodes();
318-
for (int j = 0; j < fields.getLength(); j++) {
319-
Node fieldNode = fields.item(j);
320-
if (!(fieldNode instanceof Element))
321-
continue;
322-
Element field = (Element) fieldNode;
323-
if ("maxRunningApps".equals(field.getTagName())) {
324-
String text = ((Text)field.getFirstChild()).getData().trim();
325-
int val = Integer.parseInt(text);
326-
userMaxApps.put(userName, val);
308+
if (node instanceof Element) {
309+
Element element = (Element)node;
310+
if ("queue".equals(element.getTagName()) ||
311+
"pool".equals(element.getTagName())) {
312+
queueElements.add(element);
313+
} else if ("user".equals(element.getTagName())) {
314+
String userName = element.getAttribute("name");
315+
NodeList fields = element.getChildNodes();
316+
for (int j = 0; j < fields.getLength(); j++) {
317+
Node fieldNode = fields.item(j);
318+
if (!(fieldNode instanceof Element))
319+
continue;
320+
Element field = (Element) fieldNode;
321+
if ("maxRunningApps".equals(field.getTagName())) {
322+
String text = ((Text)field.getFirstChild()).getData().trim();
323+
int val = Integer.parseInt(text);
324+
userMaxApps.put(userName, val);
325+
}
327326
}
327+
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
328+
String text = ((Text)element.getFirstChild()).getData().trim();
329+
int val = Integer.parseInt(text);
330+
userMaxAppsDefault = val;
331+
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
332+
String text = ((Text)element.getFirstChild()).getData().trim();
333+
long val = Long.parseLong(text) * 1000L;
334+
fairSharePreemptionTimeout = val;
335+
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
336+
String text = ((Text)element.getFirstChild()).getData().trim();
337+
long val = Long.parseLong(text) * 1000L;
338+
defaultMinSharePreemptionTimeout = val;
339+
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
340+
String text = ((Text)element.getFirstChild()).getData().trim();
341+
int val = Integer.parseInt(text);
342+
queueMaxAppsDefault = val;
343+
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
344+
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
345+
String text = ((Text)element.getFirstChild()).getData().trim();
346+
SchedulingPolicy.setDefault(text);
347+
defaultSchedPolicy = SchedulingPolicy.getDefault();
348+
} else {
349+
LOG.warn("Bad element in allocations file: " + element.getTagName());
328350
}
329-
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
330-
String text = ((Text)element.getFirstChild()).getData().trim();
331-
int val = Integer.parseInt(text);
332-
userMaxAppsDefault = val;
333-
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
334-
String text = ((Text)element.getFirstChild()).getData().trim();
335-
long val = Long.parseLong(text) * 1000L;
336-
fairSharePreemptionTimeout = val;
337-
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
338-
String text = ((Text)element.getFirstChild()).getData().trim();
339-
long val = Long.parseLong(text) * 1000L;
340-
defaultMinSharePreemptionTimeout = val;
341-
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
342-
String text = ((Text)element.getFirstChild()).getData().trim();
343-
int val = Integer.parseInt(text);
344-
queueMaxAppsDefault = val;
345-
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
346-
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
347-
String text = ((Text)element.getFirstChild()).getData().trim();
348-
SchedulingPolicy.setDefault(text);
349-
defaultSchedPolicy = SchedulingPolicy.getDefault();
350-
} else {
351-
LOG.warn("Bad element in allocations file: " + element.getTagName());
352351
}
353352
}
353+
354+
// Load queue elements. A root queue can either be included or omitted. If
355+
// it's included, all other queues must be inside it.
356+
for (Element element : queueElements) {
357+
String parent = "root";
358+
if (element.getAttribute("name").equalsIgnoreCase("root")) {
359+
if (queueElements.size() > 1) {
360+
throw new AllocationConfigurationException("If configuring root queue,"
361+
+ " no other queues can be placed alongside it.");
362+
}
363+
parent = null;
364+
}
365+
loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
366+
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
367+
queueAcls, queueNamesInAllocFile);
368+
}
354369

355370
// Commit the reload; also create any queue defined in the alloc file
356371
// if it does not already exist, so it can be displayed on the web UI.
@@ -398,7 +413,10 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
398413
Map<String, Long> minSharePreemptionTimeouts,
399414
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
400415
throws AllocationConfigurationException {
401-
String queueName = parentName + "." + element.getAttribute("name");
416+
String queueName = element.getAttribute("name");
417+
if (parentName != null) {
418+
queueName = parentName + "." + queueName;
419+
}
402420
Map<QueueACL, AccessControlList> acls =
403421
new HashMap<QueueACL, AccessControlList>();
404422
NodeList fields = element.getChildNodes();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -923,14 +923,70 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX
923923

924924
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
925925
Assert.assertEquals(4, leafQueues.size());
926-
Assert.assertNotNull(queueManager.getLeafQueue("queueA", true));
927-
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", true));
928-
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", true));
929-
Assert.assertNotNull(queueManager.getLeafQueue("default", true));
926+
Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
927+
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", false));
928+
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", false));
929+
Assert.assertNotNull(queueManager.getLeafQueue("default", false));
930930
// Make sure querying for queues didn't create any new ones:
931931
Assert.assertEquals(4, leafQueues.size());
932932
}
933933

934+
@Test
935+
public void testConfigureRootQueue() throws Exception {
936+
Configuration conf = createConfiguration();
937+
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
938+
scheduler.reinitialize(conf, resourceManager.getRMContext());
939+
940+
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
941+
out.println("<?xml version=\"1.0\"?>");
942+
out.println("<allocations>");
943+
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
944+
out.println("<queue name=\"root\">");
945+
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
946+
out.println(" <queue name=\"child1\">");
947+
out.println(" <minResources>1024mb,1vcores</minResources>");
948+
out.println(" </queue>");
949+
out.println(" <queue name=\"child2\">");
950+
out.println(" <minResources>1024mb,4vcores</minResources>");
951+
out.println(" </queue>");
952+
out.println("</queue>");
953+
out.println("</allocations>");
954+
out.close();
955+
956+
QueueManager queueManager = scheduler.getQueueManager();
957+
queueManager.initialize();
958+
959+
FSQueue root = queueManager.getRootQueue();
960+
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
961+
962+
assertNotNull(queueManager.getLeafQueue("child1", false));
963+
assertNotNull(queueManager.getLeafQueue("child2", false));
964+
}
965+
966+
/**
967+
* Verify that you can't place queues at the same level as the root queue in
968+
* the allocations file.
969+
*/
970+
@Test (expected = AllocationConfigurationException.class)
971+
public void testQueueAlongsideRoot() throws Exception {
972+
Configuration conf = createConfiguration();
973+
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
974+
scheduler.reinitialize(conf, resourceManager.getRMContext());
975+
976+
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
977+
out.println("<?xml version=\"1.0\"?>");
978+
out.println("<allocations>");
979+
out.println("<queue name=\"root\">");
980+
out.println("</queue>");
981+
out.println("<queue name=\"other\">");
982+
out.println("</queue>");
983+
out.println("</allocations>");
984+
out.close();
985+
986+
QueueManager queueManager = scheduler.getQueueManager();
987+
queueManager.initialize();
988+
}
989+
934990
@Test
935991
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
936992
Configuration conf = createConfiguration();

0 commit comments

Comments
 (0)