
Commit cff4a1a

MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431131 13f79535-47bb-0310-9956-ffa450edef68
1 parent 5fa646e commit cff4a1a

File tree

4 files changed: +128 -4 lines

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
@@ -685,6 +685,9 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
     exits (Jason Lowe via tgraves)
 
+    MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
+    Chen via jlowe)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

Lines changed: 1 addition & 1 deletion
@@ -579,7 +579,7 @@ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
    */
   protected Recovery createRecoveryService(AppContext appContext) {
     return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter());
+        appContext.getClock(), getCommitter(), isNewApiCommitter());
   }
 
   /** Create and initialize (but don't start) a single job.

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

Lines changed: 15 additions & 3 deletions
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;

@@ -100,6 +101,7 @@ public class RecoveryService extends CompositeService implements Recovery {
 
   private final ApplicationAttemptId applicationAttemptId;
   private final OutputCommitter committer;
+  private final boolean newApiCommitter;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -113,10 +115,11 @@ public class RecoveryService extends CompositeService implements Recovery {
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId,
-      Clock clock, OutputCommitter committer) {
+      Clock clock, OutputCommitter committer, boolean newApiCommitter) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
+    this.newApiCommitter = newApiCommitter;
     this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
     addService((Service) dispatcher);

@@ -360,8 +363,17 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
       switch (state) {
       case SUCCEEDED:
         //recover the task output
-        TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
-            attInfo.getAttemptId());
+
+        // check the committer type and construct corresponding context
+        TaskAttemptContext taskContext = null;
+        if(newApiCommitter) {
+          taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+        } else {
+          taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+              TypeConverter.fromYarn(aId));
+        }
+
         try {
           TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
           int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
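Reading of the change (not stated verbatim in the diff): an old-API org.apache.hadoop.mapred.OutputCommitter downcasts the TaskAttemptContext it is handed, so constructing the new-API org.apache.hadoop.mapreduce context during AM recovery triggered the ClassCastException this JIRA describes. A minimal sketch of the selection logic, assuming Hadoop 0.23/2.x APIs and using a hypothetical contextFor helper rather than the RecoveryService code above:

// Hypothetical sketch, not the committed code: pick the TaskAttemptContext
// implementation that matches the committer API in use, so an old-API
// committer's downcast of the context succeeds during recovery.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

class RecoveryContextSketch {
  // newApiCommitter mirrors the flag the AM already exposes via isNewApiCommitter()
  static TaskAttemptContext contextFor(Configuration conf,
      TaskAttemptID attemptId, boolean newApiCommitter) {
    if (newApiCommitter) {
      // new-API committers expect the org.apache.hadoop.mapreduce context
      return new TaskAttemptContextImpl(conf, attemptId);
    }
    // old-API committers cast to org.apache.hadoop.mapred.TaskAttemptContext,
    // so build the mapred implementation with a downgraded attempt id
    return new org.apache.hadoop.mapred.TaskAttemptContextImpl(
        new JobConf(conf), org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
  }
}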

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Lines changed: 109 additions & 0 deletions
@@ -626,6 +626,115 @@ public void testOutputRecoveryMapsOnly() throws Exception {
     validateOutput();
   }
 
+  @Test
+  public void testRecoveryWithOldCommiter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
       throws Exception {
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
