Skip to content

Commit 106a45e

Browse files
committed
[FLINK-26583][runtime] Adds log message for when a Job is submitted that is already marked as cleaned
1 parent 208d7dd commit 106a45e

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989

9090
import java.io.IOException;
9191
import java.util.ArrayList;
92+
import java.util.Arrays;
9293
import java.util.Collection;
9394
import java.util.Collections;
9495
import java.util.HashMap;
@@ -428,6 +429,17 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
428429

429430
try {
430431
if (isDuplicateJob(jobGraph.getJobID())) {
432+
if (isInGloballyTerminalState(jobGraph.getJobID())) {
433+
log.warn(
434+
"Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.",
435+
jobGraph.getName(),
436+
jobGraph.getJobID(),
437+
Arrays.stream(JobStatus.values())
438+
.filter(JobStatus::isGloballyTerminalState)
439+
.map(JobStatus::name)
440+
.collect(Collectors.joining(", ")));
441+
}
442+
431443
final DuplicateJobSubmissionException exception =
432444
isInGloballyTerminalState(jobGraph.getJobID())
433445
? DuplicateJobSubmissionException.ofGloballyTerminated(

flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,26 @@ public void testJobSubmission() throws Exception {
207207
}
208208

209209
@Test
210-
public void testDuplicateJobSubmissionWithGloballyTerminatedJobId() throws Exception {
210+
public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() throws Exception {
211211
final JobResult jobResult =
212212
TestingJobResultStore.createJobResult(
213213
jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
214214
haServices.getJobResultStore().createDirtyResult(new JobResultEntry(jobResult));
215+
assertDuplicateJobSubmission();
216+
}
217+
218+
@Test
219+
public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws Exception {
220+
final JobResult jobResult =
221+
TestingJobResultStore.createJobResult(
222+
jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
223+
haServices.getJobResultStore().createDirtyResult(new JobResultEntry(jobResult));
224+
haServices.getJobResultStore().markResultAsClean(jobGraph.getJobID());
225+
226+
assertDuplicateJobSubmission();
227+
}
228+
229+
private void assertDuplicateJobSubmission() throws Exception {
215230
dispatcher =
216231
createAndStartDispatcher(
217232
heartbeatServices,

0 commit comments

Comments
 (0)