Skip to content

Commit 9017491

Browse files
committed
YARN-365. Change NM heartbeat handling to not generate a scheduler event on each heartbeat. (Contributed by Xuan Gong)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1450007 13f79535-47bb-0310-9956-ffa450edef68
1 parent c936078 commit 9017491

File tree

12 files changed

+287
-106
lines changed

12 files changed

+287
-106
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ Release 2.0.4-beta - UNRELEASED
2222

2323
IMPROVEMENTS
2424

25+
YARN-365. Change NM heartbeat handling to not generate a scheduler event
26+
on each heartbeat. (Xuan Gong via sseth)
27+
2528
OPTIMIZATIONS
2629

2730
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,13 @@ public interface RMNode {
106106
public List<ApplicationId> getAppsToCleanup();
107107

108108
public HeartbeatResponse getLastHeartBeatResponse();
109+
110+
/**
111+
* Get and clear the list of containerUpdates accumulated across NM
112+
* heartbeats.
113+
*
114+
* @return containerUpdates accumulated across NM heartbeats.
115+
*/
116+
public List<UpdatedContainerInfo> pullContainerUpdates();
117+
109118
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Set;
2727
import java.util.TreeSet;
28+
import java.util.concurrent.ConcurrentLinkedQueue;
2829
import java.util.concurrent.locks.ReentrantReadWriteLock;
2930
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
3031
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +61,8 @@
6061
import org.apache.hadoop.yarn.state.StateMachineFactory;
6162
import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator;
6263

64+
import com.google.common.annotations.VisibleForTesting;
65+
6366
/**
6467
* This class is used to keep track of all the applications/containers
6568
* running on a node.
@@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
7881
private final ReadLock readLock;
7982
private final WriteLock writeLock;
8083

84+
private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
85+
private volatile boolean nextHeartBeat = true;
86+
8187
private final NodeId nodeId;
8288
private final RMContext context;
8389
private final String hostName;
@@ -186,6 +192,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
186192

187193
this.stateMachine = stateMachineFactory.make(this);
188194

195+
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
189196
}
190197

191198
@Override
@@ -400,6 +407,7 @@ public static class ReconnectNodeTransition implements
400407
@Override
401408
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
402409
// Kill containers since node is rejoining.
410+
rmNode.nodeUpdateQueue.clear();
403411
rmNode.context.getDispatcher().getEventHandler().handle(
404412
new NodeRemovedSchedulerEvent(rmNode));
405413

@@ -458,6 +466,7 @@ public DeactivateNodeTransition(NodeState finalState) {
458466
@Override
459467
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
460468
// Inform the scheduler
469+
rmNode.nodeUpdateQueue.clear();
461470
rmNode.context.getDispatcher().getEventHandler().handle(
462471
new NodeRemovedSchedulerEvent(rmNode));
463472
rmNode.context.getDispatcher().getEventHandler().handle(
@@ -489,6 +498,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
489498
statusEvent.getNodeHealthStatus();
490499
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
491500
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
501+
rmNode.nodeUpdateQueue.clear();
492502
// Inform the scheduler
493503
rmNode.context.getDispatcher().getEventHandler().handle(
494504
new NodeRemovedSchedulerEvent(rmNode));
@@ -538,10 +548,16 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
538548
completedContainers.add(remoteContainer);
539549
}
540550
}
541-
542-
rmNode.context.getDispatcher().getEventHandler().handle(
543-
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
544-
completedContainers));
551+
if(newlyLaunchedContainers.size() != 0
552+
|| completedContainers.size() != 0) {
553+
rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
554+
(newlyLaunchedContainers, completedContainers));
555+
}
556+
if(rmNode.nextHeartBeat) {
557+
rmNode.nextHeartBeat = false;
558+
rmNode.context.getDispatcher().getEventHandler().handle(
559+
new NodeUpdateSchedulerEvent(rmNode));
560+
}
545561

546562
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
547563
statusEvent.getKeepAliveAppIds());
@@ -584,4 +600,25 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
584600
return NodeState.UNHEALTHY;
585601
}
586602
}
603+
604+
/**
 * Drains every {@link UpdatedContainerInfo} currently held in
 * {@code nodeUpdateQueue} into a new list, then sets {@code nextHeartBeat}
 * back to {@code true} so that the status-update transition dispatches a
 * fresh {@code NodeUpdateSchedulerEvent} on the next heartbeat.
 *
 * @return the drained entries in queue order; empty if nothing was queued.
 */
@Override
605+
public List<UpdatedContainerInfo> pullContainerUpdates() {
606+
List<UpdatedContainerInfo> latestContainerInfoList =
607+
new ArrayList<UpdatedContainerInfo>();
608+
// NOTE(review): peek() and poll() are two separate queue operations. If more
// than one thread could ever drain this queue, poll() may return null here
// and a null entry would be added to the result - confirm there is a single
// consumer, or loop on poll() alone.
while(nodeUpdateQueue.peek() != null){
609+
latestContainerInfoList.add(nodeUpdateQueue.poll());
610+
}
611+
// Re-arm the flag only after draining. NOTE(review): an update queued between
// the end of the loop and this assignment appears to stay queued until a
// later heartbeat triggers the next scheduler event - confirm acceptable.
this.nextHeartBeat = true;
612+
return latestContainerInfoList;
613+
}
614+
615+
/**
 * Test hook that overwrites the {@code nextHeartBeat} flag. The
 * status-update transition dispatches a {@code NodeUpdateSchedulerEvent}
 * only while this flag is {@code true}.
 *
 * @param nextHeartBeat the value to store in the flag.
 */
@VisibleForTesting
616+
public void setNextHeartBeat(boolean nextHeartBeat) {
617+
this.nextHeartBeat = nextHeartBeat;
618+
}
619+
620+
/**
 * Test hook reporting how many {@link UpdatedContainerInfo} entries are
 * waiting in {@code nodeUpdateQueue}.
 *
 * @return the current size of the pending container-update queue.
 */
@VisibleForTesting
621+
public int getQueueSize() {
622+
// ConcurrentLinkedQueue.size() traverses the queue and may be inexact while
// other threads are modifying it (see the JDK documentation).
return nodeUpdateQueue.size();
623+
}
587624
}
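
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change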
@@ -0,0 +1,45 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
20+
21+
import java.util.List;
22+
23+
import org.apache.hadoop.yarn.api.records.ContainerStatus;
24+
25+
/**
 * Simple holder pairing the list of newly launched containers with the list
 * of completed containers, so that both can be queued and handed over as a
 * single entry.
 */
public class UpdatedContainerInfo {
26+
// Statuses of containers reported as newly launched.
private List<ContainerStatus> newlyLaunchedContainers;
27+
// Statuses of containers reported as completed.
private List<ContainerStatus> completedContainers;
28+
29+
/**
 * Creates an instance without assigning either list; both fields keep
 * their default value of {@code null}.
 * NOTE(review): the getters return the fields unguarded, so a caller that
 * passes them to {@code addAll} would fail on an instance built this way -
 * confirm this constructor is never used on that path.
 */
public UpdatedContainerInfo() {
30+
}
31+
32+
/**
 * Creates an instance holding the given lists. The references are stored
 * as-is; no copy is made.
 *
 * @param newlyLaunchedContainers statuses of newly launched containers.
 * @param completedContainers statuses of completed containers.
 */
public UpdatedContainerInfo(List<ContainerStatus> newlyLaunchedContainers
33+
, List<ContainerStatus> completedContainers) {
34+
this.newlyLaunchedContainers = newlyLaunchedContainers;
35+
this.completedContainers = completedContainers;
36+
}
37+
38+
/**
 * @return the stored list of newly launched container statuses (the same
 *         reference given to the constructor, or {@code null} if unset).
 */
public List<ContainerStatus> getNewlyLaunchedContainers() {
39+
return this.newlyLaunchedContainers;
40+
}
41+
42+
/**
 * @return the stored list of completed container statuses (the same
 *         reference given to the constructor, or {@code null} if unset).
 */
public List<ContainerStatus> getCompletedContainers() {
43+
return this.completedContainers;
44+
}
45+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
6161
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
6262
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
63+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
6364
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
6465
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
6566
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -562,15 +563,20 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
562563
return root.getQueueUserAclInfo(user);
563564
}
564565

565-
private synchronized void nodeUpdate(RMNode nm,
566-
List<ContainerStatus> newlyLaunchedContainers,
567-
List<ContainerStatus> completedContainers) {
566+
private synchronized void nodeUpdate(RMNode nm) {
568567
if (LOG.isDebugEnabled()) {
569568
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
570569
}
571-
572-
FiCaSchedulerNode node = getNode(nm.getNodeID());
573570

571+
FiCaSchedulerNode node = getNode(nm.getNodeID());
572+
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
573+
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
574+
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
575+
for(UpdatedContainerInfo containerInfo : containerInfoList) {
576+
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
577+
completedContainers.addAll(containerInfo.getCompletedContainers());
578+
}
579+
574580
// Processing the newly launched containers
575581
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
576582
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -666,9 +672,7 @@ public void handle(SchedulerEvent event) {
666672
case NODE_UPDATE:
667673
{
668674
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
669-
nodeUpdate(nodeUpdatedEvent.getRMNode(),
670-
nodeUpdatedEvent.getNewlyLaunchedContainers(),
671-
nodeUpdatedEvent.getCompletedContainers());
675+
nodeUpdate(nodeUpdatedEvent.getRMNode());
672676
}
673677
break;
674678
case APP_ADDED:

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,18 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
2020

21-
import java.util.List;
22-
23-
import org.apache.hadoop.yarn.api.records.ContainerStatus;
2421
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
2522

2623
public class NodeUpdateSchedulerEvent extends SchedulerEvent {
2724

2825
private final RMNode rmNode;
29-
private final List<ContainerStatus> newlyLaunchedContainers;
30-
private final List<ContainerStatus> completedContainersStatuses;
3126

32-
public NodeUpdateSchedulerEvent(RMNode rmNode,
33-
List<ContainerStatus> newlyLaunchedContainers,
34-
List<ContainerStatus> completedContainers) {
27+
public NodeUpdateSchedulerEvent(RMNode rmNode) {
3528
super(SchedulerEventType.NODE_UPDATE);
3629
this.rmNode = rmNode;
37-
this.newlyLaunchedContainers = newlyLaunchedContainers;
38-
this.completedContainersStatuses = completedContainers;
3930
}
4031

4132
public RMNode getRMNode() {
4233
return rmNode;
4334
}
44-
45-
public List<ContainerStatus> getNewlyLaunchedContainers() {
46-
return newlyLaunchedContainers;
47-
}
48-
49-
public List<ContainerStatus> getCompletedContainers() {
50-
return completedContainersStatuses;
51-
}
5235
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
3434
import org.apache.hadoop.classification.InterfaceStability.Unstable;
3535
import org.apache.hadoop.conf.Configuration;
36-
import org.apache.hadoop.security.AccessControlException;
3736
import org.apache.hadoop.security.UserGroupInformation;
3837
import org.apache.hadoop.yarn.Clock;
3938
import org.apache.hadoop.yarn.SystemClock;
@@ -61,6 +60,7 @@
6160
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
6261
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
6362
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
63+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
6464
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
6565
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
6666
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -750,15 +750,20 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no
750750
/**
751751
* Process a heartbeat update from a node.
752752
*/
753-
private synchronized void nodeUpdate(RMNode nm,
754-
List<ContainerStatus> newlyLaunchedContainers,
755-
List<ContainerStatus> completedContainers) {
753+
private synchronized void nodeUpdate(RMNode nm) {
756754
if (LOG.isDebugEnabled()) {
757755
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
758756
}
759757
eventLog.log("HEARTBEAT", nm.getHostName());
760758
FSSchedulerNode node = nodes.get(nm.getNodeID());
761759

760+
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
761+
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
762+
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
763+
for(UpdatedContainerInfo containerInfo : containerInfoList) {
764+
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
765+
completedContainers.addAll(containerInfo.getCompletedContainers());
766+
}
762767
// Processing the newly launched containers
763768
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
764769
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -864,9 +869,7 @@ public void handle(SchedulerEvent event) {
864869
throw new RuntimeException("Unexpected event type: " + event);
865870
}
866871
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
867-
nodeUpdate(nodeUpdatedEvent.getRMNode(),
868-
nodeUpdatedEvent.getNewlyLaunchedContainers(),
869-
nodeUpdatedEvent.getCompletedContainers());
872+
nodeUpdate(nodeUpdatedEvent.getRMNode());
870873
break;
871874
case APP_ADDED:
872875
if (!(event instanceof AppAddedSchedulerEvent)) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
6868
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
6969
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
70+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
7071
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
7172
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
7273
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -576,11 +577,16 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
576577
return assignedContainers;
577578
}
578579

579-
private synchronized void nodeUpdate(RMNode rmNode,
580-
List<ContainerStatus> newlyLaunchedContainers,
581-
List<ContainerStatus> completedContainers) {
580+
private synchronized void nodeUpdate(RMNode rmNode) {
582581
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
583582

583+
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
584+
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
585+
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
586+
for(UpdatedContainerInfo containerInfo : containerInfoList) {
587+
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
588+
completedContainers.addAll(containerInfo.getCompletedContainers());
589+
}
584590
// Processing the newly launched containers
585591
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
586592
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -628,9 +634,7 @@ public void handle(SchedulerEvent event) {
628634
{
629635
NodeUpdateSchedulerEvent nodeUpdatedEvent =
630636
(NodeUpdateSchedulerEvent)event;
631-
nodeUpdate(nodeUpdatedEvent.getRMNode(),
632-
nodeUpdatedEvent.getNewlyLaunchedContainers(),
633-
nodeUpdatedEvent.getCompletedContainers());
637+
nodeUpdate(nodeUpdatedEvent.getRMNode());
634638
}
635639
break;
636640
case APP_ADDED:

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager;
2020

21+
import java.util.ArrayList;
2122
import java.util.List;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2224

2325
import org.apache.hadoop.net.Node;
2426
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -31,6 +33,7 @@
3133
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
3234
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
3335
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
36+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
3437

3538
import com.google.common.collect.Lists;
3639

@@ -187,6 +190,11 @@ public List<ApplicationId> getAppsToCleanup() {
187190
public HeartbeatResponse getLastHeartBeatResponse() {
188191
return null;
189192
}
193+
194+
/**
 * Mock implementation: nothing is accumulated, so every call returns a
 * new, empty, mutable list.
 */
@Override
195+
public List<UpdatedContainerInfo> pullContainerUpdates() {
196+
return new ArrayList<UpdatedContainerInfo>();
197+
}
190198
};
191199

192200
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {

0 commit comments

Comments
 (0)