Skip to content

Commit dc32432

Browse files
committed
YARN-596. Use scheduling policies throughout the queue hierarchy to decide which containers to preempt (Wei Yan via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1598197 13f79535-47bb-0310-9956-ffa450edef68
1 parent e9e1b3f commit dc32432

File tree

15 files changed

+309
-162
lines changed

15 files changed

+309
-162
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ Release 2.5.0 - UNRELEASED
114114
YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. (Vinod
115115
Kumar Vavilapalli via zjshen)
116116

117+
YARN-596. Use scheduling policies throughout the queue hierarchy to decide
118+
which containers to preempt (Wei Yan via Sandy Ryza)
119+
117120
OPTIMIZATIONS
118121

119122
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
2020

21+
import java.io.Serializable;
2122
import java.util.Arrays;
2223
import java.util.Collection;
24+
import java.util.Comparator;
2325

2426
import org.apache.commons.logging.Log;
2527
import org.apache.commons.logging.LogFactory;
@@ -31,8 +33,6 @@
3133
import org.apache.hadoop.yarn.api.records.Priority;
3234
import org.apache.hadoop.yarn.api.records.Resource;
3335
import org.apache.hadoop.yarn.api.records.ResourceRequest;
34-
import org.apache.hadoop.yarn.factories.RecordFactory;
35-
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
3636
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
3737
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
3838
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -58,6 +58,8 @@ public class AppSchedulable extends Schedulable {
5858
private Priority priority;
5959
private ResourceWeights resourceWeights;
6060

61+
private RMContainerComparator comparator = new RMContainerComparator();
62+
6163
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
6264
this.scheduler = scheduler;
6365
this.app = app;
@@ -111,7 +113,10 @@ public long getStartTime() {
111113

112114
@Override
113115
public Resource getResourceUsage() {
114-
return app.getCurrentConsumption();
116+
// Here the getPreemptedResources() always return zero, except in
117+
// a preemption round
118+
return Resources.subtract(app.getCurrentConsumption(),
119+
app.getPreemptedResources());
115120
}
116121

117122

@@ -383,6 +388,27 @@ public Resource assignContainer(FSSchedulerNode node) {
383388
return assignContainer(node, false);
384389
}
385390

391+
/**
392+
* Preempt a running container according to the priority
393+
*/
394+
@Override
395+
public RMContainer preemptContainer() {
396+
if (LOG.isDebugEnabled()) {
397+
LOG.debug("App " + getName() + " is going to preempt a running " +
398+
"container");
399+
}
400+
401+
RMContainer toBePreempted = null;
402+
for (RMContainer container : app.getLiveContainers()) {
403+
if (! app.getPreemptionContainers().contains(container) &&
404+
(toBePreempted == null ||
405+
comparator.compare(toBePreempted, container) > 0)) {
406+
toBePreempted = container;
407+
}
408+
}
409+
return toBePreempted;
410+
}
411+
386412
/**
387413
* Whether this app has containers requests that could be satisfied on the
388414
* given node, if the node had full space.
@@ -407,4 +433,17 @@ public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
407433
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
408434
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
409435
}
436+
437+
static class RMContainerComparator implements Comparator<RMContainer>,
438+
Serializable {
439+
@Override
440+
public int compare(RMContainer c1, RMContainer c2) {
441+
int ret = c1.getContainer().getPriority().compareTo(
442+
c2.getContainer().getPriority());
443+
if (ret == 0) {
444+
return c2.getContainerId().compareTo(c1.getContainerId());
445+
}
446+
return ret;
447+
}
448+
}
410449
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333
import org.apache.hadoop.yarn.api.records.QueueACL;
3434
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
3535
import org.apache.hadoop.yarn.api.records.Resource;
36+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
3637
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
3738
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
3839
import org.apache.hadoop.yarn.util.resource.Resources;
39-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
4040

4141
@Private
4242
@Unstable
@@ -208,6 +208,36 @@ public Resource assignContainer(FSSchedulerNode node) {
208208
return assigned;
209209
}
210210

211+
@Override
212+
public RMContainer preemptContainer() {
213+
RMContainer toBePreempted = null;
214+
if (LOG.isDebugEnabled()) {
215+
LOG.debug("Queue " + getName() + " is going to preempt a container " +
216+
"from its applications.");
217+
}
218+
219+
// If this queue is not over its fair share, reject
220+
if (!preemptContainerPreCheck()) {
221+
return toBePreempted;
222+
}
223+
224+
// Choose the app that is most over fair share
225+
Comparator<Schedulable> comparator = policy.getComparator();
226+
AppSchedulable candidateSched = null;
227+
for (AppSchedulable sched : runnableAppScheds) {
228+
if (candidateSched == null ||
229+
comparator.compare(sched, candidateSched) > 0) {
230+
candidateSched = sched;
231+
}
232+
}
233+
234+
// Preempt from the selected app
235+
if (candidateSched != null) {
236+
toBePreempted = candidateSched.preemptContainer();
237+
}
238+
return toBePreempted;
239+
}
240+
211241
@Override
212242
public List<FSQueue> getChildQueues() {
213243
return new ArrayList<FSQueue>(1);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.ArrayList;
2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.Comparator;
2425
import java.util.List;
2526

2627
import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@
3233
import org.apache.hadoop.yarn.api.records.QueueACL;
3334
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
3435
import org.apache.hadoop.yarn.api.records.Resource;
36+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
3537
import org.apache.hadoop.yarn.util.resource.Resources;
3638
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
3739

@@ -156,6 +158,32 @@ public Resource assignContainer(FSSchedulerNode node) {
156158
return assigned;
157159
}
158160

161+
@Override
162+
public RMContainer preemptContainer() {
163+
RMContainer toBePreempted = null;
164+
165+
// If this queue is not over its fair share, reject
166+
if (!preemptContainerPreCheck()) {
167+
return toBePreempted;
168+
}
169+
170+
// Find the childQueue which is most over fair share
171+
FSQueue candidateQueue = null;
172+
Comparator<Schedulable> comparator = policy.getComparator();
173+
for (FSQueue queue : childQueues) {
174+
if (candidateQueue == null ||
175+
comparator.compare(queue, candidateQueue) > 0) {
176+
candidateQueue = queue;
177+
}
178+
}
179+
180+
// Let the selected queue choose which of its container to preempt
181+
if (candidateQueue != null) {
182+
toBePreempted = candidateQueue.preemptContainer();
183+
}
184+
return toBePreempted;
185+
}
186+
159187
@Override
160188
public List<FSQueue> getChildQueues() {
161189
return childQueues;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,17 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) {
187187
}
188188
return true;
189189
}
190+
191+
/**
192+
* Helper method to check if the queue should preempt containers
193+
*
194+
* @return true if check passes (can preempt) or false otherwise
195+
*/
196+
protected boolean preemptContainerPreCheck() {
197+
if (this == scheduler.getQueueManager().getRootQueue()) {
198+
return true;
199+
}
200+
return parent.getPolicy()
201+
.checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
202+
}
190203
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
5959
private AppSchedulable appSchedulable;
6060

6161
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
62+
63+
private Resource preemptedResources = Resources.createResource(0);
6264

6365
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
6466
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
@@ -316,6 +318,7 @@ public synchronized void resetAllowedLocalityLevel(Priority priority,
316318
public void addPreemption(RMContainer container, long time) {
317319
assert preemptionMap.get(container) == null;
318320
preemptionMap.put(container, time);
321+
Resources.addTo(preemptedResources, container.getAllocatedResource());
319322
}
320323

321324
public Long getContainerPreemptionTime(RMContainer container) {
@@ -330,4 +333,20 @@ public Set<RMContainer> getPreemptionContainers() {
330333
public FSLeafQueue getQueue() {
331334
return (FSLeafQueue)super.getQueue();
332335
}
336+
337+
public Resource getPreemptedResources() {
338+
return preemptedResources;
339+
}
340+
341+
public void resetPreemptedResources() {
342+
preemptedResources = Resources.createResource(0);
343+
for (RMContainer container : getPreemptionContainers()) {
344+
Resources.addTo(preemptedResources, container.getAllocatedResource());
345+
}
346+
}
347+
348+
public void clearPreemptedResources() {
349+
preemptedResources.setMemory(0);
350+
preemptedResources.setVirtualCores(0);
351+
}
333352
}

0 commit comments

Comments
 (0)