Skip to content

Commit d62be2e

Browse files
committed
[FLINK-26015] Adds testcase covering a basic job execution in HA with object store as a backend storage
1 parent 0d0a3ac commit d62be2e

File tree

7 files changed

+410
-0
lines changed

7 files changed

+410
-0
lines changed

flink-filesystems/flink-s3-fs-base/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,36 @@ under the License.
221221
</exclusion>
222222
</exclusions>
223223
</dependency>
224+
225+
<!-- flink-runtime and curator dependencies are needed for the HAJobRunOnMinioS3StoreITCase -->
226+
<dependency>
227+
<groupId>org.apache.flink</groupId>
228+
<artifactId>flink-runtime</artifactId>
229+
<version>${project.version}</version>
230+
<scope>test</scope>
231+
</dependency>
232+
233+
<dependency>
234+
<groupId>org.apache.flink</groupId>
235+
<artifactId>flink-runtime</artifactId>
236+
<version>${project.version}</version>
237+
<type>test-jar</type>
238+
<scope>test</scope>
239+
</dependency>
240+
241+
<dependency>
242+
<groupId>org.apache.curator</groupId>
243+
<artifactId>curator-test</artifactId>
244+
<version>${curator.version}</version>
245+
<scope>test</scope>
246+
<exclusions>
247+
<!-- This transitive dependency causes version conflicts on the classpath. -->
248+
<exclusion>
249+
<groupId>com.google.guava</groupId>
250+
<artifactId>guava</artifactId>
251+
</exclusion>
252+
</exclusions>
253+
</dependency>
224254
</dependencies>
225255

226256
<build>
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3.common;
20+
21+
import org.apache.flink.api.common.time.Deadline;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.core.testutils.AllCallbackWrapper;
24+
import org.apache.flink.core.testutils.TestContainerExtension;
25+
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
26+
import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
27+
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
28+
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
29+
import org.apache.flink.runtime.testutils.CommonTestUtils;
30+
31+
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
32+
33+
import com.amazonaws.services.s3.model.S3ObjectSummary;
34+
import org.apache.commons.lang3.StringUtils;
35+
import org.junit.jupiter.api.extension.RegisterExtension;
36+
37+
import java.time.Duration;
38+
import java.util.List;
39+
40+
import static org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not;
41+
import static org.assertj.core.api.Assertions.assertThat;
42+
43+
/**
44+
* {@code HAJobRunOnMinioS3StoreITCase} covers a job run where the HA data is stored in Minio. The
45+
* implementation verifies whether the {@code JobResult} was written into the FileSystem-backed
46+
* {@code JobResultStore}.
47+
*/
48+
public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCase {
49+
50+
// Sub-folder of the default bucket: used on its own as the HA storage path (getHAStoragePath) and
// as the parent of the JobResultStore folder.
private static final String CLUSTER_ID = "test-cluster";
51+
// Folder below CLUSTER_ID that createConfiguration sets as JobResultStoreOptions.STORAGE_PATH.
private static final String JOB_RESULT_STORE_FOLDER = "jrs";
52+
53+
// Wraps a TestContainerExtension that supplies the MinioTestContainer used by this test.
// NOTE(review): starting/stopping the container is presumably done by the extension — confirm
// against TestContainerExtension.
@RegisterExtension
54+
private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
55+
MINIO_EXTENSION =
56+
new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new));
57+
58+
/** Returns the {@code MinioTestContainer} held by the extension wrapped in {@code MINIO_EXTENSION}. */
private static MinioTestContainer getMinioContainer() {
59+
return MINIO_EXTENSION.getCustomExtension().getTestContainer();
60+
}
61+
62+
/**
 * Builds an S3 URI string by appending the sub path created from {@code subfolders} (see {@code
 * createSubPath}) to the S3 URI the Minio container reports for its default bucket.
 *
 * @param subfolders folder names that form the path below the default bucket, in order
 * @return the default bucket's S3 URI followed by {@code /}-separated {@code subfolders}
 */
private static String createS3URIWithSubPath(String... subfolders) {
63+
return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders);
64+
}
65+
66+
/**
 * Queries the Minio container's client for the object summaries of the default bucket, passing
 * the sub path of {@code CLUSTER_ID} and {@code JOB_RESULT_STORE_FOLDER} as second argument.
 *
 * <p>NOTE(review): the second argument is presumably the key prefix of {@code
 * AmazonS3#listObjects(bucketName, prefix)} — confirm against the type returned by {@code
 * getClient()}.
 *
 * @return the object summaries returned by the listing call
 */
