Skip to content

Commit 48209ee

Browse files
committed
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of transferred containers from previous app-attempts to new AMs after YARN-1490. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1572230 13f79535-47bb-0310-9956-ffa450edef68
1 parent ac83a2f commit 48209ee

File tree

12 files changed

+324
-43
lines changed

12 files changed

+324
-43
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ Release 2.4.0 - UNRELEASED
146146
YARN-1497. Command line additions for moving apps between queues (Sandy
147147
Ryza)
148148

149+
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
150+
transferred containers from previous app-attempts to new AMs after YARN-1490.
151+
(Jian He via vinodkv)
152+
149153
IMPROVEMENTS
150154

151155
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
3030
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
3131
import org.apache.hadoop.yarn.api.records.Container;
32+
import org.apache.hadoop.yarn.api.records.NMToken;
3233
import org.apache.hadoop.yarn.api.records.Resource;
3334
import org.apache.hadoop.yarn.util.Records;
3435

@@ -55,13 +56,15 @@ public abstract class RegisterApplicationMasterResponse {
5556
public static RegisterApplicationMasterResponse newInstance(
5657
Resource minCapability, Resource maxCapability,
5758
Map<ApplicationAccessType, String> acls, ByteBuffer key,
58-
List<Container> containersFromPreviousAttempt, String queue) {
59+
List<Container> containersFromPreviousAttempt, String queue,
60+
List<NMToken> nmTokensFromPreviousAttempts) {
5961
RegisterApplicationMasterResponse response =
6062
Records.newRecord(RegisterApplicationMasterResponse.class);
6163
response.setMaximumResourceCapability(maxCapability);
6264
response.setApplicationACLs(acls);
6365
response.setClientToAMTokenMasterKey(key);
64-
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
66+
response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
67+
response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
6568
response.setQueue(queue);
6669
return response;
6770
}
@@ -129,26 +132,52 @@ public static RegisterApplicationMasterResponse newInstance(
129132
/**
130133
* <p>
131134
* Get the list of running containers as viewed by
132-
* <code>ResourceManager</code> from previous application attempt.
135+
* <code>ResourceManager</code> from previous application attempts.
133136
* </p>
134137
*
135138
* @return the list of running containers as viewed by
136-
* <code>ResourceManager</code> from previous application attempt
139+
* <code>ResourceManager</code> from previous application attempts
140+
* @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
137141
*/
138142
@Public
139143
@Unstable
140-
public abstract List<Container> getContainersFromPreviousAttempt();
144+
public abstract List<Container> getContainersFromPreviousAttempts();
141145

142146
/**
143147
* Set the list of running containers as viewed by
144-
* <code>ResourceManager</code> from previous application attempt.
148+
* <code>ResourceManager</code> from previous application attempts.
145149
*
146150
* @param containersFromPreviousAttempt
147151
* the list of running containers as viewed by
148-
* <code>ResourceManager</code> from previous application attempt.
152+
* <code>ResourceManager</code> from previous application attempts.
149153
*/
150154
@Private
151155
@Unstable
152-
public abstract void setContainersFromPreviousAttempt(
156+
public abstract void setContainersFromPreviousAttempts(
153157
List<Container> containersFromPreviousAttempt);
158+
159+
/**
160+
* Get the list of NMTokens for communicating with the NMs where the
161+
* containers of previous application attempts are running.
162+
*
163+
* @return the list of NMTokens for communicating with the NMs where the
164+
* containers of previous application attempts are running.
165+
*
166+
* @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
167+
*/
168+
@Public
169+
@Stable
170+
public abstract List<NMToken> getNMTokensFromPreviousAttempts();
171+
172+
/**
173+
* Set the list of NMTokens for communicating with the NMs where the the
174+
* containers of previous application attempts are running.
175+
*
176+
* @param nmTokens
177+
* the list of NMTokens for communicating with the NMs where the
178+
* containers of previous application attempts are running.
179+
*/
180+
@Private
181+
@Unstable
182+
public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
154183
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,37 @@ public static NMToken newInstance(NodeId nodeId, Token token) {
7272
@Stable
7373
public abstract void setToken(Token token);
7474

75+
76+
@Override
77+
public int hashCode() {
78+
final int prime = 31;
79+
int result = 1;
80+
result =
81+
prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
82+
result =
83+
prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
84+
return result;
85+
}
86+
87+
@Override
88+
public boolean equals(Object obj) {
89+
if (this == obj)
90+
return true;
91+
if (obj == null)
92+
return false;
93+
if (getClass() != obj.getClass())
94+
return false;
95+
NMToken other = (NMToken) obj;
96+
if (getNodeId() == null) {
97+
if (other.getNodeId() != null)
98+
return false;
99+
} else if (!getNodeId().equals(other.getNodeId()))
100+
return false;
101+
if (getToken() == null) {
102+
if (other.getToken() != null)
103+
return false;
104+
} else if (!getToken().equals(other.getToken()))
105+
return false;
106+
return true;
107+
}
75108
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ message RegisterApplicationMasterResponseProto {
4444
optional ResourceProto maximumCapability = 1;
4545
optional bytes client_to_am_token_master_key = 2;
4646
repeated ApplicationACLMapProto application_ACLs = 3;
47-
repeated ContainerProto containers_from_previous_attempt = 4;
47+
repeated ContainerProto containers_from_previous_attempts = 4;
4848
optional string queue = 5;
49+
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
4950
}
5051

5152
message FinishApplicationMasterRequestProto {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.hadoop.yarn.api.records.LocalResource;
7575
import org.apache.hadoop.yarn.api.records.LocalResourceType;
7676
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
77+
import org.apache.hadoop.yarn.api.records.NMToken;
7778
import org.apache.hadoop.yarn.api.records.NodeReport;
7879
import org.apache.hadoop.yarn.api.records.Priority;
7980
import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public void run() throws YarnException, IOException {
542543
}
543544

544545
List<Container> previousAMRunningContainers =
545-
response.getContainersFromPreviousAttempt();
546+
response.getContainersFromPreviousAttempts();
546547
LOG.info("Received " + previousAMRunningContainers.size()
547548
+ " previous AM's running containers on AM registration.");
548549
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
195195
appTrackingUrl);
196196
RegisterApplicationMasterResponse response =
197197
rmClient.registerApplicationMaster(request);
198+
199+
synchronized (this) {
200+
if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
201+
populateNMTokens(response.getNMTokensFromPreviousAttempts());
202+
}
203+
}
198204
return response;
199205
}
200206

@@ -250,7 +256,7 @@ public AllocateResponse allocate(float progressIndicator)
250256
lastResponseId = allocateResponse.getResponseId();
251257
clusterAvailableResources = allocateResponse.getAvailableResources();
252258
if (!allocateResponse.getNMTokens().isEmpty()) {
253-
populateNMTokens(allocateResponse);
259+
populateNMTokens(allocateResponse.getNMTokens());
254260
}
255261
}
256262
} finally {
@@ -284,13 +290,17 @@ public AllocateResponse allocate(float progressIndicator)
284290

285291
@Private
286292
@VisibleForTesting
287-
protected void populateNMTokens(AllocateResponse allocateResponse) {
288-
for (NMToken token : allocateResponse.getNMTokens()) {
293+
protected void populateNMTokens(List<NMToken> nmTokens) {
294+
for (NMToken token : nmTokens) {
289295
String nodeId = token.getNodeId().toString();
290296
if (getNMTokenCache().containsToken(nodeId)) {
291-
LOG.debug("Replacing token for : " + nodeId);
297+
if (LOG.isDebugEnabled()) {
298+
LOG.debug("Replacing token for : " + nodeId);
299+
}
292300
} else {
293-
LOG.debug("Received new token for : " + nodeId);
301+
if (LOG.isDebugEnabled()) {
302+
LOG.debug("Received new token for : " + nodeId);
303+
}
294304
}
295305
getNMTokenCache().setToken(nodeId, token.getToken());
296306
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java

Lines changed: 101 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
3232
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
3333
import org.apache.hadoop.yarn.api.records.Container;
34+
import org.apache.hadoop.yarn.api.records.NMToken;
3435
import org.apache.hadoop.yarn.api.records.Resource;
3536
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
37+
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
3638
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
3739
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
3840
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
3941
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
4042
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
43+
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
4144
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
4245
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
4346

@@ -56,7 +59,8 @@ public class RegisterApplicationMasterResponsePBImpl extends
5659

5760
private Resource maximumResourceCapability;
5861
private Map<ApplicationAccessType, String> applicationACLS = null;
59-
private List<Container> containersFromPreviousAttempt = null;
62+
private List<Container> containersFromPreviousAttempts = null;
63+
private List<NMToken> nmTokens = null;
6064

6165
public RegisterApplicationMasterResponsePBImpl() {
6266
builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -110,8 +114,13 @@ private void mergeLocalToBuilder() {
110114
if (this.applicationACLS != null) {
111115
addApplicationACLs();
112116
}
113-
if (this.containersFromPreviousAttempt != null) {
114-
addRunningContainersToProto();
117+
if (this.containersFromPreviousAttempts != null) {
118+
addContainersFromPreviousAttemptToProto();
119+
}
120+
if (nmTokens != null) {
121+
builder.clearNmTokensFromPreviousAttempts();
122+
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
123+
builder.addAllNmTokensFromPreviousAttempts(iterable);
115124
}
116125
}
117126

@@ -236,21 +245,22 @@ public ByteBuffer getClientToAMTokenMasterKey() {
236245
}
237246

238247
@Override
239-
public List<Container> getContainersFromPreviousAttempt() {
240-
if (this.containersFromPreviousAttempt != null) {
241-
return this.containersFromPreviousAttempt;
248+
public List<Container> getContainersFromPreviousAttempts() {
249+
if (this.containersFromPreviousAttempts != null) {
250+
return this.containersFromPreviousAttempts;
242251
}
243-
initRunningContainersList();
244-
return this.containersFromPreviousAttempt;
252+
initContainersPreviousAttemptList();
253+
return this.containersFromPreviousAttempts;
245254
}
246255

247256
@Override
248-
public void setContainersFromPreviousAttempt(final List<Container> containers) {
257+
public void
258+
setContainersFromPreviousAttempts(final List<Container> containers) {
249259
if (containers == null) {
250260
return;
251261
}
252-
this.containersFromPreviousAttempt = new ArrayList<Container>();
253-
this.containersFromPreviousAttempt.addAll(containers);
262+
this.containersFromPreviousAttempts = new ArrayList<Container>();
263+
this.containersFromPreviousAttempts.addAll(containers);
254264
}
255265

256266
@Override
@@ -272,25 +282,88 @@ public void setQueue(String queue) {
272282
}
273283
}
274284

275-
private void initRunningContainersList() {
276-
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
277-
List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
278-
containersFromPreviousAttempt = new ArrayList<Container>();
285+
286+
private void initContainersPreviousAttemptList() {
287+
RegisterApplicationMasterResponseProtoOrBuilder p =
288+
viaProto ? proto : builder;
289+
List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
290+
containersFromPreviousAttempts = new ArrayList<Container>();
279291
for (ContainerProto c : list) {
280-
containersFromPreviousAttempt.add(convertFromProtoFormat(c));
292+
containersFromPreviousAttempts.add(convertFromProtoFormat(c));
281293
}
282294
}
283295

284-
private void addRunningContainersToProto() {
296+
private void addContainersFromPreviousAttemptToProto() {
285297
maybeInitBuilder();
286-
builder.clearContainersFromPreviousAttempt();
298+
builder.clearContainersFromPreviousAttempts();
287299
List<ContainerProto> list = new ArrayList<ContainerProto>();
288-
for (Container c : containersFromPreviousAttempt) {
300+
for (Container c : containersFromPreviousAttempts) {
289301
list.add(convertToProtoFormat(c));
290302
}
291-
builder.addAllContainersFromPreviousAttempt(list);
303+
builder.addAllContainersFromPreviousAttempts(list);
304+
}
305+
306+
307+
@Override
308+
public List<NMToken> getNMTokensFromPreviousAttempts() {
309+
if (nmTokens != null) {
310+
return nmTokens;
311+
}
312+
initLocalNewNMTokenList();
313+
return nmTokens;
292314
}
293315

316+
@Override
317+
public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
318+
if (nmTokens == null || nmTokens.isEmpty()) {
319+
if (this.nmTokens != null) {
320+
this.nmTokens.clear();
321+
}
322+
builder.clearNmTokensFromPreviousAttempts();
323+
return;
324+
}
325+
this.nmTokens = new ArrayList<NMToken>();
326+
this.nmTokens.addAll(nmTokens);
327+
}
328+
329+
private synchronized void initLocalNewNMTokenList() {
330+
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
331+
List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
332+
nmTokens = new ArrayList<NMToken>();
333+
for (NMTokenProto t : list) {
334+
nmTokens.add(convertFromProtoFormat(t));
335+
}
336+
}
337+
338+
private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
339+
final List<NMToken> nmTokenList) {
340+
maybeInitBuilder();
341+
return new Iterable<NMTokenProto>() {
342+
@Override
343+
public synchronized Iterator<NMTokenProto> iterator() {
344+
return new Iterator<NMTokenProto>() {
345+
346+
Iterator<NMToken> iter = nmTokenList.iterator();
347+
348+
@Override
349+
public boolean hasNext() {
350+
return iter.hasNext();
351+
}
352+
353+
@Override
354+
public NMTokenProto next() {
355+
return convertToProtoFormat(iter.next());
356+
}
357+
358+
@Override
359+
public void remove() {
360+
throw new UnsupportedOperationException();
361+
}
362+
};
363+
}
364+
};
365+
}
366+
294367
private Resource convertFromProtoFormat(ResourceProto resource) {
295368
return new ResourcePBImpl(resource);
296369
}
@@ -306,4 +379,12 @@ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
306379
private ContainerProto convertToProtoFormat(Container t) {
307380
return ((ContainerPBImpl) t).getProto();
308381
}
382+
383+
private NMTokenProto convertToProtoFormat(NMToken token) {
384+
return ((NMTokenPBImpl) token).getProto();
385+
}
386+
387+
private NMToken convertFromProtoFormat(NMTokenProto proto) {
388+
return new NMTokenPBImpl(proto);
389+
}
309390
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public NMTokenPBImpl(NMTokenProto proto) {
4747
this.proto = proto;
4848
viaProto = true;
4949
}
50-
50+
5151
@Override
5252
public synchronized NodeId getNodeId() {
5353
NMTokenProtoOrBuilder p = viaProto ? proto : builder;

0 commit comments

Comments
 (0)