Skip to content

Commit 58a8032

Browse files
wilfred-s
authored and rkanter committed
YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter)
(cherry picked from commit 0cd145a) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Change-Id: I34608b53bd20d3db62312675d7b6cf9cfd52c7b1
1 parent a2c9c58 commit 58a8032

File tree

6 files changed

+115
-45
lines changed

6 files changed

+115
-45
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,9 @@ protected void nodeUpdate(RMNode nm) {
10851085
}
10861086

10871087
// Process new container information
1088+
// NOTICE: it is possible to not find the NodeID as a node can be
1089+
// decommissioned at the same time. Skip updates if node is null.
1090+
SchedulerNode schedulerNode = getNode(nm.getNodeID());
10881091
List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
10891092

10901093
// Process completed containers
@@ -1095,26 +1098,26 @@ protected void nodeUpdate(RMNode nm) {
10951098
// If the node is decommissioning, send an update to have the total
10961099
// resource equal to the used resource, so no available resource to
10971100
// schedule.
1098-
// TODO YARN-5128: Fix possible race-condition when request comes in before
1099-
// update is propagated
1100-
if (nm.getState() == NodeState.DECOMMISSIONING) {
1101+
if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) {
11011102
this.rmContext
11021103
.getDispatcher()
11031104
.getEventHandler()
11041105
.handle(
11051106
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
1106-
.newInstance(getSchedulerNode(nm.getNodeID())
1107-
.getAllocatedResource(), 0)));
1107+
.newInstance(schedulerNode.getAllocatedResource(), 0)));
11081108
}
11091109

11101110
updateSchedulerHealthInformation(releasedResources, releasedContainers);
1111-
updateNodeResourceUtilization(nm);
1111+
if (schedulerNode != null) {
1112+
updateNodeResourceUtilization(nm);
1113+
}
11121114

11131115
// Now node data structures are up-to-date and ready for scheduling.
11141116
if(LOG.isDebugEnabled()) {
1115-
SchedulerNode node = getNode(nm.getNodeID());
1116-
LOG.debug("Node being looked for scheduling " + nm +
1117-
" availableResource: " + node.getUnallocatedResource());
1117+
LOG.debug(
1118+
"Node being looked for scheduling " + nm + " availableResource: " +
1119+
(schedulerNode == null ? "unknown (decommissioned)" :
1120+
schedulerNode.getUnallocatedResource()));
11181121
}
11191122
}
11201123

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,7 @@ void attemptScheduling(FSSchedulerNode node) {
10471047
return;
10481048
}
10491049

1050-
final NodeId nodeID = node.getNodeID();
1050+
final NodeId nodeID = (node != null ? node.getNodeID() : null);
10511051
if (!nodeTracker.exists(nodeID)) {
10521052
// The node might have just been removed while this thread was waiting
10531053
// on the synchronized lock before it entered this synchronized method

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -961,8 +961,10 @@ protected synchronized void nodeUpdate(RMNode nm) {
961961
return;
962962
}
963963

964-
if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
965-
node.getUnallocatedResource(), minimumAllocation)) {
964+
// A decommissioned node might be removed before we get here
965+
if (node != null &&
966+
Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
967+
node.getUnallocatedResource(), minimumAllocation)) {
966968
LOG.debug("Node heartbeat " + nm.getNodeID() +
967969
" available resource = " + node.getUnallocatedResource());
968970

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -279,14 +279,12 @@ public void testConfValidation() throws Exception {
279279
}
280280
}
281281