private static List<S3ObjectSummary> getObjectsFromJobResultStore() {
67+
return getMinioContainer()
68+
.getClient()
69+
.listObjects(
70+
getMinioContainer().getDefaultBucketName(),
71+
createSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER))
72+
.getObjectSummaries();
73+
}
74+
75+
/**
 * Joins the given folder names with {@code /} and prepends a leading {@code /}, e.g. {@code
 * createSubPath("a", "b")} yields {@code "/a/b"}.
 *
 * @param subfolders folder names to join, in order
 * @return the joined path, always starting with {@code /}
 */
private static String createSubPath(String... subfolders) {
76+
final String pathSeparator = "/";
77+
return pathSeparator + StringUtils.join(subfolders, pathSeparator);
78+
}
79+
80+
/**
 * Returns the HA storage path used by this test: the {@code CLUSTER_ID} folder below the Minio
 * container's default bucket, expressed as S3 URI.
 */
@Override
81+
protected String getHAStoragePath() {
82+
return createS3URIWithSubPath(CLUSTER_ID);
83+
}
84+
85+
/**
 * Creates a fresh {@link Configuration} containing the S3 options set by the Minio container
 * and the {@code JobResultStore} options needed by this test.
 *
 * @return the configuration with {@code DELETE_ON_COMMIT} set to {@code false} and {@code
 *     STORAGE_PATH} pointing to {@code <default bucket URI>/test-cluster/jrs}
 */
@Override
86+
protected Configuration createConfiguration() {
87+
final Configuration config = new Configuration();
88+
89+
// The container writes its own S3-related options into the configuration (details live in
// MinioTestContainer, which is not part of this file).
getMinioContainer().setS3ConfigOptions(config);
90+
91+
// JobResultStore configuration
92+
// NOTE(review): disabling delete-on-commit presumably keeps the committed entry in the store
// so that runAfterJobTermination can inspect it — confirm against JobResultStoreOptions.
config.set(JobResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE);
93+
config.set(
94+
JobResultStoreOptions.STORAGE_PATH,
95+
createS3URIWithSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER));
96+
97+
return config;
98+
}
99+
100+
/**
 * Verifies the content of the {@code JobResultStore} folder in Minio.
 *
 * <p>The method first waits (deadline of 60 seconds) until the listing of the job result store
 * folder contains at least one key accepted by {@code
 * FileSystemJobResultStore::hasValidJobResultStoreEntryExtension} and no key accepted by {@code
 * FileSystemJobResultStore::hasValidDirtyJobResultStoreEntryExtension}. Afterwards it lists the
 * folder again, expects exactly one object, re-checks its key with the same two predicates and
 * asserts that the object's content contains the name of {@link ApplicationStatus#SUCCEEDED}.
 *
 * @throws Exception declared by the overridden hook; presumably also covers a wait that does
 *     not succeed before the deadline — confirm against {@code
 *     CommonTestUtils.waitUntilCondition}
 */
@Override
101+
protected void runAfterJobTermination() throws Exception {
102+
CommonTestUtils.waitUntilCondition(
103+
() -> {
104+
// One listing per poll; both checks below run on the same snapshot.
final List<S3ObjectSummary> objects = getObjectsFromJobResultStore();
105+
return objects.stream()
106+
.map(S3ObjectSummary::getKey)
107+
.anyMatch(
108+
FileSystemJobResultStore
109+
::hasValidJobResultStoreEntryExtension)
110+
&& objects.stream()
111+
.map(S3ObjectSummary::getKey)
112+
.noneMatch(
113+
FileSystemJobResultStore
114+
::hasValidDirtyJobResultStoreEntryExtension);
115+
},
116+
Deadline.fromNow(Duration.ofSeconds(60)),
117+
// NOTE(review): presumably the retry interval in milliseconds — confirm.
2000L,
118+
"Wait for the JobResult being written to the JobResultStore.");
119+
120+
// Guava's getOnlyElement fails unless the listing contains exactly one object.
final S3ObjectSummary objRef = Iterables.getOnlyElement(getObjectsFromJobResultStore());
121+
assertThat(objRef.getKey())
122+
.matches(FileSystemJobResultStore::hasValidJobResultStoreEntryExtension)
123+
.matches(not(FileSystemJobResultStore::hasValidDirtyJobResultStoreEntryExtension));
124+
125+
final String objContent =
126+
getMinioContainer()
127+
.getClient()
128+
.getObjectAsString(objRef.getBucketName(), objRef.getKey());
129+
assertThat(objContent).contains(ApplicationStatus.SUCCEEDED.name());
130+
}
131+
}

