Skip to content

Commit aec2c22

Browse files
committed
YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting resources when cluster is free (Karthik Kambatla via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1597209 13f79535-47bb-0310-9956-ffa450edef68
1 parent 9869b0b commit aec2c22

File tree

7 files changed

+397
-147
lines changed

7 files changed

+397
-147
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ Release 2.5.0 - UNRELEASED
104104

105105
YARN-2059. Added admin ACLs support to Timeline Server. (Zhijie Shen via
106106
vinodkv)
107+
108+
YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting
109+
resources when cluster is free (Karthik Kambatla via Sandy Ryza)
107110

108111
OPTIMIZATIONS
109112

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,11 @@ public class FairScheduler extends
148148
// Time we last ran preemptTasksIfNecessary
149149
private long lastPreemptCheckTime;
150150

151-
// How often tasks are preempted
151+
// Preemption related variables
152+
protected boolean preemptionEnabled;
153+
protected float preemptionUtilizationThreshold;
154+
155+
// How often tasks are preempted
152156
protected long preemptionInterval;
153157

154158
// ms to wait before force killing stuff (must be longer than a couple
@@ -158,7 +162,6 @@ public class FairScheduler extends
158162
// Containers whose AMs have been warned that they will be preempted soon.
159163
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
160164

161-
protected boolean preemptionEnabled;
162165
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
163166
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
164167
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -318,7 +321,7 @@ boolean isStarvedForFairShare(FSLeafQueue sched) {
318321
* and then select the right ones using preemptTasks.
319322
*/
320323
protected synchronized void preemptTasksIfNecessary() {
321-
if (!preemptionEnabled) {
324+
if (!shouldAttemptPreemption()) {
322325
return;
323326
}
324327

@@ -328,10 +331,9 @@ protected synchronized void preemptTasksIfNecessary() {
328331
}
329332
lastPreemptCheckTime = curTime;
330333

331-
Resource resToPreempt = Resources.none();
332-
334+
Resource resToPreempt = Resources.clone(Resources.none());
333335
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
334-
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
336+
Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
335337
}
336338
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
337339
Resources.none())) {
@@ -1067,6 +1069,22 @@ private void updateRootQueueMetrics() {
10671069
clusterResource, rootMetrics.getAllocatedResources()));
10681070
}
10691071