282-
private NodeManager
283-
registerNode(String hostName, int containerManagerPort, int httpPort,
284-
String rackName, Resource capability)
282+
private NodeManager registerNode(String hostName, int containerManagerPort,
283+
int httpPort, String rackName,
284+
Resource capability)
285285
throws IOException, YarnException {
286-
NodeManager nm =
287-
new NodeManager(
288-
hostName, containerManagerPort, httpPort, rackName, capability,
289-
resourceManager);
286+
NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
287+
rackName, capability, resourceManager);
290288
NodeAddedSchedulerEvent nodeAddEvent1 =
291289
new NodeAddedSchedulerEvent(resourceManager.getRMContext()
292290
.getRMNodes().get(nm.getNodeId()));
@@ -301,13 +299,13 @@ public void testCapacityScheduler() throws Exception {
301299

302300
// Register node1
303301
String host_0 = "host_0";
304-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
302+
NodeManager nm_0 =
305303
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
306304
Resources.createResource(4 * GB, 1));
307305

308306
// Register node2
309307
String host_1 = "host_1";
310-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
308+
NodeManager nm_1 =
311309
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
312310
Resources.createResource(2 * GB, 1));
313311

@@ -4080,6 +4078,29 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
40804078
Assert.fail("Cannot find RMContainer");
40814079
}
40824080
}
4081+
@Test
4082+
public void testRemovedNodeDecomissioningNode() throws Exception {
4083+
// Register nodemanager
4084+
NodeManager nm = registerNode("host_decom", 1234, 2345,
4085+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
4086+
4087+
RMNode node =
4088+
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
4089+
// Send a heartbeat to kick the tires on the Scheduler
4090+
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
4091+
resourceManager.getResourceScheduler().handle(nodeUpdate);
4092+
4093+
// force remove the node to simulate race condition
4094+
((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
4095+
removeNode(nm.getNodeId());
4096+
// Kick off another heartbeat with the node state mocked to decommissioning
4097+
RMNode spyNode =
4098+
Mockito.spy(resourceManager.getRMContext().getRMNodes()
4099+
.get(nm.getNodeId()));
4100+
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
4101+
resourceManager.getResourceScheduler().handle(
4102+
new NodeUpdateSchedulerEvent(spyNode));
4103+
}
40834104

40844105
@Test
40854106
public void testResourceUpdateDecommissioningNode() throws Exception {
@@ -4106,9 +4127,8 @@ public void handle(Event event) {
41064127
((AsyncDispatcher) mockDispatcher).start();
41074128
// Register node
41084129
String host_0 = "host_0";
4109-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
4110-
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
4111-
Resources.createResource(8 * GB, 4));
4130+
NodeManager nm_0 = registerNode(host_0, 1234, 2345,
4131+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
41124132
// ResourceRequest priorities
41134133
Priority priority_0 = Priority.newInstance(0);
41144134

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
8383
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
8484
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
85+
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
8586
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
8687
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
8788
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -4973,6 +4974,30 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName()
49734974
.get(attId3.getApplicationId()).getQueue());
49744975
}
49754976

4977+
@Test
4978+
public void testRemovedNodeDecomissioningNode() throws Exception {
4979+
// Register nodemanager
4980+
NodeManager nm = registerNode("host_decom", 1234, 2345,
4981+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
4982+
4983+
RMNode node =
4984+
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
4985+
// Send a heartbeat to kick the tires on the Scheduler
4986+
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
4987+
resourceManager.getResourceScheduler().handle(nodeUpdate);
4988+
4989+
// Force remove the node to simulate race condition
4990+
((FairScheduler) resourceManager.getResourceScheduler())
4991+
.getNodeTracker().removeNode(nm.getNodeId());
4992+
// Kick off another heartbeat with the node state mocked to decommissioning
4993+
RMNode spyNode =
4994+
Mockito.spy(resourceManager.getRMContext().getRMNodes()
4995+
.get(nm.getNodeId()));
4996+
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
4997+
resourceManager.getResourceScheduler().handle(
4998+
new NodeUpdateSchedulerEvent(spyNode));
4999+
}
5000+
49765001
@Test
49775002
public void testResourceUpdateDecommissioningNode() throws Exception {
49785003
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -4998,9 +5023,8 @@ public void handle(Event event) {
49985023
((AsyncDispatcher) mockDispatcher).start();
49995024
// Register node
50005025
String host_0 = "host_0";
5001-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
5002-
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
5003-
Resources.createResource(8 * GB, 4));
5026+
NodeManager nm_0 = registerNode(host_0, 1234, 2345,
5027+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
50045028

50055029
RMNode node =
50065030
resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@@ -5038,13 +5062,12 @@ public void handle(Event event) {
50385062
Assert.assertEquals(availableResource.getVirtualCores(), 0);
50395063
}
50405064

5041-
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
5042-
String hostName, int containerManagerPort, int httpPort, String rackName,
5043-
Resource capability) throws IOException, YarnException {
5044-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
5045-
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
5046-
containerManagerPort, httpPort, rackName, capability,
5047-
resourceManager);
5065+
private NodeManager registerNode(String hostName, int containerManagerPort,
5066+
int httpPort, String rackName,
5067+
Resource capability)
5068+
throws IOException, YarnException {
5069+
NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
5070+
rackName, capability, resourceManager);
50485071

50495072
// after YARN-5375, scheduler event is processed in rm main dispatcher,
50505073
// wait it processed, or may lead dead lock

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
6767
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
6868
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
69+
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
6970
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
7071
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
7172
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -137,14 +138,12 @@ public void tearDown() throws Exception {
137138
resourceManager.stop();
138139
}
139140

140-
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
141-
registerNode(String hostName, int containerManagerPort, int nmHttpPort,
142-
String rackName, Resource capability) throws IOException,
143-
YarnException {
144-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
145-
new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
146-
containerManagerPort, nmHttpPort, rackName, capability,
147-
resourceManager);
141+
private NodeManager registerNode(String hostName, int containerManagerPort,
142+
int nmHttpPort, String rackName,
143+
Resource capability)
144+
throws IOException, YarnException {
145+
NodeManager nm = new NodeManager(hostName, containerManagerPort,
146+
nmHttpPort, rackName, capability, resourceManager);
148147
NodeAddedSchedulerEvent nodeAddEvent1 =
149148
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
150149
.get(nm.getNodeId()));
@@ -1190,6 +1189,30 @@ public void testResourceOverCommit() throws Exception {
11901189
rm.stop();
11911190
}
11921191

1192+
@Test
1193+
public void testRemovedNodeDecomissioningNode() throws Exception {
1194+
// Register nodemanager
1195+
NodeManager nm = registerNode("host_decom", 1234, 2345,
1196+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
1197+
1198+
RMNode node =
1199+
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
1200+
// Send a heartbeat to kick the tires on the Scheduler
1201+
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
1202+
resourceManager.getResourceScheduler().handle(nodeUpdate);
1203+
1204+
// Force remove the node to simulate race condition
1205+
((FifoScheduler) resourceManager.getResourceScheduler())
1206+
.getNodeTracker().removeNode(nm.getNodeId());
1207+
// Kick off another heartbeat with the node state mocked to decommissioning
1208+
RMNode spyNode =
1209+
Mockito.spy(resourceManager.getRMContext().getRMNodes()
1210+
.get(nm.getNodeId()));
1211+
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
1212+
resourceManager.getResourceScheduler().handle(
1213+
new NodeUpdateSchedulerEvent(spyNode));
1214+
}
1215+
11931216
@Test
11941217
public void testResourceUpdateDecommissioningNode() throws Exception {
11951218
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -1215,9 +1238,8 @@ public void handle(Event event) {
12151238
((AsyncDispatcher) mockDispatcher).start();
12161239
// Register node
12171240
String host_0 = "host_0";
1218-
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
1219-
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
1220-
Resources.createResource(8 * GB, 4));
1241+
NodeManager nm_0 = registerNode(host_0, 1234, 2345,
1242+
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
12211243
// ResourceRequest priorities
12221244
Priority priority_0 = Priority.newInstance(0);
12231245

0 commit comments

Comments (0)