Skip to content

Commit a13e5e3

Browse files
committed
YARN-1498. Common scheduler changes for moving apps between queues (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563021 13f79535-47bb-0310-9956-ffa450edef68
1 parent 5133e9b commit a13e5e3

File tree

10 files changed

+284
-26
lines changed

10 files changed

+284
-26
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ Trunk - Unreleased
99
YARN-1496. Protocol additions to allow moving apps between queues (Sandy
1010
Ryza)
1111

12+
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
13+
Ryza)
14+
1215
IMPROVEMENTS
1316

1417
OPTIMIZATIONS

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class AppSchedulingInfo {
6464
private Set<String> blacklist = new HashSet<String>();
6565

6666
//private final ApplicationStore store;
67-
private final ActiveUsersManager activeUsersManager;
67+
private ActiveUsersManager activeUsersManager;
6868

6969
/* Allocated by scheduler */
7070
boolean pending = true; // for app metrics
@@ -171,11 +171,10 @@ synchronized public void updateResourceRequests(
171171
.getNumContainers() : 0;
172172
Resource lastRequestCapability = lastRequest != null ? lastRequest
173173
.getCapability() : Resources.none();
174-
metrics.incrPendingResources(user, request.getNumContainers()
175-
- lastRequestContainers, Resources.subtractFrom( // save a clone
176-
Resources.multiply(request.getCapability(), request
177-
.getNumContainers()), Resources.multiply(lastRequestCapability,
178-
lastRequestContainers)));
174+
metrics.incrPendingResources(user, request.getNumContainers(),
175+
request.getCapability());
176+
metrics.decrPendingResources(user, lastRequestContainers,
177+
lastRequestCapability);
179178
}
180179
}
181180
}
@@ -262,14 +261,15 @@ synchronized public void allocate(NodeType type, SchedulerNode node,
262261
pending = false;
263262
metrics.runAppAttempt(applicationId, user);
264263
}
264+
265265
if (LOG.isDebugEnabled()) {
266266
LOG.debug("allocate: applicationId=" + applicationId
267267
+ " container=" + container.getId()
268268
+ " host=" + container.getNodeId().toString()
269269
+ " user=" + user
270270
+ " resource=" + request.getCapability());
271271
}
272-
metrics.allocateResources(user, 1, request.getCapability());
272+
metrics.allocateResources(user, 1, request.getCapability(), true);
273273
}
274274

