1919package org .apache .flink .runtime .leaderretrieval ;
2020
2121import org .apache .flink .api .common .time .Deadline ;
22- import org .apache .flink .configuration .Configuration ;
23- import org .apache .flink .configuration .HighAvailabilityOptions ;
24- import org .apache .flink .runtime .highavailability .zookeeper .CuratorFrameworkWithUnhandledErrorListener ;
22+ import org .apache .flink .core .testutils .EachCallbackWrapper ;
2523import org .apache .flink .runtime .leaderelection .LeaderInformation ;
2624import org .apache .flink .runtime .testutils .CommonTestUtils ;
27- import org .apache .flink .runtime .util .TestingFatalErrorHandlerResource ;
25+ import org .apache .flink .runtime .util .TestingFatalErrorHandlerExtension ;
2826import org .apache .flink .runtime .util .ZooKeeperUtils ;
27+ import org .apache .flink .runtime .zookeeper .ZooKeeperExtension ;
2928import org .apache .flink .util .Preconditions ;
30- import org .apache .flink .util .TestLogger ;
29+ import org .apache .flink .util .TestLoggerExtension ;
3130
3231import org .apache .flink .shaded .curator5 .org .apache .curator .framework .CuratorFramework ;
3332
34- import org .apache .curator .test .TestingServer ;
35- import org .junit .After ;
36- import org .junit .Before ;
37- import org .junit .Rule ;
38- import org .junit .Test ;
33+ import org .junit .jupiter .api .BeforeEach ;
34+ import org .junit .jupiter .api .Test ;
35+ import org .junit .jupiter .api .extension .ExtendWith ;
36+ import org .junit .jupiter .api .extension .RegisterExtension ;
3937
4038import javax .annotation .Nullable ;
4139
4947import java .util .concurrent .CompletableFuture ;
5048import java .util .concurrent .TimeUnit ;
5149
52- import static org .hamcrest .CoreMatchers .is ;
53- import static org .hamcrest .CoreMatchers .notNullValue ;
54- import static org .hamcrest .CoreMatchers .nullValue ;
55- import static org .hamcrest .MatcherAssert .assertThat ;
50+ import static org .assertj .core .api .Assertions .assertThat ;
5651
5752/**
5853 * Tests for the error handling in case of a suspended connection to the ZooKeeper instance when
5954 * retrieving the leader information.
6055 */
61- public class ZooKeeperLeaderRetrievalConnectionHandlingTest extends TestLogger {
56+ @ ExtendWith (TestLoggerExtension .class )
57+ class ZooKeeperLeaderRetrievalConnectionHandlingTest {
6258
63- private TestingServer testingServer ;
59+ @ RegisterExtension
60+ private final TestingFatalErrorHandlerExtension fatalErrorHandlerResource =
61+ new TestingFatalErrorHandlerExtension ();
6462
65- private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper ;
63+ @ RegisterExtension
64+ private final EachCallbackWrapper <ZooKeeperExtension > zooKeeperExtension =
65+ new EachCallbackWrapper <>(new ZooKeeperExtension ());
6666
67- private CuratorFramework zooKeeperClient ;
67+ @ Nullable private CuratorFramework zooKeeperClient ;
6868
69- @ Rule
70- public final TestingFatalErrorHandlerResource fatalErrorHandlerResource =
71- new TestingFatalErrorHandlerResource ();
72-
73- @ Before
69+ @ BeforeEach
7470 public void before () throws Exception {
75- testingServer = new TestingServer ();
76-
77- final Configuration config = new Configuration ();
78- config .setString (
79- HighAvailabilityOptions .HA_ZOOKEEPER_QUORUM , testingServer .getConnectString ());
80-
81- curatorFrameworkWrapper =
82- ZooKeeperUtils .startCuratorFramework (
83- config , fatalErrorHandlerResource .getFatalErrorHandler ());
84- zooKeeperClient = curatorFrameworkWrapper .asCuratorFramework ();
71+ zooKeeperClient =
72+ zooKeeperExtension
73+ .getCustomExtension ()
74+ .getZooKeeperClient (
75+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
8576 zooKeeperClient .blockUntilConnected ();
8677 }
8778
88- @ After
89- public void after () throws Exception {
90- closeTestServer ();
91-
92- if (curatorFrameworkWrapper != null ) {
93- curatorFrameworkWrapper .close ();
94- curatorFrameworkWrapper = null ;
95- }
79+ private ZooKeeperExtension getZooKeeper () {
80+ return zooKeeperExtension .getCustomExtension ();
9681 }
9782
9883 @ Test
@@ -106,26 +91,25 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti
10691 ZooKeeperUtils .createLeaderRetrievalDriverFactory (zooKeeperClient )
10792 .createLeaderRetrievalDriver (
10893 queueLeaderElectionListener ,
109- fatalErrorHandlerResource .getFatalErrorHandler ());
94+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
11095
11196 // do the testing
11297 final CompletableFuture <String > firstAddress =
11398 queueLeaderElectionListener .next (Duration .ofMillis (50 ));
114- assertThat (
115- "No results are expected, yet, since no leader was elected." ,
116- firstAddress ,
117- is (nullValue ()));
99+ assertThat (firstAddress )
100+ .as ("No results are expected, yet, since no leader was elected." )
101+ .isNull ();
118102
119- restartTestServer ();
103+ getZooKeeper (). restart ();
120104
121105 // QueueLeaderElectionListener will be notified with an empty leader when ZK connection
122106 // is suspended
123107 final CompletableFuture <String > secondAddress = queueLeaderElectionListener .next ();
124- assertThat ("The next result must not be missing." , secondAddress , is ( notNullValue ()));
125- assertThat (
126- "The next result is expected to be null." ,
127- secondAddress . get (),
128- is ( nullValue ()) );
108+ assertThat (secondAddress )
109+ . as ( "The next result must not be missing." )
110+ . isNotNull ()
111+ . as ( "The next result is expected to be null." )
112+ . isCompletedWithValue ( null );
129113 } finally {
130114 if (leaderRetrievalDriver != null ) {
131115 leaderRetrievalDriver .close ();
@@ -149,7 +133,7 @@ public void testConnectionSuspendedHandling() throws Exception {
149133 queueLeaderElectionListener ,
150134 ZooKeeperLeaderRetrievalDriver .LeaderInformationClearancePolicy
151135 .ON_SUSPENDED_CONNECTION ,
152- fatalErrorHandlerResource .getFatalErrorHandler ());
136+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
153137
154138 writeLeaderInformationToZooKeeper (
155139 leaderRetrievalDriver .getConnectionInformationPath (),
@@ -158,19 +142,18 @@ public void testConnectionSuspendedHandling() throws Exception {
158142
159143 // do the testing
160144 CompletableFuture <String > firstAddress = queueLeaderElectionListener .next ();
161- assertThat (
162- "The first result is expected to be the initially set leader address." ,
163- firstAddress .get (),
164- is (leaderAddress ));
145+ assertThat (firstAddress )
146+ .as ("The first result is expected to be the initially set leader address." )
147+ .isCompletedWithValue (leaderAddress );
165148
166- restartTestServer ();
149+ getZooKeeper (). restart ();
167150
168151 CompletableFuture <String > secondAddress = queueLeaderElectionListener .next ();
169- assertThat ("The next result must not be missing." , secondAddress , is ( notNullValue ()));
170- assertThat (
171- "The next result is expected to be null." ,
172- secondAddress . get (),
173- is ( nullValue ()) );
152+ assertThat (secondAddress )
153+ . as ( "The next result must not be missing." )
154+ . isNotNull ()
155+ . as ( "The next result is expected to be null." )
156+ . isCompletedWithValue ( null );
174157 } finally {
175158 if (leaderRetrievalDriver != null ) {
176159 leaderRetrievalDriver .close ();
@@ -195,7 +178,7 @@ public void testSuspendedConnectionDoesNotClearLeaderInformationIfClearanceOnLos
195178 queueLeaderElectionListener ,
196179 ZooKeeperLeaderRetrievalDriver .LeaderInformationClearancePolicy
197180 .ON_LOST_CONNECTION ,
198- fatalErrorHandlerResource .getFatalErrorHandler ());
181+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
199182
200183 writeLeaderInformationToZooKeeper (
201184 leaderRetrievalDriver .getConnectionInformationPath (),
@@ -204,15 +187,14 @@ public void testSuspendedConnectionDoesNotClearLeaderInformationIfClearanceOnLos
204187
205188 // do the testing
206189 CompletableFuture <String > firstAddress = queueLeaderElectionListener .next ();
207- assertThat (
208- "The first result is expected to be the initially set leader address." ,
209- firstAddress .get (),
210- is (leaderAddress ));
190+ assertThat (firstAddress )
191+ .as ("The first result is expected to be the initially set leader address." )
192+ .isCompletedWithValue (leaderAddress );
211193
212- closeTestServer ();
194+ getZooKeeper (). close ();
213195
214196 // make sure that no new leader information is published
215- assertThat (queueLeaderElectionListener .next (Duration .ofMillis (100L )), is ( nullValue ()) );
197+ assertThat (queueLeaderElectionListener .next (Duration .ofMillis (100L ))). isNull ( );
216198 } finally {
217199 if (leaderRetrievalDriver != null ) {
218200 leaderRetrievalDriver .close ();
@@ -234,7 +216,7 @@ public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Ex
234216 queueLeaderElectionListener ,
235217 ZooKeeperLeaderRetrievalDriver .LeaderInformationClearancePolicy
236218 .ON_SUSPENDED_CONNECTION ,
237- fatalErrorHandlerResource .getFatalErrorHandler ());
219+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
238220
239221 final String leaderAddress = "foobar" ;
240222 final UUID sessionId = UUID .randomUUID ();
@@ -244,20 +226,20 @@ public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Ex
244226 // pop new leader
245227 queueLeaderElectionListener .next ();
246228
247- testingServer .stop ();
229+ getZooKeeper () .stop ();
248230
249231 final CompletableFuture <String > connectionSuspension =
250232 queueLeaderElectionListener .next ();
251233
252234 // wait until the ZK connection is suspended
253235 connectionSuspension .join ();
254236
255- testingServer .restart ();
237+ getZooKeeper () .restart ();
256238
257239 // new old leader information should be announced
258240 final CompletableFuture <String > connectionReconnect =
259241 queueLeaderElectionListener .next ();
260- assertThat (connectionReconnect . get (), is ( leaderAddress ) );
242+ assertThat (connectionReconnect ). isCompletedWithValue ( leaderAddress );
261243 } finally {
262244 if (leaderRetrievalDriver != null ) {
263245 leaderRetrievalDriver .close ();
@@ -304,7 +286,7 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc
304286 queueLeaderElectionListener ,
305287 ZooKeeperLeaderRetrievalDriver .LeaderInformationClearancePolicy
306288 .ON_SUSPENDED_CONNECTION ,
307- fatalErrorHandlerResource .getFatalErrorHandler ());
289+ fatalErrorHandlerResource .getTestingFatalErrorHandler ());
308290
309291 final String leaderAddress = "foobar" ;
310292 final UUID sessionId = UUID .randomUUID ();
@@ -314,15 +296,15 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc
314296 // pop new leader
315297 queueLeaderElectionListener .next ();
316298
317- testingServer .stop ();
299+ getZooKeeper () .stop ();
318300
319301 final CompletableFuture <String > connectionSuspension =
320302 queueLeaderElectionListener .next ();
321303
322304 // wait until the ZK connection is suspended
323305 connectionSuspension .join ();
324306
325- testingServer .restart ();
307+ getZooKeeper () .restart ();
326308
327309 final String newLeaderAddress = "barfoo" ;
328310 final UUID newSessionId = UUID .randomUUID ();
@@ -346,18 +328,6 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc
346328 }
347329 }
348330
349- private void closeTestServer () throws IOException {
350- if (testingServer != null ) {
351- testingServer .close ();
352- testingServer = null ;
353- }
354- }
355-
356- private void restartTestServer () throws Exception {
357- Preconditions .checkNotNull (testingServer , "TestingServer needs to be initialized." )
358- .restart ();
359- }
360-
361331 private static class QueueLeaderElectionListener implements LeaderRetrievalEventHandler {
362332
363333 private final BlockingQueue <CompletableFuture <String >> queue ;
0 commit comments