Skip to content

Commit 124093d

Browse files
committed
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531222 13f79535-47bb-0310-9956-ffa450edef68
1 parent a1dedcb commit 124093d

File tree

4 files changed

+85
-8
lines changed

4 files changed

+85
-8
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ Release 2.3.0 - UNRELEASED
3434

3535
IMPROVEMENTS
3636

37+
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
38+
3739
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
3840

3941
YARN-1098. Separate out RM services into Always On and Active (Karthik

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ public class ApplicationMaster {
180180
private int numTotalContainers = 1;
181181
// Memory to request for the container on which the shell command will run
182182
private int containerMemory = 10;
183+
// VirtualCores to request for the container on which the shell command will run
184+
private int containerVirtualCores = 1;
183185
// Priority of the request
184186
private int requestPriority;
185187

@@ -309,6 +311,8 @@ public boolean init(String[] args) throws ParseException, IOException {
309311
"Environment for shell script. Specified as env_key=env_val pairs");
310312
opts.addOption("container_memory", true,
311313
"Amount of memory in MB to be requested to run the shell command");
314+
opts.addOption("container_vcores", true,
315+
"Amount of virtual cores to be requested to run the shell command");
312316
opts.addOption("num_containers", true,
313317
"No. of containers on which the shell command needs to be executed");
314318
opts.addOption("priority", true, "Application Priority. Default 0");
@@ -421,6 +425,8 @@ public boolean init(String[] args) throws ParseException, IOException {
421425

422426
containerMemory = Integer.parseInt(cliParser.getOptionValue(
423427
"container_memory", "10"));
428+
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
429+
"container_vcores", "1"));
424430
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
425431
"num_containers", "1"));
426432
if (numTotalContainers == 0) {
@@ -492,6 +498,9 @@ public boolean run() throws YarnException, IOException {
492498
// resource manager
493499
int maxMem = response.getMaximumResourceCapability().getMemory();
494500
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
501+
502+
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
503+
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
495504

496505
// A resource ask cannot exceed the max.
497506
if (containerMemory > maxMem) {
@@ -501,6 +510,13 @@ public boolean run() throws YarnException, IOException {
501510
containerMemory = maxMem;
502511
}
503512

513+
if (containerVirtualCores > maxVCores) {
514+
LOG.info("Container virtual cores specified above max threshold of cluster."
515+
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
516+
+ maxVCores);
517+
containerVirtualCores = maxVCores;
518+
}
519+
504520
// Setup ask for containers from RM
505521
// Send request for containers to RM
506522
// Until we get our fully allocated quota, we keep on polling RM for
@@ -645,7 +661,9 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
645661
+ ":" + allocatedContainer.getNodeId().getPort()
646662
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
647663
+ ", containerResourceMemory"
648-
+ allocatedContainer.getResource().getMemory());
664+
+ allocatedContainer.getResource().getMemory()
665+
+ ", containerResourceVirtualCores"
666+
+ allocatedContainer.getResource().getVirtualCores());
649667
// + ", containerToken"
650668
// +allocatedContainer.getContainerToken().getIdentifier().toString());
651669

@@ -872,9 +890,10 @@ private ContainerRequest setupContainerAskForRM() {
872890
pri.setPriority(requestPriority);
873891

874892
// Set up resource type requirements
875-
// For now, only memory is supported so we set memory requirements
893+
// For now, memory and CPU are supported so we set memory and cpu requirements
876894
Resource capability = Records.newRecord(Resource.class);
877895
capability.setMemory(containerMemory);
896+
capability.setVirtualCores(containerVirtualCores);
878897

879898
ContainerRequest request = new ContainerRequest(capability, null, null,
880899
pri);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ public class Client {
121121
private String amQueue = "";
122122
// Amt. of memory resource to request for to run the App Master
123123
private int amMemory = 10;
124+
// Amt. of virtual core resource to request for to run the App Master
125+
private int amVCores = 1;
124126

125127
// Application master jar file
126128
private String appMasterJar = "";
@@ -140,6 +142,8 @@ public class Client {
140142

141143
// Amt of memory to request for container in which shell script will be executed
142144
private int containerMemory = 10;
145+
// Amt. of virtual cores to request for container in which shell script will be executed
146+
private int containerVirtualCores = 1;
143147
// No. of containers in which the shell script needs to be executed
144148
private int numContainers = 1;
145149

@@ -208,13 +212,15 @@ public Client(Configuration conf) throws Exception {
208212
opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
209213
opts.addOption("timeout", true, "Application timeout in milliseconds");
210214
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
215+
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
211216
opts.addOption("jar", true, "Jar file containing the application master");
212217
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
213218
opts.addOption("shell_script", true, "Location of the shell script to be executed");
214219
opts.addOption("shell_args", true, "Command line args for the shell script");
215220
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
216221
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
217222
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
223+
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
218224
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
219225
opts.addOption("log_properties", true, "log4j.properties file");
220226
opts.addOption("debug", false, "Dump out debug information");
@@ -263,11 +269,16 @@ public boolean init(String[] args) throws ParseException {
263269
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
264270
amQueue = cliParser.getOptionValue("queue", "default");
265271
amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
266-
272+
amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
273+
267274
if (amMemory < 0) {
268275
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
269276
+ " Specified memory=" + amMemory);
270277
}
278+
if (amVCores < 0) {
279+
throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
280+
+ " Specified virtual cores=" + amVCores);
281+
}
271282

272283
if (!cliParser.hasOption("jar")) {
273284
throw new IllegalArgumentException("No jar file specified for application master");
@@ -306,11 +317,14 @@ public boolean init(String[] args) throws ParseException {
306317
shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
307318

308319
containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
320+
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
309321
numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
310322

311-
if (containerMemory < 0 || numContainers < 1) {
312-
throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
323+
if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
324+
throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
325+
+ " exiting."
313326
+ " Specified containerMemory=" + containerMemory
327+
+ ", containerVirtualCores=" + containerVirtualCores
314328
+ ", numContainer=" + numContainers);
315329
}
316330

@@ -383,6 +397,16 @@ public boolean run() throws IOException, YarnException {
383397
amMemory = maxMem;
384398
}
385399

400+
int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
401+
LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);
402+
403+
if (amVCores > maxVCores) {
404+
LOG.info("AM virtual cores specified above max threshold of cluster. "
405+
+ "Using max value." + ", specified=" + amVCores
406+
+ ", max=" + maxVCores);
407+
amVCores = maxVCores;
408+
}
409+
386410
// set the application name
387411
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
388412
ApplicationId appId = appContext.getApplicationId();
@@ -514,6 +538,7 @@ public boolean run() throws IOException, YarnException {
514538
vargs.add(appMasterMainClass);
515539
// Set params for Application Master
516540
vargs.add("--container_memory " + String.valueOf(containerMemory));
541+
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
517542
vargs.add("--num_containers " + String.valueOf(numContainers));
518543
vargs.add("--priority " + String.valueOf(shellCmdPriority));
519544
if (!shellCommand.isEmpty()) {
@@ -544,9 +569,11 @@ public boolean run() throws IOException, YarnException {
544569
amContainer.setCommands(commands);
545570

546571
// Set up resource type requirements
547-
// For now, only memory is supported so we set memory requirements
572+
// For now, both memory and vcores are supported, so we set memory and
573+
// vcores requirements
548574
Resource capability = Records.newRecord(Resource.class);
549575
capability.setMemory(amMemory);
576+
capability.setVirtualCores(amVCores);
550577
appContext.setResource(capability);
551578

552579
// Service data is a binary blob that can be passed to the application

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public static void tearDown() throws IOException {
105105
}
106106
}
107107
}
108-
108+
109109
@Test(timeout=90000)
110110
public void testDSShell() throws Exception {
111111

@@ -118,8 +118,12 @@ public void testDSShell() throws Exception {
118118
Shell.WINDOWS ? "dir" : "ls",
119119
"--master_memory",
120120
"512",
121+
"--master_vcores",
122+
"2",
121123
"--container_memory",
122-
"128"
124+
"128",
125+
"--container_vcores",
126+
"1"
123127
};
124128

125129
LOG.info("Initializing DS Client");
@@ -237,6 +241,31 @@ public void testDSShellWithInvalidArgs() throws Exception {
237241
Assert.assertTrue("The throw exception is not expected",
238242
e.getMessage().contains("Invalid no. of containers"));
239243
}
244+
245+
LOG.info("Initializing DS Client with invalid no. of vcores");
246+
try {
247+
String[] args = {
248+
"--jar",
249+
APPMASTER_JAR,
250+
"--num_containers",
251+
"2",
252+
"--shell_command",
253+
Shell.WINDOWS ? "dir" : "ls",
254+
"--master_memory",
255+
"512",
256+
"--master_vcores",
257+
"-2",
258+
"--container_memory",
259+
"128",
260+
"--container_vcores",
261+
"1"
262+
};
263+
client.init(args);
264+
Assert.fail("Exception is expected");
265+
} catch (IllegalArgumentException e) {
266+
Assert.assertTrue("The throw exception is not expected",
267+
e.getMessage().contains("Invalid virtual cores specified"));
268+
}
240269
}
241270

242271
protected static void waitForNMToRegister(NodeManager nm)

0 commit comments

Comments
 (0)