Skip to content

Commit d7ada82

Browse files
committed
MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344283 13f79535-47bb-0310-9956-ffa450edef68
1 parent 96659f4 commit d7ada82

File tree

5 files changed

+226
-23
lines changed

5 files changed

+226
-23
lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,9 @@ Release 0.23.3 - UNRELEASED
544544
MAPREDUCE-3870. Invalid App Metrics
545545
(Bhallamudi Venkata Siva Kamesh via tgraves).
546546

547+
MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM
548+
(Tom Graves via bobby)
549+
547550
Release 0.23.2 - UNRELEASED
548551

549552
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
253253
.addTransition(TaskAttemptState.RUNNING,
254254
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
255255
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
256+
// if container killed by AM shutting down
257+
.addTransition(TaskAttemptState.RUNNING,
258+
TaskAttemptState.KILLED,
259+
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
256260
// Kill handling
257261
.addTransition(TaskAttemptState.RUNNING,
258262
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
@@ -272,6 +276,10 @@ TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
272276
.addTransition(TaskAttemptState.COMMIT_PENDING,
273277
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
274278
CLEANUP_CONTAINER_TRANSITION)
279+
// if container killed by AM shutting down
280+
.addTransition(TaskAttemptState.COMMIT_PENDING,
281+
TaskAttemptState.KILLED,
282+
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
275283
.addTransition(TaskAttemptState.COMMIT_PENDING,
276284
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
277285
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
@@ -363,6 +371,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
363371
TaskAttemptEventType.TA_COMMIT_PENDING,
364372
TaskAttemptEventType.TA_DONE,
365373
TaskAttemptEventType.TA_FAILMSG,
374+
TaskAttemptEventType.TA_CONTAINER_CLEANED,
366375
// Container launch events can arrive late
367376
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
368377
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -384,6 +393,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
384393
TaskAttemptEventType.TA_COMMIT_PENDING,
385394
TaskAttemptEventType.TA_DONE,
386395
TaskAttemptEventType.TA_FAILMSG,
396+
TaskAttemptEventType.TA_CONTAINER_CLEANED,
387397
// Container launch events can arrive late
388398
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
389399
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -402,6 +412,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
402412
TaskAttemptState.SUCCEEDED,
403413
EnumSet.of(TaskAttemptEventType.TA_KILL,
404414
TaskAttemptEventType.TA_FAILMSG,
415+
TaskAttemptEventType.TA_CONTAINER_CLEANED,
405416
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
406417

407418
// Transitions from FAILED state
@@ -417,6 +428,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
417428
// Container launch events can arrive late
418429
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
419430
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
431+
TaskAttemptEventType.TA_CONTAINER_CLEANED,
420432
TaskAttemptEventType.TA_COMMIT_PENDING,
421433
TaskAttemptEventType.TA_DONE,
422434
TaskAttemptEventType.TA_FAILMSG))
@@ -434,6 +446,7 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
434446
// Container launch events can arrive late
435447
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
436448
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
449+
TaskAttemptEventType.TA_CONTAINER_CLEANED,
437450
TaskAttemptEventType.TA_COMMIT_PENDING,
438451
TaskAttemptEventType.TA_DONE,
439452
TaskAttemptEventType.TA_FAILMSG))

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,12 @@ public class ContainerLauncherImpl extends AbstractService implements
8282
new LinkedBlockingQueue<ContainerLauncherEvent>();
8383
YarnRPC rpc;
8484

85-
private Container getContainer(ContainerId id) {
85+
private Container getContainer(ContainerLauncherEvent event) {
86+
ContainerId id = event.getContainerID();
8687
Container c = containers.get(id);
8788
if(c == null) {
88-
c = new Container();
89+
c = new Container(event.getTaskAttemptID(), event.getContainerID(),
90+
event.getContainerMgrAddress(), event.getContainerToken());
8991
Container old = containers.putIfAbsent(id, c);
9092
if(old != null) {
9193
c = old;
@@ -107,9 +109,19 @@ private static enum ContainerState {
107109

108110
private class Container {
109111
private ContainerState state;
112+
// store enough information to be able to cleanup the container
113+
private TaskAttemptId taskAttemptID;
114+
private ContainerId containerID;
115+
final private String containerMgrAddress;
116+
private ContainerToken containerToken;
110117

111-
public Container() {
118+
public Container(TaskAttemptId taId, ContainerId containerID,
119+
String containerMgrAddress, ContainerToken containerToken) {
112120
this.state = ContainerState.PREP;
121+
this.taskAttemptID = taId;
122+
this.containerMgrAddress = containerMgrAddress;
123+
this.containerID = containerID;
124+
this.containerToken = containerToken;
113125
}
114126

115127
public synchronized boolean isCompletelyDone() {
@@ -118,7 +130,6 @@ public synchronized boolean isCompletelyDone() {
118130

119131
@SuppressWarnings("unchecked")
120132
public synchronized void launch(ContainerRemoteLaunchEvent event) {
121-
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
122133
LOG.info("Launching " + taskAttemptID);
123134
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
124135
state = ContainerState.DONE;
@@ -127,15 +138,10 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
127138
return;
128139
}
129140

130-
131-
final String containerManagerBindAddr = event.getContainerMgrAddress();
132-
ContainerId containerID = event.getContainerID();
133-
ContainerToken containerToken = event.getContainerToken();
134-
135141
ContainerManager proxy = null;
136142
try {
137143

138-
proxy = getCMProxy(containerID, containerManagerBindAddr,
144+
proxy = getCMProxy(containerID, containerMgrAddress,
139145
containerToken);
140146

141147
// Construct the actual Container
@@ -181,35 +187,35 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
181187
}
182188

183189
@SuppressWarnings("unchecked")
184-
public synchronized void kill(ContainerLauncherEvent event) {
190+
public synchronized void kill() {
191+
192+
if(isCompletelyDone()) {
193+
return;
194+
}
185195
if(this.state == ContainerState.PREP) {
186196
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
187197
} else {
188-
final String containerManagerBindAddr = event.getContainerMgrAddress();
189-
ContainerId containerID = event.getContainerID();
190-
ContainerToken containerToken = event.getContainerToken();
191-
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
192198
LOG.info("KILLING " + taskAttemptID);
193199

194200
ContainerManager proxy = null;
195201
try {
196-
proxy = getCMProxy(containerID, containerManagerBindAddr,
197-
containerToken);
202+
proxy = getCMProxy(this.containerID, this.containerMgrAddress,
203+
this.containerToken);
198204

199205
// kill the remote container if already launched
200206
StopContainerRequest stopRequest = Records
201207
.newRecord(StopContainerRequest.class);
202-
stopRequest.setContainerId(event.getContainerID());
208+
stopRequest.setContainerId(this.containerID);
203209
proxy.stopContainer(stopRequest);
204210

205211
} catch (Throwable t) {
206212

207213
// ignore the cleanup failure
208214
String message = "cleanup failed for container "
209-
+ event.getContainerID() + " : "
215+
+ this.containerID + " : "
210216
+ StringUtils.stringifyException(t);
211217
context.getEventHandler().handle(
212-
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
218+
new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
213219
LOG.warn(message);
214220
} finally {
215221
if (proxy != null) {
@@ -220,10 +226,11 @@ public synchronized void kill(ContainerLauncherEvent event) {
220226
}
221227
// after killing, send killed event to task attempt
222228
context.getEventHandler().handle(
223-
new TaskAttemptEvent(event.getTaskAttemptID(),
229+
new TaskAttemptEvent(this.taskAttemptID,
224230
TaskAttemptEventType.TA_CONTAINER_CLEANED));
225231
}
226232
}
233+
227234
// To track numNodes.
228235
Set<String> allNodes = new HashSet<String>();
229236

@@ -308,7 +315,17 @@ public void run() {
308315
super.start();
309316
}
310317

318+
private void shutdownAllContainers() {
319+
for (Container ct : this.containers.values()) {
320+
if (ct != null) {
321+
ct.kill();
322+
}
323+
}
324+
}
325+
311326
public void stop() {
327+
// shutdown any containers that might be left running
328+
shutdownAllContainers();
312329
eventHandlingThread.interrupt();
313330
launcherPool.shutdownNow();
314331
super.stop();
@@ -364,7 +381,7 @@ public void run() {
364381
// TODO: Do it only once per NodeManager.
365382
ContainerId containerID = event.getContainerID();
366383

367-
Container c = getContainer(containerID);
384+
Container c = getContainer(event);
368385
switch(event.getType()) {
369386

370387
case CONTAINER_REMOTE_LAUNCH:
@@ -374,7 +391,7 @@ public void run() {
374391
break;
375392

376393
case CONTAINER_REMOTE_CLEANUP:
377-
c.kill(event);
394+
c.kill();
378395
break;
379396
}
380397
removeContainerIfDone(containerID);

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
7373
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
7474
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
75+
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
7576
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
7677
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
7778
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -450,6 +451,121 @@ public void testLaunchFailedWhileKilling() throws Exception {
450451
assertFalse(eventHandler.internalError);
451452
}
452453

454+
@Test
455+
public void testContainerCleanedWhileRunning() throws Exception {
456+
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
457+
ApplicationAttemptId appAttemptId =
458+
BuilderUtils.newApplicationAttemptId(appId, 0);
459+
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
460+
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
461+
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
462+
Path jobFile = mock(Path.class);
463+
464+
MockEventHandler eventHandler = new MockEventHandler();
465+
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
466+
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
467+
468+
JobConf jobConf = new JobConf();
469+
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
470+
jobConf.setBoolean("fs.file.impl.disable.cache", true);
471+
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
472+
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
473+
474+
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
475+
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
476+
477+
AppContext appCtx = mock(AppContext.class);
478+
ClusterInfo clusterInfo = mock(ClusterInfo.class);
479+
Resource resource = mock(Resource.class);
480+
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
481+
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
482+
when(resource.getMemory()).thenReturn(1024);
483+
484+
TaskAttemptImpl taImpl =
485+
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
486+
splits, jobConf, taListener,
487+
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
488+
new SystemClock(), appCtx);
489+
490+
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
491+
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
492+
Container container = mock(Container.class);
493+
when(container.getId()).thenReturn(contId);
494+
when(container.getNodeId()).thenReturn(nid);
495+
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
496+
497+
taImpl.handle(new TaskAttemptEvent(attemptId,
498+
TaskAttemptEventType.TA_SCHEDULE));
499+
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
500+
container, mock(Map.class)));
501+
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
502+
assertEquals("Task attempt is not in running state", taImpl.getState(),
503+
TaskAttemptState.RUNNING);
504+
taImpl.handle(new TaskAttemptEvent(attemptId,
505+
TaskAttemptEventType.TA_CONTAINER_CLEANED));
506+
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
507+
eventHandler.internalError);
508+
}
509+
510+
@Test
511+
public void testContainerCleanedWhileCommitting() throws Exception {
512+
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
513+
ApplicationAttemptId appAttemptId =
514+
BuilderUtils.newApplicationAttemptId(appId, 0);
515+
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
516+
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
517+
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
518+
Path jobFile = mock(Path.class);
519+
520+
MockEventHandler eventHandler = new MockEventHandler();
521+
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
522+
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
523+
524+
JobConf jobConf = new JobConf();
525+
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
526+
jobConf.setBoolean("fs.file.impl.disable.cache", true);
527+
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
528+
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
529+
530+
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
531+
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
532+
533+
AppContext appCtx = mock(AppContext.class);
534+
ClusterInfo clusterInfo = mock(ClusterInfo.class);
535+
Resource resource = mock(Resource.class);
536+
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
537+
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
538+
when(resource.getMemory()).thenReturn(1024);
539+
540+
TaskAttemptImpl taImpl =
541+
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
542+
splits, jobConf, taListener,
543+
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
544+
new SystemClock(), appCtx);
545+
546+
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
547+
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
548+
Container container = mock(Container.class);
549+
when(container.getId()).thenReturn(contId);
550+
when(container.getNodeId()).thenReturn(nid);
551+
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
552+
553+
taImpl.handle(new TaskAttemptEvent(attemptId,
554+
TaskAttemptEventType.TA_SCHEDULE));
555+
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
556+
container, mock(Map.class)));
557+
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
558+
taImpl.handle(new TaskAttemptEvent(attemptId,
559+
TaskAttemptEventType.TA_COMMIT_PENDING));
560+
561+
assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
562+
TaskAttemptState.COMMIT_PENDING);
563+
taImpl.handle(new TaskAttemptEvent(attemptId,
564+
TaskAttemptEventType.TA_CONTAINER_CLEANED));
565+
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
566+
eventHandler.internalError);
567+
}
568+
453569
public static class MockEventHandler implements EventHandler {
454570
public boolean internalError;
455571

0 commit comments

Comments
 (0)