Skip to content

Commit a4f64c2

Browse files
author
benjobs
committed
[Improve] trigger savepoint bug fixed.
1 parent 03fffda commit a4f64c2

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ private void handleSavepointResponseFuture(
334334
savepointResponse -> {
335335
if (savepointResponse != null && savepointResponse.savePointDir() != null) {
336336
applicationLog.setSuccess(true);
337-
flinkAppHttpWatcher.cleanSavepoint(application);
338337
String savePointDir = savepointResponse.savePointDir();
339338
log.info("Request savepoint successful, savepointDir: {}", savePointDir);
340339
}

streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ private void doPersistMetrics(Application application, boolean stopWatch) {
449449
} else {
450450
WATCHING_APPS.put(appId, application);
451451
}
452+
452453
StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
453454
StateChangeEvent nowEvent = StateChangeEvent.of(application);
454455
if (!nowEvent.equals(event)) {
@@ -617,8 +618,12 @@ private void cleanOptioning(OptionState optionState, Long key) {
617618
}
618619

619620
public void cleanSavepoint(Application application) {
620-
SAVEPOINT_CACHE.invalidate(application.getId());
621621
application.setOptionState(OptionState.NONE.getValue());
622+
StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
623+
if (event != null && event.getOptionState() == OptionState.SAVEPOINTING) {
624+
doPersistMetrics(application, false);
625+
}
626+
SAVEPOINT_CACHE.invalidate(application.getId());
622627
}
623628

624629
/** set current option state */
@@ -648,6 +653,13 @@ public static void addSavepoint(Long appId) {
648653
}
649654
log.info("[StreamPark][FlinkAppHttpWatcher] add app to savepoint,appId:{}", appId);
650655
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
656+
657+
// update to PREVIOUS_STATUS
658+
StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
659+
if (event != null) {
660+
event.setOptionState(OptionState.SAVEPOINTING);
661+
PREVIOUS_STATUS.put(appId, event);
662+
}
651663
}
652664

653665
public static void unWatching(Long appId) {

0 commit comments

Comments
 (0)