Skip to content

Commit de532bb

Browse files
committed
YARN-1816. Fixed ResourceManager so that RMApp correctly handles the ATTEMPT_FINISHED event in the ACCEPTED state, which can happen after an RM restart. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576911 13f79535-47bb-0310-9956-ffa450edef68
1 parent d05bfb7 commit de532bb

File tree

4 files changed

+84
-9
lines changed

4 files changed

+84
-9
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,10 @@ Release 2.4.0 - UNRELEASED
457457
and thus recover app itself synchronously and avoid races with resyncing
458458
NodeManagers. (Jian He via vinodkv)
459459

460+
YARN-1816. Fixed ResourceManager to make RMApp correctly handle
461+
ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts.
462+
(Jian He via vinodkv)
463+
460464
Release 2.3.1 - UNRELEASED
461465

462466
INCOMPATIBLE CHANGES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,14 @@ RMAppEventType.MOVE, new RMAppMoveTransition())
189189
RMAppEventType.ATTEMPT_REGISTERED)
190190
.addTransition(RMAppState.ACCEPTED,
191191
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
192-
// ACCEPTED state is possible to receive ATTEMPT_FAILED event because
193-
// RMAppRecoveredTransition is returning ACCEPTED state directly and
194-
// waiting for the previous AM to exit.
192+
// ACCEPTED state may receive an ATTEMPT_FAILED/ATTEMPT_FINISHED
193+
// event because RMAppRecoveredTransition is returning ACCEPTED state
194+
// directly and waiting for the previous AM to exit.
195195
RMAppEventType.ATTEMPT_FAILED,
196196
new AttemptFailedTransition(RMAppState.ACCEPTED))
197+
.addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
198+
RMAppEventType.ATTEMPT_FINISHED,
199+
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
197200
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
198201
RMAppEventType.KILL, new KillAttemptTransition())
199202
// ACCEPTED state can once again receive APP_ACCEPTED event, because on
@@ -725,11 +728,7 @@ private static final class RMAppRecoveredTransition implements
725728

726729
@Override
727730
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
728-
/*
729-
* If last attempt recovered final state is null .. it means attempt was
730-
* started but AM container may or may not have started / finished.
731-
* Therefore we should wait for it to finish.
732-
*/
731+
733732
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
734733
// synchronously recover attempt to ensure any incoming external events
735734
// to be processed after the attempt processes the recover event.
@@ -744,6 +743,17 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
744743
return app.recoveredFinalState;
745744
}
746745

746+
// Last attempt is in final state, do not add to scheduler and just return
747+
// ACCEPTED waiting for last RMAppAttempt to send finished or failed event
748+
// back.
749+
if (app.currentAttempt != null
750+
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
751+
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
752+
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
753+
&& app.attempts.size() == app.maxAppAttempts))) {
754+
return RMAppState.ACCEPTED;
755+
}
756+
747757
// Notify scheduler about the app on recovery
748758
new AddApplicationToSchedulerTransition().transition(app, event);
749759

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,11 @@ private static class AttemptRecoveredTransition
872872
@Override
873873
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
874874
RMAppAttemptEvent event) {
875+
/*
876+
* If the recovered final state is null, it means the attempt was
877+
* started but AM container may or may not have started / finished.
878+
* Therefore we should wait for it to finish.
879+
*/
875880
if (appAttempt.recoveredFinalState != null) {
876881
appAttempt.progress = 1.0f;
877882
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
@@ -1598,7 +1603,7 @@ private void checkAttemptStoreError(RMAppAttemptEvent event) {
15981603
ExitUtil.terminate(1, storeEvent.getStoredException());
15991604
}
16001605
}
1601-
1606+
16021607
private void storeAttempt() {
16031608
// store attempt data in a non-blocking manner to prevent dispatcher
16041609
// thread starvation and wait for state to be saved

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,63 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
601601
RMAppAttemptState.SCHEDULED);
602602
Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
603603
.getCurrentAppAttempt().getAppAttemptState());
604+
}
605+
606+
// Test RM restarts after previous attempt succeeded and was saved into state
607+
// store but before the RMAppAttempt notifies RMApp that it has succeeded. On
608+
// recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
609+
// that RMApp can recover its state.
610+
@Test
611+
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
612+
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
613+
MemoryRMStateStore memStore = new MemoryRMStateStore() {
614+
int count = 0;
615+
616+
@Override
617+
public void updateApplicationStateInternal(ApplicationId appId,
618+
ApplicationStateDataPBImpl appStateData) throws Exception {
619+
if (count == 0) {
620+
// do nothing; simulate app final state is not saved.
621+
LOG.info(appId + " final state is not saved.");
622+
count++;
623+
} else {
624+
super.updateApplicationStateInternal(appId, appStateData);
625+
}
626+
}
627+
};
628+
memStore.init(conf);
629+
RMState rmState = memStore.getState();
630+
Map<ApplicationId, ApplicationState> rmAppState =
631+
rmState.getApplicationState();
632+
633+
// start RM
634+
MockRM rm1 = new MockRM(conf, memStore);
635+
rm1.start();
636+
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
637+
RMApp app0 = rm1.submitApp(200);
638+
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
639+
640+
FinishApplicationMasterRequest req =
641+
FinishApplicationMasterRequest.newInstance(
642+
FinalApplicationStatus.SUCCEEDED, "", "");
643+
am0.unregisterAppAttempt(req, true);
644+
am0.waitForState(RMAppAttemptState.FINISHING);
645+
// app final state is not saved. This guarantees that RMApp cannot be
646+
// recovered via its own saved state, but only via the event notification
647+
// from the RMAppAttempt on recovery.
648+
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
604649

650+
// restart RM: start a second RM instance on the same state store
651+
MockRM rm2 = new MockRM(conf, memStore);
652+
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
653+
rm2.start();
654+
655+
rm2.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(),
656+
RMAppAttemptState.FINISHED);
657+
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
658+
// app final state is saved via the finish event from attempt.
659+
Assert.assertEquals(RMAppState.FINISHED,
660+
rmAppState.get(app0.getApplicationId()).getState());
605661
}
606662

607663
@Test

0 commit comments

Comments
 (0)