Skip to content

Commit f10e105

Browse files
jlowe committed
MAPREDUCE-5570. Map task attempt with fetch failure has incorrect attempt finish time. Contributed by Rushabh S Shah
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577692 13f79535-47bb-0310-9956-ffa450edef68
1 parent 77e0eeb commit f10e105

File tree

3 files changed

+73
-3
lines changed

3 files changed

+73
-3
lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ Release 2.4.0 - UNRELEASED
240240
MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed
241241
before registering with RM (Rohith via jlowe)
242242

243+
MAPREDUCE-5570. Map task attempt with fetch failure has incorrect attempt
244+
finish time (Rushabh S Shah via jlowe)
245+
243246
Release 2.3.1 - UNRELEASED
244247

245248
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,8 +1774,6 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
17741774
.checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
17751775
//add to diagnostic
17761776
taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
1777-
//set the finish time
1778-
taskAttempt.setFinishTime();
17791777

17801778
if (taskAttempt.getLaunchTime() != 0) {
17811779
taskAttempt.eventHandler

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.spy;
2526
import static org.mockito.Mockito.times;
@@ -91,7 +92,7 @@
9192

9293
@SuppressWarnings({"unchecked", "rawtypes"})
9394
public class TestTaskAttempt{
94-
95+
9596
static public class StubbedFS extends RawLocalFileSystem {
9697
@Override
9798
public FileStatus getFileStatus(Path f) throws IOException {
@@ -725,6 +726,74 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
725726
eventHandler.internalError);
726727
}
727728

729+
@Test
730+
public void testFetchFailureAttemptFinishTime() throws Exception{
731+
ApplicationId appId = ApplicationId.newInstance(1, 2);
732+
ApplicationAttemptId appAttemptId =
733+
ApplicationAttemptId.newInstance(appId, 0);
734+
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
735+
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
736+
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
737+
Path jobFile = mock(Path.class);
738+
739+
MockEventHandler eventHandler = new MockEventHandler();
740+
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
741+
when(taListener.getAddress()).thenReturn(
742+
new InetSocketAddress("localhost", 0));
743+
744+
JobConf jobConf = new JobConf();
745+
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
746+
jobConf.setBoolean("fs.file.impl.disable.cache", true);
747+
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
748+
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
749+
750+
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
751+
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
752+
753+
AppContext appCtx = mock(AppContext.class);
754+
ClusterInfo clusterInfo = mock(ClusterInfo.class);
755+
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
756+
757+
TaskAttemptImpl taImpl =
758+
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
759+
splits, jobConf, taListener,mock(Token.class), new Credentials(),
760+
new SystemClock(), appCtx);
761+
762+
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
763+
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
764+
Container container = mock(Container.class);
765+
when(container.getId()).thenReturn(contId);
766+
when(container.getNodeId()).thenReturn(nid);
767+
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
768+
769+
taImpl.handle(new TaskAttemptEvent(attemptId,
770+
TaskAttemptEventType.TA_SCHEDULE));
771+
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
772+
container, mock(Map.class)));
773+
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
774+
taImpl.handle(new TaskAttemptEvent(attemptId,
775+
TaskAttemptEventType.TA_DONE));
776+
taImpl.handle(new TaskAttemptEvent(attemptId,
777+
TaskAttemptEventType.TA_CONTAINER_CLEANED));
778+
779+
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
780+
TaskAttemptState.SUCCEEDED);
781+
782+
assertTrue("Task Attempt finish time is not greater than 0",
783+
taImpl.getFinishTime() > 0);
784+
785+
Long finishTime = taImpl.getFinishTime();
786+
Thread.sleep(5);
787+
taImpl.handle(new TaskAttemptEvent(attemptId,
788+
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
789+
790+
assertEquals("Task attempt is not in Too Many Fetch Failure state",
791+
taImpl.getState(), TaskAttemptState.FAILED);
792+
793+
assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
794+
+ " Task attempt finish time is not the same ",
795+
finishTime, Long.valueOf(taImpl.getFinishTime()));
796+
}
728797

729798
public static class MockEventHandler implements EventHandler {
730799
public boolean internalError;

0 commit comments

Comments
 (0)