
Commit 25df887

MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581180 13f79535-47bb-0310-9956-ffa450edef68
Parent: b5d22ae
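In short, this patch threads the forced shutdown state that MRAppMaster computes when it finds a prior attempt already in or past commit (ERROR, FAILED, or SUCCEEDED) through AMStartedEvent into JobHistoryEventHandler, which until now hard-coded KILLED when closing out an unfinished job's history. A minimal sketch of the mapping the handler applies, using the hypothetical helper name mapForcedStateToJobState (the patch itself adds this logic as the private method createJobStateForJobUnsuccessfulCompletionEvent shown in the diff below):

import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;

// Sketch only: condenses the state mapping added in JobHistoryEventHandler below.
static String mapForcedStateToJobState(String forcedJobStateOnShutDown) {
  if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown.isEmpty()) {
    return JobState.KILLED.toString();       // no forced state: old behaviour
  } else if (forcedJobStateOnShutDown.equals(JobStateInternal.ERROR.toString())
      || forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
    return JobState.FAILED.toString();       // internal ERROR/FAILED map to FAILED
  } else if (forcedJobStateOnShutDown
      .equals(JobStateInternal.SUCCEEDED.toString())) {
    return JobState.SUCCEEDED.toString();    // the earlier attempt had already committed
  }
  return JobState.KILLED.toString();         // anything unrecognized falls back to KILLED
}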


6 files changed: +127 additions, −12 deletions


hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
@@ -263,6 +263,9 @@ Release 2.4.0 - UNRELEASED
     FadviseFileRegion::transferTo does not read disks efficiently.
     (Nikola Vujic via cnauroth)
 
+    MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it
+    recovers from a commit during a previous attempt. (Xuan Gong via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

Lines changed: 37 additions & 6 deletions
@@ -31,6 +31,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -348,7 +350,9 @@ protected void serviceStop() throws Exception {
         JobUnsuccessfulCompletionEvent jucEvent =
             new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
                 System.currentTimeMillis(), job.getCompletedMaps(),
-                job.getCompletedReduces(), JobState.KILLED.toString(),
+                job.getCompletedReduces(),
+                createJobStateForJobUnsuccessfulCompletionEvent(
+                    mi.getForcedJobStateOnShutDown()),
                 job.getDiagnostics());
         JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
         //Bypass the queue mechanism which might wait. Call the method directly
@@ -381,9 +385,10 @@ protected EventWriter createEventWriter(Path historyFilePath)
    * This should be the first call to history for a job
    *
    * @param jobId the jobId.
+   * @param forcedJobStateOnShutDown
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId)
+  protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
       throws IOException {
     if (stagingDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -438,7 +443,7 @@ protected void setupEventWriter(JobId jobId)
     }
 
     MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
-        user, jobName, jobId);
+        user, jobName, jobId, forcedJobStateOnShutDown);
     fi.getJobSummary().setJobId(jobId);
     fileMap.put(jobId, fi);
   }
@@ -481,13 +486,17 @@ private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
     return false;
   }
 
-  protected void handleEvent(JobHistoryEvent event) {
+  @Private
+  public void handleEvent(JobHistoryEvent event) {
    synchronized (lock) {
 
      // If this is JobSubmitted Event, setup the writer
      if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
        try {
-          setupEventWriter(event.getJobID());
+          AMStartedEvent amStartedEvent =
+              (AMStartedEvent) event.getHistoryEvent();
+          setupEventWriter(event.getJobID(),
+              amStartedEvent.getForcedJobStateOnShutDown());
        } catch (IOException ioe) {
          LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
              ioe);
@@ -804,16 +813,18 @@ protected class MetaInfo {
     Timer flushTimer;
     FlushTimerTask flushTimerTask;
     private boolean isTimerShutDown = false;
+    private String forcedJobStateOnShutDown;
 
     MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
-        String jobName, JobId jobId) {
+        String jobName, JobId jobId, String forcedJobStateOnShutDown) {
      this.historyFile = historyFile;
      this.confFile = conf;
      this.writer = writer;
      this.jobIndexInfo =
          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
      this.jobSummary = new JobSummary();
      this.flushTimer = new Timer("FlushTimer", true);
+      this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
     }
 
     Path getHistoryFile() {
@@ -840,6 +851,10 @@ boolean isTimerShutDown() {
       return isTimerShutDown;
     }
 
+    String getForcedJobStateOnShutDown() {
+      return forcedJobStateOnShutDown;
+    }
+
     @Override
     public String toString() {
       return "Job MetaInfo for "+ jobSummary.getJobId()
@@ -983,4 +998,20 @@ public void setForcejobCompletion(boolean forceJobCompletion) {
     LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
         + forceJobCompletion);
   }
+
+  private String createJobStateForJobUnsuccessfulCompletionEvent(
+      String forcedJobStateOnShutDown) {
+    if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
+        .isEmpty()) {
+      return JobState.KILLED.toString();
+    } else if (forcedJobStateOnShutDown.equals(
+        JobStateInternal.ERROR.toString()) ||
+        forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
+      return JobState.FAILED.toString();
+    } else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
+        .toString())) {
+      return JobState.SUCCEEDED.toString();
+    }
+    return JobState.KILLED.toString();
+  }
 }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

Lines changed: 10 additions & 2 deletions
@@ -1026,14 +1026,13 @@ protected void serviceStart() throws Exception {
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
-    amInfos.add(amInfo);
 
     // /////////////////// Create the job itself.
     job = createJob(getConfig(), forcedState, shutDownMessage);
 
     // End of creating the job.
 
-    // Send out an MR AM inited event for this AM and all previous AMs.
+    // Send out an MR AM inited event for all previous AMs.
     for (AMInfo info : amInfos) {
       dispatcher.getEventHandler().handle(
           new JobHistoryEvent(job.getID(), new AMStartedEvent(info
@@ -1042,6 +1041,15 @@ protected void serviceStart() throws Exception {
               .getNodeManagerHttpPort())));
     }
 
+    // Send out an MR AM inited event for this AM.
+    dispatcher.getEventHandler().handle(
+        new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
+            .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
+            amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
+            .getNodeManagerHttpPort(), this.forcedState == null ? null
+                : this.forcedState.toString())));
+    amInfos.add(amInfo);
+
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
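For orientation, here is a hedged round-trip sketch of how the forced state now rides on the current attempt's AMStartedEvent and is read back by JobHistoryEventHandler.handleEvent() before it sets up the event writer; variable names such as appAttemptId, containerId, nmHost, nmPort and nmHttpPort are illustrative placeholders rather than code from the patch:

// Current attempt only: attach the forced state to its AMStartedEvent.
String forced = (forcedState == null) ? null : forcedState.toString();
AMStartedEvent current = new AMStartedEvent(appAttemptId, startTime,
    containerId, nmHost, nmPort, nmHttpPort, forced);

// Later, inside the history handler, the state is recovered from the event
// and handed to setupEventWriter(jobId, forcedJobStateOnShutDown).
String forcedOnShutDown = current.getForcedJobStateOnShutDown();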

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java

Lines changed: 1 addition & 1 deletion
@@ -497,7 +497,7 @@ public void addToFileMap(JobId jobId) {
     JobHistoryEvent lastEventHandled;
     int eventsHandled = 0;
     @Override
-    protected void handleEvent(JobHistoryEvent event) {
+    public void handleEvent(JobHistoryEvent event) {
       this.lastEventHandled = event;
       this.eventsHandled++;
     }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java

Lines changed: 42 additions & 3 deletions
@@ -21,7 +21,8 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,6 +45,10 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -70,6 +75,8 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 public class TestMRAppMaster {
   private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
@@ -120,7 +127,7 @@ public void testMRAppMasterForDifferentUser() throws IOException,
     assertEquals(userStagingPath.toString(),
         appMaster.stagingDirPath.toString());
   }
-
+
   @Test
   public void testMRAppMasterMidLock() throws IOException,
       InterruptedException {
@@ -154,6 +161,9 @@ public void testMRAppMasterMidLock() throws IOException,
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is FAILED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
   }
 
   @Test
@@ -190,6 +200,9 @@ public void testMRAppMasterSuccessLock() throws IOException,
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is SUCCEEDED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED");
   }
 
   @Test
@@ -226,6 +239,9 @@ public void testMRAppMasterFailLock() throws IOException,
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is FAILED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
   }
 
   @Test
@@ -423,8 +439,20 @@ public void testMRAppMasterCredentials() throws Exception {
 
 
   }
-}
 
+  private void verifyFailedStatus(MRAppMasterTest appMaster,
+      String expectedJobState) {
+    ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
+        .forClass(JobHistoryEvent.class);
+    // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
+    verify(appMaster.spyHistoryService, times(2))
+        .handleEvent(captor.capture());
+    HistoryEvent event = captor.getValue().getHistoryEvent();
+    assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
+    assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
+        , expectedJobState);
+  }
+}
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
@@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaster {
   ContainerAllocator mockContainerAllocator;
   CommitterEventHandler mockCommitterEventHandler;
   RMHeartbeatHandler mockRMHeartbeatHandler;
+  JobHistoryEventHandler spyHistoryService;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
@@ -502,4 +531,14 @@ public Credentials getCredentials() {
   public UserGroupInformation getUgi() {
     return currentUser;
   }
+
+  @Override
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) {
+    spyHistoryService =
+        Mockito.spy((JobHistoryEventHandler) super
+            .createJobHistoryHandler(context));
+    spyHistoryService.setForcejobCompletion(this.isLastAMRetry);
+    return spyHistoryService;
+  }
 }

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java

Lines changed: 34 additions & 0 deletions
@@ -34,6 +34,7 @@
 @InterfaceStability.Unstable
 public class AMStartedEvent implements HistoryEvent {
   private AMStarted datum = new AMStarted();
+  private String forcedJobStateOnShutDown;
 
   /**
    * Create an event to record the start of an MR AppMaster
@@ -54,12 +55,38 @@ public class AMStartedEvent implements HistoryEvent {
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
       int nodeManagerHttpPort) {
+    this(appAttemptId, startTime, containerId, nodeManagerHost,
+        nodeManagerPort, nodeManagerHttpPort, null);
+  }
+
+  /**
+   * Create an event to record the start of an MR AppMaster
+   *
+   * @param appAttemptId
+   *          the application attempt id.
+   * @param startTime
+   *          the start time of the AM.
+   * @param containerId
+   *          the containerId of the AM.
+   * @param nodeManagerHost
+   *          the node on which the AM is running.
+   * @param nodeManagerPort
+   *          the port on which the AM is running.
+   * @param nodeManagerHttpPort
+   *          the httpPort for the node running the AM.
+   * @param forcedJobStateOnShutDown
+   *          the state to force the job into
+   */
+  public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
+      ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
+      int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
     datum.applicationAttemptId = new Utf8(appAttemptId.toString());
     datum.startTime = startTime;
     datum.containerId = new Utf8(containerId.toString());
     datum.nodeManagerHost = new Utf8(nodeManagerHost);
     datum.nodeManagerPort = nodeManagerPort;
     datum.nodeManagerHttpPort = nodeManagerHttpPort;
+    this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
   }
 
   AMStartedEvent() {
@@ -116,6 +143,13 @@ public int getNodeManagerHttpPort() {
     return datum.nodeManagerHttpPort;
   }
 
+  /**
+   * @return the state to force the job into
+   */
+  public String getForcedJobStateOnShutDown() {
+    return this.forcedJobStateOnShutDown;
+  }
+
   /** Get the attempt id */
 
   @Override
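One compatibility note: the pre-existing six-argument constructor now delegates to the new one with a null forced state, so existing callers are untouched and the history handler falls back to recording KILLED exactly as before. A small hedged example, with placeholder identifiers:

AMStartedEvent legacy = new AMStartedEvent(appAttemptId, startTime,
    containerId, nmHost, nmPort, nmHttpPort);
// No forced state was supplied, so the getter returns null and
// JobHistoryEventHandler keeps the old KILLED behaviour on shutdown.
assert legacy.getForcedJobStateOnShutDown() == null;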
