Skip to content

Commit 217d609

Browse files
committed
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605616 13f79535-47bb-0310-9956-ffa450edef68
1 parent cf5d4b4 commit 217d609

File tree

3 files changed

+154
-7
lines changed
  • hadoop-yarn-project

3 files changed

+154
-7
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ Release 2.5.0 - UNRELEASED
192192
YARN-2152. Added missing information into ContainerTokenIdentifier so that
193193
NodeManagers can report the same to RM when RM restarts. (Jian He via vinodkv)
194194

195+
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
196+
AMs heartbeat in. (Jason Lowe via vinodkv)
197+
195198
OPTIMIZATIONS
196199

197200
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
2020

2121
import com.google.common.base.Preconditions;
22+
2223
import java.io.IOException;
2324
import java.io.InputStream;
2425
import java.util.ArrayList;
@@ -30,6 +31,7 @@
3031
import java.util.Random;
3132
import java.util.concurrent.ConcurrentHashMap;
3233
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
3335

3436
import org.apache.commons.logging.Log;
3537
import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public Configuration getConf() {
180182

181183
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
182184

183-
private int numNodeManagers = 0;
185+
private AtomicInteger numNodeManagers = new AtomicInteger(0);
184186

185187
private ResourceCalculator calculator;
186188
private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public Comparator<CSQueue> getQueueComparator() {
236238
}
237239

238240
// Getter for the scheduler's node-manager count, shown here as a diff hunk.
// The removed ("-") version was a synchronized method that returned the int
// field directly; the added ("+") version drops the synchronized modifier and
// reads the counter through AtomicInteger.get() instead.
@Override
239-
public synchronized int getNumClusterNodes() {
240-
return numNodeManagers;
241+
public int getNumClusterNodes() {
242+
return numNodeManagers.get();
241243
}
242244

243245
@Override
@@ -953,11 +955,11 @@ private synchronized void addNode(RMNode nodeManager) {
953955
usePortForNodeName));
954956
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
955957
root.updateClusterResource(clusterResource);
956-
++numNodeManagers;
958+
int numNodes = numNodeManagers.incrementAndGet();
957959
LOG.info("Added node " + nodeManager.getNodeAddress() +
958960
" clusterResource: " + clusterResource);
959961

960-
if (scheduleAsynchronously && numNodeManagers == 1) {
962+
if (scheduleAsynchronously && numNodes == 1) {
961963
asyncSchedulerThread.beginSchedule();
962964
}
963965
}
@@ -969,9 +971,9 @@ private synchronized void removeNode(RMNode nodeInfo) {
969971
}
970972
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
971973
root.updateClusterResource(clusterResource);
972-
--numNodeManagers;
974+
int numNodes = numNodeManagers.decrementAndGet();
973975

974-
if (scheduleAsynchronously && numNodeManagers == 0) {
976+
if (scheduleAsynchronously && numNodes == 0) {
975977
asyncSchedulerThread.suspendSchedule();
976978
}
977979

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,29 @@
2525
import static org.mockito.Mockito.when;
2626

2727
import java.io.IOException;
28+
import java.net.InetSocketAddress;
29+
import java.security.PrivilegedAction;
2830
import java.util.Collections;
2931
import java.util.Comparator;
32+
import java.util.HashMap;
3033
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.BrokenBarrierException;
36+
import java.util.concurrent.CyclicBarrier;
3137

3238
import org.apache.commons.logging.Log;
3339
import org.apache.commons.logging.LogFactory;
3440
import org.apache.hadoop.conf.Configuration;
3541
import org.apache.hadoop.net.NetworkTopology;
42+
import org.apache.hadoop.security.Credentials;
43+
import org.apache.hadoop.security.UserGroupInformation;
44+
import org.apache.hadoop.security.token.Token;
45+
import org.apache.hadoop.security.token.TokenIdentifier;
3646
import org.apache.hadoop.yarn.LocalConfigurationProvider;
47+
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
48+
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
49+
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
50+
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
3751
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
3852
import org.apache.hadoop.yarn.api.records.ApplicationId;
3953
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,13 +60,20 @@
4660
import org.apache.hadoop.yarn.event.AsyncDispatcher;
4761
import org.apache.hadoop.yarn.exceptions.YarnException;
4862
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
63+
import org.apache.hadoop.yarn.ipc.YarnRPC;
4964
import org.apache.hadoop.yarn.server.resourcemanager.Application;
65+
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
5066
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
5167
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
5268
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
5369
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
5470
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
5571
import org.apache.hadoop.yarn.server.resourcemanager.Task;
72+
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
73+
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
74+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
75+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
76+
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
5677
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
5778
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
5879
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -686,4 +707,125 @@ public void testAsyncScheduling() throws Exception {
686707
}
687708
}
688709

710+
/**
 * Checks that an ApplicationMasterProtocol#allocate call returns while another
 * thread is holding the CapacityScheduler's monitor.
 *
 * Outline: start a MockRMWithAMS configured with CapacityScheduler, register
 * one node, submit an application, wait for its attempt to reach LAUNCHED,
 * build an AM client carrying the AMRM token, register the AM, then call
 * allocate() while a helper thread sits inside synchronized(cs) between two
 * barrier waits. If allocate() needed that monitor, this thread would never
 * reach the second barrier and the 30 second @Test timeout would fail the test.
 */
@Test(timeout = 30000)
711+
public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
712+
final YarnConfiguration conf = new YarnConfiguration();
713+
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
714+
ResourceScheduler.class);
715+
MyContainerManager containerManager = new MyContainerManager();
716+
final MockRMWithAMS rm =
717+
new MockRMWithAMS(conf, containerManager);
718+
rm.start();
719+
720+
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
721+
722+
Map<ApplicationAccessType, String> acls =
723+
new HashMap<ApplicationAccessType, String>(2);
724+
acls.put(ApplicationAccessType.VIEW_APP, "*");
725+
RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
726+
727+
// Heartbeat from the node after the app is submitted; presumably this is what
// lets the scheduler place the AM container -- confirm against MockNM/MockRM.
nm1.nodeHeartbeat(true);
728+
729+
RMAppAttempt attempt = app.getCurrentAppAttempt();
730+
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
731+
// Poll in 100 ms steps, for at most 10 s of sleep, until the attempt is LAUNCHED.
int msecToWait = 10000;
732+
int msecToSleep = 100;
733+
while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
734+
&& msecToWait > 0) {
735+
LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
736+
+ "Current state is " + attempt.getAppAttemptState());
737+
Thread.sleep(msecToSleep);
738+
msecToWait -= msecToSleep;
739+
}
740+
// NOTE(review): assuming Assert is JUnit's, assertEquals takes (expected, actual);
// the arguments here are in the opposite order, which only affects the wording
// of the failure message, not the pass/fail outcome.
Assert.assertEquals(attempt.getAppAttemptState(),
741+
RMAppAttemptState.LAUNCHED);
742+
743+
// Create a client to the RM.
744+
final YarnRPC rpc = YarnRPC.create(conf);
745+
746+
// The remote user is named after the attempt id and is given the AMRM token
// derived from the credentials captured by the container manager.
UserGroupInformation currentUser =
747+
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
748+
Credentials credentials = containerManager.getContainerCredentials();
749+
final InetSocketAddress rmBindAddress =
750+
rm.getApplicationMasterService().getBindAddress();
751+
Token<? extends TokenIdentifier> amRMToken =
752+
MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
753+
credentials.getAllTokens());
754+
currentUser.addToken(amRMToken);
755+
// The proxy is created inside doAs so that it is obtained as currentUser.
ApplicationMasterProtocol client =
756+
currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
757+
@Override
758+
public ApplicationMasterProtocol run() {
759+
return (ApplicationMasterProtocol) rpc.getProxy(
760+
ApplicationMasterProtocol.class, rmBindAddress, conf);
761+
}
762+
});
763+
764+
RegisterApplicationMasterRequest request =
765+
RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
766+
client.registerApplicationMaster(request);
767+
768+
// grab the scheduler lock from another thread
769+
// and verify an allocate call in this thread doesn't block on it
770+
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
771+
// Two-party barrier: the first await signals "lock is held", the second
// signals "allocate has returned, the lock may be released".
final CyclicBarrier barrier = new CyclicBarrier(2);
772+
Thread otherThread = new Thread(new Runnable() {
773+
@Override
774+
public void run() {
775+
synchronized(cs) {
776+
try {
777+
barrier.await();
778+
barrier.await();
779+
} catch (InterruptedException e) {
780+
// NOTE(review): barrier failures are only printed; the helper then leaves the
// synchronized block and the thread ends without failing the test itself.
e.printStackTrace();
781+
} catch (BrokenBarrierException e) {
782+
e.printStackTrace();
783+
}
784+
}
785+
}
786+
});
787+
otherThread.start();
788+
// Returns once the helper thread is inside synchronized(cs).
barrier.await();
789+
AllocateRequest allocateRequest =
790+
AllocateRequest.newInstance(0, 0.0f, null, null, null);
791+
client.allocate(allocateRequest);
792+
// Let the helper thread leave the synchronized block, then wait for it to end.
barrier.await();
793+
otherThread.join();
794+
795+
// NOTE(review): rm.stop() is not in a finally block, so it is skipped if any
// statement above throws.
rm.stop();
796+
}
797+
798+
/**
 * Verifies that CapacityScheduler#getNumClusterNodes follows node add and
 * remove events: 0 after start, 2 after adding two nodes, 1 after removing
 * one, 2 after re-adding it, and 0 after removing both.
 */
