Skip to content

Commit 66aa00d

Browse files
author
Jonathan Turner Eagles
committed
YARN-819. ResourceManager and NodeManager should check for a minimum allowed version (Robert Parker via jeagles)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1526665 13f79535-47bb-0310-9956-ffa450edef68
1 parent b5e9a01 commit 66aa00d

File tree

13 files changed

+228
-4
lines changed

13 files changed

+228
-4
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ Release 2.3.0 - UNRELEASED
2121
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
2222
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
2323

24+
YARN-819. ResourceManager and NodeManager should check for a minimum allowed
25+
version (Robert Parker via jeagles)
26+
2427
OPTIMIZATIONS
2528

2629
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,13 @@ public class YarnConfiguration extends Configuration {
362362

363363
public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
364364
24 * 60 * 60;
365+
366+
public static final String RM_NODEMANAGER_MINIMUM_VERSION =
367+
RM_PREFIX + "nodemanager.minimum.version";
368+
369+
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
370+
"NONE";
371+
365372
////////////////////////////////
366373
// Node Manager Configs
367374
////////////////////////////////
@@ -460,6 +467,10 @@ public class YarnConfiguration extends Configuration {
460467
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
461468
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
462469

470+
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
471+
NM_PREFIX + "resourcemanager.minimum.version";
472+
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
473+
463474
/** Interval at which the delayed token removal thread runs */
464475
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
465476
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,14 @@
358358
<value>1000</value>
359359
</property>
360360

361+
<property>
362+
<description>The minimum allowed version of a connecting nodemanager. The valid values are
363+
NONE (no version checking), EqualToRM (the nodemanager's version is equal to
364+
or greater than the RM version), or a Version String.</description>
365+
<name>yarn.resourcemanager.nodemanager.minimum.version</name>
366+
<value>NONE</value>
367+
</property>
368+
361369
<property>
362370
<description>Enable a set of periodic monitors (specified in
363371
yarn.resourcemanager.scheduler.monitor.policies) that affect the
@@ -737,6 +745,14 @@
737745
<value>30</value>
738746
</property>
739747

748+
<property>
749+
<description>The minimum allowed version of a resourcemanager that a nodemanager will connect to.
750+
The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is
751+
equal to or greater than the NM version), or a Version String.</description>
752+
<name>yarn.nodemanager.resourcemanager.minimum.version</name>
753+
<value>NONE</value>
754+
</property>
755+
740756
<property>
741757
<description>Max number of threads in NMClientAsync to process container
742758
management events</description>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ public interface RegisterNodeManagerRequest {
2525
NodeId getNodeId();
2626
int getHttpPort();
2727
Resource getResource();
28+
String getNMVersion();
2829

2930
void setNodeId(NodeId nodeId);
3031
void setHttpPort(int port);
3132
void setResource(Resource resource);
33+
void setNMVersion(String version);
3234
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,7 @@ public interface RegisterNodeManagerResponse {
4242

4343
void setDiagnosticsMessage(String diagnosticsMessage);
4444

45+
void setRMVersion(String version);
46+
47+
String getRMVersion();
4548
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,21 @@ public void setHttpPort(int httpPort) {
139139
builder.setHttpPort(httpPort);
140140
}
141141

142+
@Override
143+
public String getNMVersion() {
144+
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
145+
if (!p.hasNmVersion()) {
146+
return "";
147+
}
148+
return (p.getNmVersion());
149+
}
150+
151+
@Override
152+
public void setNMVersion(String version) {
153+
maybeInitBuilder();
154+
builder.setNmVersion(version);
155+
}
156+
142157
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
143158
return new NodeIdPBImpl(p);
144159
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,25 @@ public void setDiagnosticsMessage(String diagnosticsMessage) {
150150
builder.setDiagnosticsMessage((diagnosticsMessage));
151151
}
152152

153+
@Override
154+
public String getRMVersion() {
155+
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
156+
if (!p.hasRmVersion()) {
157+
return null;
158+
}
159+
return p.getRmVersion();
160+
}
161+
162+
@Override
163+
public void setRMVersion(String rmVersion) {
164+
maybeInitBuilder();
165+
if (rmVersion == null) {
166+
builder.clearRmIdentifier();
167+
return;
168+
}
169+
builder.setRmVersion(rmVersion);
170+
}
171+
153172
@Override
154173
public NodeAction getNodeAction() {
155174
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ message RegisterNodeManagerRequestProto {
2929
optional NodeIdProto node_id = 1;
3030
optional int32 http_port = 3;
3131
optional ResourceProto resource = 4;
32+
optional string nm_version = 5;
3233
}
3334

3435
message RegisterNodeManagerResponseProto {
@@ -37,6 +38,7 @@ message RegisterNodeManagerResponseProto {
3738
optional NodeActionProto nodeAction = 3;
3839
optional int64 rm_identifier = 4;
3940
optional string diagnostics_message = 5;
41+
optional string rm_version = 6;
4042
}
4143

4244
message NodeHeartbeatRequestProto {
@@ -45,7 +47,6 @@ message NodeHeartbeatRequestProto {
4547
optional MasterKeyProto last_known_nm_token_master_key = 3;
4648
}
4749

48-
4950
message NodeHeartbeatResponseProto {
5051
optional int32 response_id = 1;
5152
optional MasterKeyProto container_token_master_key = 2;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.ipc.RPC;
3838
import org.apache.hadoop.security.UserGroupInformation;
3939
import org.apache.hadoop.service.AbstractService;
40+
import org.apache.hadoop.util.VersionUtil;
4041
import org.apache.hadoop.yarn.api.records.ApplicationId;
4142
import org.apache.hadoop.yarn.api.records.ContainerId;
4243
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -63,6 +64,7 @@
6364
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
6465
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
6566
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
67+
import org.apache.hadoop.yarn.util.YarnVersionInfo;
6668

6769
import com.google.common.annotations.VisibleForTesting;
6870

@@ -84,6 +86,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
8486
private ResourceTracker resourceTracker;
8587
private Resource totalResource;
8688
private int httpPort;
89+
private String nodeManagerVersionId;
90+
private String minimumResourceManagerVersion;
8791
private volatile boolean isStopped;
8892
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
8993
private boolean tokenKeepAliveEnabled;
@@ -138,6 +142,10 @@ protected void serviceInit(Configuration conf) throws Exception {
138142
this.tokenRemovalDelayMs =
139143
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
140144
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
145+
146+
this.minimumResourceManagerVersion = conf.get(
147+
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
148+
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
141149

142150
// Default duration to track stopped containers on nodemanager is 10Min.
143151
// This should not be assigned very large value as it will remember all the
@@ -168,6 +176,7 @@ protected void serviceStart() throws Exception {
168176
// NodeManager is the last service to start, so NodeId is available.
169177
this.nodeId = this.context.getNodeId();
170178
this.httpPort = this.context.getHttpPort();
179+
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
171180
try {
172181
// Registration has to be in start so that ContainerManager can get the
173182
// perNM tokens needed to authenticate ContainerTokens.
@@ -235,6 +244,7 @@ protected void registerWithRM() throws YarnException, IOException {
235244
request.setHttpPort(this.httpPort);
236245
request.setResource(this.totalResource);
237246
request.setNodeId(this.nodeId);
247+
request.setNMVersion(this.nodeManagerVersionId);
238248
RegisterNodeManagerResponse regNMResponse =
239249
resourceTracker.registerNodeManager(request);
240250
this.rmIdentifier = regNMResponse.getRMIdentifier();
@@ -248,6 +258,26 @@ protected void registerWithRM() throws YarnException, IOException {
248258
+ message);
249259
}
250260

261+
// if ResourceManager version is too old then shutdown
262+
if (!minimumResourceManagerVersion.equals("NONE")){
263+
if (minimumResourceManagerVersion.equals("EqualToNM")){
264+
minimumResourceManagerVersion = nodeManagerVersionId;
265+
}
266+
String rmVersion = regNMResponse.getRMVersion();
267+
if (rmVersion == null) {
268+
String message = "The Resource Manager's did not return a version. "
269+
+ "Valid version cannot be checked.";
270+
throw new YarnRuntimeException("Shutting down the Node Manager. "
271+
+ message);
272+
}
273+
if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) {
274+
String message = "The Resource Manager's version ("
275+
+ rmVersion +") is less than the minimum "
276+
+ "allowed version " + minimumResourceManagerVersion;
277+
throw new YarnRuntimeException("Shutting down the Node Manager on RM "
278+
+ "version error, " + message);
279+
}
280+
}
251281
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
252282
// do this now so that its set before we start heartbeating to RM
253283
// It is expected that status updater is started by this point and

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public static MasterKey createMasterKey() {
145145
.byteValue() }));
146146
return masterKey;
147147
}
148-
148+
149149
private class MyResourceTracker implements ResourceTracker {
150150

151151
private final Context context;
@@ -471,6 +471,7 @@ private class MyResourceTracker2 implements ResourceTracker {
471471
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
472472
public NodeAction registerNodeAction = NodeAction.NORMAL;
473473
public String shutDownMessage = "";
474+
public String rmVersion = "3.0.1";
474475

475476
@Override
476477
public RegisterNodeManagerResponse registerNodeManager(
@@ -483,6 +484,7 @@ public RegisterNodeManagerResponse registerNodeManager(
483484
response.setContainerTokenMasterKey(createMasterKey());
484485
response.setNMTokenMasterKey(createMasterKey());
485486
response.setDiagnosticsMessage(shutDownMessage);
487+
response.setRMVersion(rmVersion);
486488
return response;
487489
}
488490
@Override
@@ -1180,6 +1182,44 @@ public void testNodeStatusUpdaterRetryAndNMShutdown()
11801182
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
11811183
}
11821184

1185+
@Test
1186+
public void testRMVersionLessThanMinimum() throws InterruptedException {
1187+
final AtomicInteger numCleanups = new AtomicInteger(0);
1188+
YarnConfiguration conf = createNMConfig();
1189+
conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0");
1190+
nm = new NodeManager() {
1191+
@Override
1192+
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1193+
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1194+
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
1195+
context, dispatcher, healthChecker, metrics);
1196+
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
1197+
myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL;
1198+
myResourceTracker2.rmVersion = "3.0.0";
1199+
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
1200+
return myNodeStatusUpdater;
1201+
}
1202+
1203+
@Override
1204+
protected void cleanupContainers(NodeManagerEventType eventType) {
1205+
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
1206+
numCleanups.incrementAndGet();
1207+
}
1208+
};
1209+
1210+
nm.init(conf);
1211+
nm.start();
1212+
1213+
// NM takes a while to reach the STARTED state.
1214+
int waitCount = 0;
1215+
while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) {
1216+
LOG.info("Waiting for NM to stop..");
1217+
Thread.sleep(1000);
1218+
}
1219+
Assert.assertTrue(nm.getServiceState() == STATE.STARTED);
1220+
nm.stop();
1221+
}
1222+
11831223
private class MyNMContext extends NMContext {
11841224
ConcurrentMap<ContainerId, Container> containers =
11851225
new ConcurrentSkipListMap<ContainerId, Container>();

0 commit comments

Comments
 (0)