flink-filesystems/flink-s3-fs-hadoop/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,44 @@ under the License.
9292
<scope>test</scope>
9393
<type>test-jar</type>
9494
</dependency>
95+
96+
<!-- for the HAJobRunOnHadoopS3FileSystemITCase -->
97+
<dependency>
98+
<groupId>org.apache.flink</groupId>
99+
<artifactId>flink-s3-fs-base</artifactId>
100+
<version>${project.version}</version>
101+
<type>test-jar</type>
102+
<scope>test</scope>
103+
</dependency>
104+
105+
<dependency>
106+
<groupId>org.apache.flink</groupId>
107+
<artifactId>flink-runtime</artifactId>
108+
<version>${project.version}</version>
109+
<type>test-jar</type>
110+
<scope>test</scope>
111+
</dependency>
112+
113+
<dependency>
114+
<groupId>org.apache.flink</groupId>
115+
<artifactId>flink-runtime</artifactId>
116+
<version>${project.version}</version>
117+
<scope>test</scope>
118+
</dependency>
119+
120+
<dependency>
121+
<groupId>org.apache.curator</groupId>
122+
<artifactId>curator-test</artifactId>
123+
<version>${curator.version}</version>
124+
<scope>test</scope>
125+
<exclusions>
126+
<!-- This transitive dependency causes version conflicts on the classpath. -->
127+
<exclusion>
128+
<groupId>com.google.guava</groupId>
129+
<artifactId>guava</artifactId>
130+
</exclusion>
131+
</exclusions>
132+
</dependency>
95133
</dependencies>
96134

97135
<build>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3hadoop;
20+
21+
import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase;
22+
23+
/**
 * Runs the {@link HAJobRunOnMinioS3StoreITCase} on the Hadoop S3 file system.
 *
 * <p>The class body is empty; all test logic is inherited from the abstract base class.
 * NOTE(review): the Hadoop-based S3 implementation is presumably picked up because this test is
 * part of the {@code flink-s3-fs-hadoop} module — confirm.
 */
24+
public class HAJobRunOnHadoopS3FileSystemITCase extends HAJobRunOnMinioS3StoreITCase {}

flink-filesystems/flink-s3-fs-presto/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,44 @@ under the License.
4646
<scope>provided</scope>
4747
</dependency>
4848

49+
<!-- for the HAJobRunOnPrestoS3FileSystemITCase -->
50+
<dependency>
51+
<groupId>org.apache.flink</groupId>
52+
<artifactId>flink-runtime</artifactId>
53+
<version>${project.version}</version>
54+
<scope>test</scope>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-runtime</artifactId>
60+
<version>${project.version}</version>
61+
<type>test-jar</type>
62+
<scope>test</scope>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.flink</groupId>
67+
<artifactId>flink-s3-fs-base</artifactId>
68+
<version>${project.version}</version>
69+
<type>test-jar</type>
70+
<scope>test</scope>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.apache.curator</groupId>
75+
<artifactId>curator-test</artifactId>
76+
<version>${curator.version}</version>
77+
<scope>test</scope>
78+
<exclusions>
79+
<!-- This transitive dependency causes version conflicts on the classpath. -->
80+
<exclusion>
81+
<groupId>com.google.guava</groupId>
82+
<artifactId>guava</artifactId>
83+
</exclusion>
84+
</exclusions>
85+
</dependency>
86+
4987
<!-- S3 base (bundled) -->
5088
<dependency>
5189
<groupId>org.apache.flink</groupId>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3presto;
20+
21+
import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase;
22+
23+
/**
 * Runs the {@link HAJobRunOnMinioS3StoreITCase} on the Presto S3 file system.
 *
 * <p>The class body is empty; all test logic is inherited from the abstract base class.
 * NOTE(review): the Presto-based S3 implementation is presumably picked up because this test is
 * part of the {@code flink-s3-fs-presto} module — confirm.
 */
24+
public class HAJobRunOnPrestoS3FileSystemITCase extends HAJobRunOnMinioS3StoreITCase {}

0 commit comments

Comments
 (0)