275275
/**
@@ -359,15 +359,34 @@ synchronized private void checkForDeactivation() {
359359
}
360360
}
361361

362+
synchronized public void move(Queue newQueue) {
363+
QueueMetrics oldMetrics = queue.getMetrics();
364+
QueueMetrics newMetrics = newQueue.getMetrics();
365+
for (Map<String, ResourceRequest> asks : requests.values()) {
366+
ResourceRequest request = asks.get(ResourceRequest.ANY);
367+
if (request != null) {
368+
oldMetrics.decrPendingResources(user, request.getNumContainers(),
369+
request.getCapability());
370+
newMetrics.incrPendingResources(user, request.getNumContainers(),
371+
request.getCapability());
372+
}
373+
}
374+
oldMetrics.moveAppFrom(this);
375+
newMetrics.moveAppTo(this);
376+
activeUsersManager.deactivateApplication(user, applicationId);
377+
activeUsersManager = newQueue.getActiveUsersManager();
378+
activeUsersManager.activateApplication(user, applicationId);
379+
this.queue = newQueue;
380+
}
381+
362382
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
363383
// clear pending resources metrics for the application
364384
QueueMetrics metrics = queue.getMetrics();
365385
for (Map<String, ResourceRequest> asks : requests.values()) {
366386
ResourceRequest request = asks.get(ResourceRequest.ANY);
367387
if (request != null) {
368388
metrics.decrPendingResources(user, request.getNumContainers(),
369-
Resources.multiply(request.getCapability(), request
370-
.getNumContainers()));
389+
request.getCapability());
371390
}
372391
}
373392
metrics.finishAppAttempt(applicationId, pending, user);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,6 @@ public interface Queue {
5858
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
5959

6060
boolean hasAccess(QueueACL acl, UserGroupInformation user);
61+
62+
public ActiveUsersManager getActiveUsersManager();
6163
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,36 @@ public void finishApp(String user, RMAppState rmAppFinalState) {
280280
parent.finishApp(user, rmAppFinalState);
281281
}
282282
}
283+
284+
public void moveAppFrom(AppSchedulingInfo app) {
285+
if (app.isPending()) {
286+
appsPending.decr();
287+
} else {
288+
appsRunning.decr();
289+
}
290+
QueueMetrics userMetrics = getUserMetrics(app.getUser());
291+
if (userMetrics != null) {
292+
userMetrics.moveAppFrom(app);
293+
}
294+
if (parent != null) {
295+
parent.moveAppFrom(app);
296+
}
297+
}
298+
299+
public void moveAppTo(AppSchedulingInfo app) {
300+
if (app.isPending()) {
301+
appsPending.incr();
302+
} else {
303+
appsRunning.incr();
304+
}
305+
QueueMetrics userMetrics = getUserMetrics(app.getUser());
306+
if (userMetrics != null) {
307+
userMetrics.moveAppTo(app);
308+
}
309+
if (parent != null) {
310+
parent.moveAppTo(app);
311+
}
312+
}
283313

284314
/**
285315
* Set available resources. To be called by scheduler periodically as
@@ -324,8 +354,8 @@ public void incrPendingResources(String user, int containers, Resource res) {
324354

325355
private void _incrPendingResources(int containers, Resource res) {
326356
pendingContainers.incr(containers);
327-
pendingMB.incr(res.getMemory());
328-
pendingVCores.incr(res.getVirtualCores());
357+
pendingMB.incr(res.getMemory() * containers);
358+
pendingVCores.incr(res.getVirtualCores() * containers);
329359
}
330360

331361
public void decrPendingResources(String user, int containers, Resource res) {
@@ -341,22 +371,25 @@ public void decrPendingResources(String user, int containers, Resource res) {
341371

342372
private void _decrPendingResources(int containers, Resource res) {
343373
pendingContainers.decr(containers);
344-
pendingMB.decr(res.getMemory());
345-
pendingVCores.decr(res.getVirtualCores());
374+
pendingMB.decr(res.getMemory() * containers);
375+
pendingVCores.decr(res.getVirtualCores() * containers);
346376
}
347377

348-
public void allocateResources(String user, int containers, Resource res) {
378+
public void allocateResources(String user, int containers, Resource res,
379+
boolean decrPending) {
349380
allocatedContainers.incr(containers);
350381
aggregateContainersAllocated.incr(containers);
351382
allocatedMB.incr(res.getMemory() * containers);
352383
allocatedVCores.incr(res.getVirtualCores() * containers);
353-
_decrPendingResources(containers, Resources.multiply(res, containers));
384+
if (decrPending) {
385+
_decrPendingResources(containers, res);
386+
}
354387
QueueMetrics userMetrics = getUserMetrics(user);
355388
if (userMetrics != null) {
356-
userMetrics.allocateResources(user, containers, res);
389+
userMetrics.allocateResources(user, containers, res, decrPending);
357390
}
358391
if (parent != null) {
359-
parent.allocateResources(user, containers, res);
392+
parent.allocateResources(user, containers, res, decrPending);
360393
}
361394
}
362395

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
*/
5858
@Private
5959
@Unstable
60-
public abstract class SchedulerApplicationAttempt {
60+
public class SchedulerApplicationAttempt {
6161

6262
private static final Log LOG = LogFactory
6363
.getLog(SchedulerApplicationAttempt.class);
@@ -91,7 +91,7 @@ public abstract class SchedulerApplicationAttempt {
9191
protected Map<Priority, Long> lastScheduledContainer =
9292
new HashMap<Priority, Long>();
9393

94-
protected final Queue queue;
94+
protected Queue queue;
9595
protected boolean isStopped = false;
9696

9797
protected final RMContext rmContext;
@@ -431,4 +431,25 @@ public synchronized void transferStateFromPreviousAttempt(
431431
this.appSchedulingInfo
432432
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
433433
}
434+
435+
public void move(Queue newQueue) {
436+
QueueMetrics oldMetrics = queue.getMetrics();
437+
QueueMetrics newMetrics = newQueue.getMetrics();
438+
String user = getUser();
439+
for (RMContainer liveContainer : liveContainers.values()) {
440+
Resource resource = liveContainer.getContainer().getResource();
441+
oldMetrics.releaseResources(user, 1, resource);
442+
newMetrics.allocateResources(user, 1, resource, false);
443+
}
444+
for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
445+
for (RMContainer reservedContainer : map.values()) {
446+
Resource resource = reservedContainer.getReservedResource();
447+
oldMetrics.unreserveResource(user, resource);
448+
newMetrics.reserveResource(user, resource);
449+
}
450+
}
451+
452+
appSchedulingInfo.move(newQueue);
453+
this.queue = newQueue;
454+
}
434455
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hadoop.yarn.api.records.QueueACL;
3434
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
3535
import org.apache.hadoop.yarn.api.records.Resource;
36+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
3637
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
3738
import org.apache.hadoop.yarn.util.resource.Resources;
3839
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue {
5455
private long lastTimeAtMinShare;
5556
private long lastTimeAtHalfFairShare;
5657

58+
private final ActiveUsersManager activeUsersManager;
59+
5760
public FSLeafQueue(String name, FairScheduler scheduler,
5861
FSParentQueue parent) {
5962
super(name, scheduler, parent);
6063
this.lastTimeAtMinShare = scheduler.getClock().getTime();
6164
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
65+
activeUsersManager = new ActiveUsersManager(getMetrics());
6266
}
6367

6468
public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -245,4 +249,9 @@ public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
245249
public int getNumRunnableApps() {
246250
return runnableAppScheds.size();
247251
}
252+
253+
@Override
254+
public ActiveUsersManager getActiveUsersManager() {
255+
return activeUsersManager;
256+
}
248257
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
3434
import org.apache.hadoop.yarn.api.records.Resource;
3535
import org.apache.hadoop.yarn.util.resource.Resources;
36-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
36+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
3737

3838
@Private
3939
@Unstable
@@ -194,4 +194,10 @@ public void collectSchedulerApplications(
194194
childQueue.collectSchedulerApplications(apps);
195195
}
196196
}
197+
198+
@Override
199+
public ActiveUsersManager getActiveUsersManager() {
200+
// Should never be called since all applications are submitted to LeafQueues
201+
return null;
202+
}
197203
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ public List<QueueUserACLInfo> getQueueUserAclInfo(
184184
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
185185
return getQueueAcls().get(acl).isUserAllowed(user);
186186
}
187+
188+
@Override
189+
public ActiveUsersManager getActiveUsersManager() {
190+
return activeUsersManager;
191+
}
187192
};
188193

189194
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,15 @@ public void setUp() {
7373
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
7474

7575
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
76-
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
76+
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
7777
// Available resources is set externally, as it depends on dynamic
7878
// configurable cluster/queue resources
7979
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
8080

8181
metrics.runAppAttempt(app.getApplicationId(), user);
8282
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
8383

84-
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
84+
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
8585
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
8686

8787
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
@@ -171,7 +171,7 @@ public void testQueueAppMetricsForMultipleFailures() {
171171

172172
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
173173
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
174-
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
174+
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
175175
// Available resources is set externally, as it depends on dynamic
176176
// configurable cluster/queue resources
177177
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@@ -181,7 +181,7 @@ public void testQueueAppMetricsForMultipleFailures() {
181181
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
182182
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
183183

184-
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
184+
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
185185
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
186186
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
187187

@@ -232,7 +232,7 @@ public void testQueueAppMetricsForMultipleFailures() {
232232
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
233233
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
234234
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
235-
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
235+
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
236236
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
237237
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
238238
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
@@ -242,7 +242,7 @@ public void testQueueAppMetricsForMultipleFailures() {
242242
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
243243
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
244244

245-
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
245+
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
246246
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
247247
// Available resources is set externally, as it depends on dynamic
248248
// configurable cluster/queue resources

0 commit comments

Comments
 (0)