@Test
799+
public void testNumClusterNodes() throws Exception {
800+
YarnConfiguration conf = new YarnConfiguration();
801+
CapacityScheduler cs = new CapacityScheduler();
802+
cs.setConf(conf);
803+
// Only the three secret managers are supplied; every other RMContextImpl
// constructor argument is null.
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
804+
null, new RMContainerTokenSecretManager(conf),
805+
new NMTokenSecretManagerInRM(conf),
806+
new ClientToAMTokenSecretManagerInRM(), null);
807+
cs.setRMContext(rmContext);
808+
CapacitySchedulerConfiguration csConf =
809+
new CapacitySchedulerConfiguration();
810+
setupQueueConfiguration(csConf);
811+
cs.init(csConf);
812+
cs.start();
813+
// No node events have been delivered yet.
assertEquals(0, cs.getNumClusterNodes());
814+
815+
// Two mock nodes built with 4 GB and 2 GB resources respectively.
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
816+
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
817+
cs.handle(new NodeAddedSchedulerEvent(n1));
818+
cs.handle(new NodeAddedSchedulerEvent(n2));
819+
assertEquals(2, cs.getNumClusterNodes());
820+
821+
// Remove and re-add n1, then remove both nodes, checking the count each time.
cs.handle(new NodeRemovedSchedulerEvent(n1));
822+
assertEquals(1, cs.getNumClusterNodes());
823+
cs.handle(new NodeAddedSchedulerEvent(n1));
824+
assertEquals(2, cs.getNumClusterNodes());
825+
cs.handle(new NodeRemovedSchedulerEvent(n2));
826+
cs.handle(new NodeRemovedSchedulerEvent(n1));
827+
assertEquals(0, cs.getNumClusterNodes());
828+
829+
cs.stop();
830+
}
689831
}

0 commit comments

Comments
 (0)