
Commit dd3fec2

MAPREDUCE-5079. Changes job recovery to restore state directly from job history, instead of simulating state machine events. Contributed by Jason Lowe and Robert Parker.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466767 13f79535-47bb-0310-9956-ffa450edef68
1 parent 29c964d commit dd3fec2

File tree

21 files changed: +1147 -771 lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 4 additions & 0 deletions

@@ -169,6 +169,10 @@ Release 2.0.5-alpha - UNRELEASED
     MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
     acmurthy)
 
+    MAPREDUCE-5079. Changes job recovery to restore state directly from job
+    history, instead of simulating state machine events.
+    (Jason Lowe and Robert Parker via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

Lines changed: 116 additions & 58 deletions

@@ -24,9 +24,12 @@
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -46,6 +49,7 @@
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@@ -54,13 +58,17 @@
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@@ -74,6 +82,7 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -84,8 +93,6 @@
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@@ -94,6 +101,7 @@
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -167,7 +175,6 @@ public class MRAppMaster extends CompositeService {
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
-  private Recovery recoveryServ;
   private ContainerAllocator containerAllocator;
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
@@ -180,7 +187,6 @@ public class MRAppMaster extends CompositeService {
   private OutputCommitter committer;
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
-  private boolean inRecovery = false;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
 
   private Job job;
@@ -193,6 +199,8 @@ public class MRAppMaster extends CompositeService {
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
 
+  private long recoveredJobStartTime = 0;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime, int maxAppAttempts) {
@@ -340,34 +348,9 @@ public void init(final Configuration conf) {
       }
     } else {
       committer = createOutputCommitter(conf);
-      boolean recoveryEnabled = conf.getBoolean(
-          MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-      boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-
-      // If a shuffle secret was not provided by the job client then this app
-      // attempt will generate one. However that disables recovery if there
-      // are reducers as the shuffle secret would be app attempt specific.
-      boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
-          TokenCache.getShuffleSecretKey(fsTokens) != null);
-
-      if (recoveryEnabled && recoverySupportedByCommitter
-          && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
-        LOG.info("Recovery is enabled. "
-            + "Will try to recover from previous life on best effort basis.");
-        recoveryServ = createRecoveryService(context);
-        addIfService(recoveryServ);
-        dispatcher = recoveryServ.getDispatcher();
-        clock = recoveryServ.getClock();
-        inRecovery = true;
-      } else {
-        LOG.info("Not starting RecoveryService: recoveryEnabled: "
-            + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
-            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
-            + appAttemptID.getAttemptId());
-        dispatcher = createDispatcher();
-        addIfService(dispatcher);
-      }
+
+      dispatcher = createDispatcher();
+      addIfService(dispatcher);
 
       //service to handle requests from JobClient
       clientService = createClientService(context);
@@ -595,15 +578,6 @@ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
     return new JobFinishEventHandler();
   }
 
-  /**
-   * Create the recovery service.
-   * @return an instance of the recovery service.
-   */
-  protected Recovery createRecoveryService(AppContext appContext) {
-    return new RecoveryService(appContext.getApplicationAttemptId(),
-        appContext.getClock(), getCommitter(), isNewApiCommitter());
-  }
-
   /** Create and initialize (but don't start) a single job.
    * @param forcedState a state to force the job into or null for normal operation.
   * @param diagnostic a diagnostic message to include with the job.
@@ -615,7 +589,8 @@ protected Job createJob(Configuration conf, JobStateInternal forcedState,
     Job newJob =
         new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
-            completedTasksFromPreviousRun, metrics, newApiCommitter,
+            completedTasksFromPreviousRun, metrics,
+            committer, newApiCommitter,
             currentUser.getUserName(), appSubmitTime, amInfos, context,
             forcedState, diagnostic);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@@ -978,18 +953,8 @@ public ClusterInfo getClusterInfo() {
   public void start() {
 
     amInfos = new LinkedList<AMInfo>();
-
-    // Pull completedTasks etc from recovery
-    if (inRecovery) {
-      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
-      amInfos = recoveryServ.getAMInfos();
-    } else {
-      // Get the amInfos anyways irrespective of whether recovery is enabled or
-      // not IF this is not the first AM generation
-      if (appAttemptID.getAttemptId() != 1) {
-        amInfos.addAll(readJustAMInfos());
-      }
-    }
+    completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
+    processRecovery();
 
     // Current an AMInfo for the current AM generation.
     AMInfo amInfo =
@@ -1051,13 +1016,105 @@ public void start() {
     startJobs();
   }
 
+  private void processRecovery() {
+    if (appAttemptID.getAttemptId() == 1) {
+      return;  // no need to recover on the first attempt
+    }
+
+    boolean recoveryEnabled = getConfig().getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+    boolean recoverySupportedByCommitter =
+        committer != null && committer.isRecoverySupported();
+
+    // If a shuffle secret was not provided by the job client then this app
+    // attempt will generate one. However that disables recovery if there
+    // are reducers as the shuffle secret would be app attempt specific.
+    int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+    boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
+        TokenCache.getShuffleSecretKey(fsTokens) != null);
+
+    if (recoveryEnabled && recoverySupportedByCommitter
+        && shuffleKeyValidForRecovery) {
+      LOG.info("Recovery is enabled. "
+          + "Will try to recover from previous life on best effort basis.");
+      try {
+        parsePreviousJobHistory();
+      } catch (IOException e) {
+        LOG.warn("Unable to parse prior job history, aborting recovery", e);
+        // try to get just the AMInfos
+        amInfos.addAll(readJustAMInfos());
+      }
+    } else {
+      LOG.info("Will not try to recover. recoveryEnabled: "
+          + recoveryEnabled + " recoverySupportedByCommitter: "
+          + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+          + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+          + appAttemptID.getAttemptId());
+      // Get the amInfos anyways whether recovery is enabled or not
+      amInfos.addAll(readJustAMInfos());
+    }
+  }
+
+  private static FSDataInputStream getPreviousJobHistoryStream(
+      Configuration conf, ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
+        appAttemptId);
+    LOG.info("Previous history file is at " + historyFile);
+    return historyFile.getFileSystem(conf).open(historyFile);
+  }
+
+  private void parsePreviousJobHistory() throws IOException {
+    FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
+        appAttemptID);
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    Exception parseException = parser.getParseException();
+    if (parseException != null) {
+      LOG.info("Got an error parsing job-history file" +
+          ", ignoring incomplete events.", parseException);
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
+        completedTasksFromPreviousRun
+            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+        LOG.info("Read from history task "
+            + TypeConverter.toYarn(taskInfo.getTaskId()));
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasksFromPreviousRun.size());
+    recoveredJobStartTime = jobInfo.getLaunchTime();
+
+    // recover AMInfos
+    List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
+    if (jhAmInfoList != null) {
+      for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
+        AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+            jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+            jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+            jhAmInfo.getNodeManagerHttpPort());
+        amInfos.add(amInfo);
+      }
+    }
+  }
+
   private List<AMInfo> readJustAMInfos() {
     List<AMInfo> amInfos = new ArrayList<AMInfo>();
     FSDataInputStream inputStream = null;
     try {
-      inputStream =
-          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
-              appAttemptID);
+      inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
       EventReader jobHistoryEventReader = new EventReader(inputStream);
 
       // All AMInfos are contiguous. Track when the first AMStartedEvent
@@ -1108,7 +1165,8 @@ private List<AMInfo> readJustAMInfos() {
   @SuppressWarnings("unchecked")
   protected void startJobs() {
     /** create a job-start event to get this ball rolling */
-    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+    JobEvent startJobEvent = new JobStartEvent(job.getID(),
+        recoveredJobStartTime);
     /** send the job-start event. this triggers the job execution. */
     dispatcher.getEventHandler().handle(startJobEvent);
   }
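
The heart of the change is parsePreviousJobHistory() in the hunk above: instead of replaying synthesized state-machine events through the old RecoveryService, the AM reads the prior attempt's .jhist file directly and keeps only tasks whose recorded status is SUCCEEDED. A minimal standalone sketch of the same parse-and-filter flow follows; it is illustrative only, with the class name and command-line path as placeholders, whereas the AM resolves the file via JobHistoryUtils.getPreviousJobHistoryPath():

// A parse-and-filter sketch mirroring parsePreviousJobHistory() above.
// The history-file path comes from the command line here; the AM itself
// resolves it via JobHistoryUtils.getPreviousJobHistoryPath(conf, attempt).
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

public class HistoryReplaySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path historyFile = new Path(args[0]); // e.g. the prior attempt's .jhist
    FSDataInputStream in = historyFile.getFileSystem(conf).open(historyFile);
    JobHistoryParser parser = new JobHistoryParser(in);
    JobInfo jobInfo = parser.parse();
    // Keep only tasks recorded as SUCCEEDED; everything else is rerun.
    Map<TaskID, TaskInfo> recovered = new HashMap<TaskID, TaskInfo>();
    for (Map.Entry<TaskID, TaskInfo> e : jobInfo.getAllTasks().entrySet()) {
      if ("SUCCEEDED".equals(e.getValue().getTaskStatus())) {
        recovered.put(e.getKey(), e.getValue());
      }
    }
    System.out.println("Recoverable tasks: " + recovered.size()
        + ", original launch time: " + jobInfo.getLaunchTime());
  }
}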
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java

Lines changed: 23 additions & 4 deletions

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@InterfaceAudience.Private
-package org.apache.hadoop.mapreduce.v2.app.recover;
-import org.apache.hadoop.classification.InterfaceAudience;
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class JobStartEvent extends JobEvent {
+
+  long recoveredJobStartTime;
+
+  public JobStartEvent(JobId jobID) {
+    this(jobID, 0);
+  }
+
+  public JobStartEvent(JobId jobID, long recoveredJobStartTime) {
+    super(jobID, JobEventType.JOB_START);
+    this.recoveredJobStartTime = recoveredJobStartTime;
+  }
+
+  public long getRecoveredJobStartTime() {
+    return recoveredJobStartTime;
+  }
+}
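
JobStartEvent carries the recovered launch time through the ordinary JOB_START dispatch, which is what lets MRAppMaster.startJobs() (above) drop the special recovery dispatcher. A brief usage sketch, with jobId and eventHandler standing in for the AM's fields (not code from this commit); a time of 0 marks a fresh start, so the job can presumably fall back to the current clock when it handles the event:

// Hypothetical usage; jobId and eventHandler are stand-ins.
JobStartEvent fresh = new JobStartEvent(jobId);        // first attempt, time 0
JobStartEvent resumed = new JobStartEvent(jobId, recoveredJobStartTime);
eventHandler.handle(resumed);                          // fires JOB_START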

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java

Lines changed: 1 addition & 0 deletions

@@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
   //Producer:Task
   TA_SCHEDULE,
   TA_RESCHEDULE,
+  TA_RECOVER,
 
   //Producer:Client, Task
   TA_KILL,
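
TA_RECOVER gives task attempts a dedicated event type for history-based recovery; it is paired with the new TaskAttemptRecoverEvent below. In the actual code the attempt's state machine (not shown in this excerpt) registers a transition for the new type; the switch below is only a routing sketch, and restoreFromHistory is a hypothetical helper:

// Illustrative routing sketch, not code from this commit.
void handle(TaskAttemptEvent event) {
  switch (event.getType()) {
  case TA_RECOVER:
    // Cast is safe: TA_RECOVER events are created as TaskAttemptRecoverEvent.
    TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
    restoreFromHistory(tare.getTaskAttemptInfo(), tare.getRecoverOutput());
    break;
  default:
    break; // TA_SCHEDULE, TA_KILL, ... follow their existing transitions
  }
}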
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptRecoverEvent.java

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptRecoverEvent extends TaskAttemptEvent {
+
+  private TaskAttemptInfo taInfo;
+  private OutputCommitter committer;
+  private boolean recoverAttemptOutput;
+
+  public TaskAttemptRecoverEvent(TaskAttemptId id, TaskAttemptInfo taInfo,
+      OutputCommitter committer, boolean recoverOutput) {
+    super(id, TaskAttemptEventType.TA_RECOVER);
+    this.taInfo = taInfo;
+    this.committer = committer;
+    this.recoverAttemptOutput = recoverOutput;
+  }
+
+  public TaskAttemptInfo getTaskAttemptInfo() {
+    return taInfo;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public boolean getRecoverOutput() {
+    return recoverAttemptOutput;
+  }
+}
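
Putting the pieces together, the job side can replay each recovered attempt by dispatching one of these events. A sketch, assuming eventHandler, attemptId, attemptInfo, and committer are in scope during recovery (all stand-ins; the consuming transition is not part of this excerpt). The final flag asks the attempt to salvage its previously committed output through the OutputCommitter when the committer supports recovery:

// Hypothetical dispatch; variable names are stand-ins.
boolean recoverOutput = committer != null && committer.isRecoverySupported();
eventHandler.handle(new TaskAttemptRecoverEvent(
    attemptId, attemptInfo, committer, recoverOutput));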
