Skip to content

Commit cfbbb81

Browse files
liufangqiMyasuka
authored andcommitted
[FLINK-25528][state-processor-api] state processor api do not support increment checkpoint
1. limit the checkpoint type in the savepoint type and full checkpoint type. 2. add the SnapshotUtils test with the full checkpoint.
1 parent c7b3b53 commit cfbbb81

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.core.execution.SavepointFormatType;
2424
import org.apache.flink.core.fs.Path;
2525
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
26+
import org.apache.flink.runtime.checkpoint.CheckpointType;
2627
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
2728
import org.apache.flink.runtime.checkpoint.SavepointType;
2829
import org.apache.flink.runtime.checkpoint.SnapshotType;
@@ -33,6 +34,7 @@
3334
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
3435
import org.apache.flink.streaming.api.operators.StreamOperator;
3536
import org.apache.flink.util.MathUtils;
37+
import org.apache.flink.util.Preconditions;
3638

3739
import java.io.IOException;
3840

@@ -57,6 +59,10 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
5759
SnapshotType snapshotType)
5860
throws Exception {
5961

62+
Preconditions.checkArgument(
63+
snapshotType.isSavepoint() || CheckpointType.FULL_CHECKPOINT.equals(snapshotType),
64+
"the snapshot type require savepoint type or full checkpoint type.");
65+
6066
CheckpointOptions options =
6167
CheckpointOptions.forConfig(
6268
snapshotType,

flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.core.fs.Path;
2323
import org.apache.flink.metrics.groups.OperatorMetricGroup;
2424
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
25+
import org.apache.flink.runtime.checkpoint.CheckpointType;
2526
import org.apache.flink.runtime.jobgraph.OperatorID;
2627
import org.apache.flink.runtime.state.CheckpointStreamFactory;
2728
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
@@ -52,6 +53,7 @@ public class SnapshotUtilsTest {
5253

5354
@Test
5455
public void testSnapshotUtilsLifecycle() throws Exception {
56+
ACTUAL_ORDER_TRACKING.clear();
5557
StreamOperator<Void> operator = new LifecycleOperator();
5658
Path path = new Path(folder.newFolder().getAbsolutePath());
5759

@@ -60,6 +62,25 @@ public void testSnapshotUtilsLifecycle() throws Exception {
6062
Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
6163
}
6264

65+
@Test
66+
public void testSnapshotUtilsLifecycleWithFullCheckpoint() throws Exception {
67+
ACTUAL_ORDER_TRACKING.clear();
68+
StreamOperator<Void> operator = new LifecycleOperator();
69+
Path path = new Path(folder.newFolder().getAbsolutePath());
70+
71+
SnapshotUtils.snapshot(
72+
operator,
73+
0,
74+
0L,
75+
true,
76+
false,
77+
new Configuration(),
78+
path,
79+
CheckpointType.FULL_CHECKPOINT);
80+
81+
Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);
82+
}
83+
6384
private static class LifecycleOperator implements StreamOperator<Void> {
6485
private static final long serialVersionUID = 1L;
6586

0 commit comments

Comments
 (0)