1072+
/**
1073+
* Check if preemption is enabled and the utilization threshold for
1074+
* preemption is met.
1075+
*
1076+
* @return true if preemption should be attempted, false otherwise.
1077+
*/
1078+
private boolean shouldAttemptPreemption() {
1079+
if (preemptionEnabled) {
1080+
return (preemptionUtilizationThreshold < Math.max(
1081+
(float) rootMetrics.getAvailableMB() / clusterResource.getMemory(),
1082+
(float) rootMetrics.getAvailableVirtualCores() /
1083+
clusterResource.getVirtualCores()));
1084+
}
1085+
return false;
1086+
}
1087+
10701088
@Override
10711089
public QueueMetrics getRootQueueMetrics() {
10721090
return rootMetrics;
@@ -1172,6 +1190,8 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
11721190
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
11731191
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
11741192
preemptionEnabled = this.conf.getPreemptionEnabled();
1193+
preemptionUtilizationThreshold =
1194+
this.conf.getPreemptionUtilizationThreshold();
11751195
assignMultiple = this.conf.getAssignMultiple();
11761196
maxAssign = this.conf.getMaxAssign();
11771197
sizeBasedWeight = this.conf.getSizeBasedWeight();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration extends Configuration {
101101
/** Whether preemption is enabled. */
102102
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
103103
protected static final boolean DEFAULT_PREEMPTION = false;
104+
105+
protected static final String PREEMPTION_THRESHOLD =
106+
CONF_PREFIX + "preemption.cluster-utilization-threshold";
107+
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
104108

105109
protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
106110
protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -185,6 +189,10 @@ public boolean getPreemptionEnabled() {
185189
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
186190
}
187191

192+
public float getPreemptionUtilizationThreshold() {
193+
return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
194+
}
195+
188196
public boolean getAssignMultiple() {
189197
return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
190198
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
22+
import org.apache.hadoop.yarn.api.records.ApplicationId;
23+
import org.apache.hadoop.yarn.api.records.ContainerId;
24+
import org.apache.hadoop.yarn.api.records.Priority;
25+
import org.apache.hadoop.yarn.api.records.ResourceRequest;
26+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
27+
import org.apache.hadoop.yarn.factories.RecordFactory;
28+
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
29+
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
30+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
31+
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
32+
import org.apache.hadoop.yarn.util.Clock;
33+
34+
import java.io.File;
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
38+
public class FairSchedulerTestBase {
39+
protected static class MockClock implements Clock {
40+
private long time = 0;
41+
@Override
42+
public long getTime() {
43+
return time;
44+
}
45+
46+
public void tick(int seconds) {
47+
time = time + seconds * 1000;
48+
}
49+
}
50+
51+
protected final static String TEST_DIR =
52+
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
53+
54+
private static RecordFactory
55+
recordFactory = RecordFactoryProvider.getRecordFactory(null);
56+
57+
protected int APP_ID = 1; // Incrementing counter for scheduling apps
58+
protected int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
59+
60+
protected Configuration conf;
61+
protected FairScheduler scheduler;
62+
protected ResourceManager resourceManager;
63+
64+
// Helper methods
65+
protected Configuration createConfiguration() {
66+
Configuration conf = new YarnConfiguration();
67+
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
68+
ResourceScheduler.class);
69+
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
70+
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
71+
1024);
72+
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
73+
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
74+
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
75+
return conf;
76+
}
77+
78+
protected ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
79+
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
80+
return ApplicationAttemptId.newInstance(appIdImpl, attemptId);
81+
}
82+
83+
protected ResourceRequest createResourceRequest(
84+
int memory, String host, int priority, int numContainers,
85+
boolean relaxLocality) {
86+
return createResourceRequest(memory, 1, host, priority, numContainers,
87+
relaxLocality);
88+
}
89+
90+
protected ResourceRequest createResourceRequest(
91+
int memory, int vcores, String host, int priority, int numContainers,
92+
boolean relaxLocality) {
93+
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
94+
request.setCapability(BuilderUtils.newResource(memory, vcores));
95+
request.setResourceName(host);
96+
request.setNumContainers(numContainers);
97+
Priority prio = recordFactory.newRecordInstance(Priority.class);
98+
prio.setPriority(priority);
99+
request.setPriority(prio);
100+
request.setRelaxLocality(relaxLocality);
101+
return request;
102+
}
103+
104+
/**
105+
* Creates a single container priority-1 request and submits to
106+
* scheduler.
107+
*/
108+
protected ApplicationAttemptId createSchedulingRequest(
109+
int memory, String queueId, String userId) {
110+
return createSchedulingRequest(memory, queueId, userId, 1);
111+
}
112+
113+
protected ApplicationAttemptId createSchedulingRequest(
114+
int memory, int vcores, String queueId, String userId) {
115+
return createSchedulingRequest(memory, vcores, queueId, userId, 1);
116+
}
117+
118+
protected ApplicationAttemptId createSchedulingRequest(
119+
int memory, String queueId, String userId, int numContainers) {
120+
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
121+
}
122+
123+
protected ApplicationAttemptId createSchedulingRequest(
124+
int memory, int vcores, String queueId, String userId, int numContainers) {
125+
return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
126+
}
127+
128+
protected ApplicationAttemptId createSchedulingRequest(
129+
int memory, String queueId, String userId, int numContainers, int priority) {
130+
return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
131+
priority);
132+
}
133+
134+
protected ApplicationAttemptId createSchedulingRequest(
135+
int memory, int vcores, String queueId, String userId, int numContainers,
136+
int priority) {
137+
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
138+
scheduler.addApplication(id.getApplicationId(), queueId, userId);
139+
// This conditional is for testAclSubmitApplication where app is rejected
140+
// and no app is added.
141+
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
142+
scheduler.addApplicationAttempt(id, false);
143+
}
144+
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
145+
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
146+
priority, numContainers, true);
147+
ask.add(request);
148+
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
149+
return id;
150+
}
151+
152+
protected void createSchedulingRequestExistingApplication(
153+
int memory, int priority, ApplicationAttemptId attId) {
154+
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
155+
priority, 1, true);
156+
createSchedulingRequestExistingApplication(request, attId);
157+
}
158+
159+
protected void createSchedulingRequestExistingApplication(
160+
int memory, int vcores, int priority, ApplicationAttemptId attId) {
161+
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
162+
priority, 1, true);
163+
createSchedulingRequestExistingApplication(request, attId);
164+
}
165+
166+
protected void createSchedulingRequestExistingApplication(
167+
ResourceRequest request, ApplicationAttemptId attId) {
168+
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
169+
ask.add(request);
170+
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
171+
}
172+
}

0 commit comments

Comments
 (0)