3939import java .time .Duration ;
4040import java .util .ArrayList ;
4141import java .util .Arrays ;
42+ import java .util .Collection ;
4243import java .util .EnumSet ;
4344import java .util .HashSet ;
4445import java .util .List ;
4546import java .util .Map ;
47+ import java .util .Objects ;
4648import java .util .Optional ;
4749import java .util .Set ;
4850import java .util .UUID ;
4951import java .util .concurrent .CompletableFuture ;
5052import java .util .concurrent .ConcurrentHashMap ;
5153import java .util .concurrent .TimeUnit ;
54+ import java .util .function .Predicate ;
55+ import java .util .stream .Collectors ;
56+ import java .util .stream .Stream ;
5257import javax .ws .rs .BadRequestException ;
5358import javax .ws .rs .ClientErrorException ;
5459import javax .ws .rs .WebApplicationException ;
6166import org .apache .bookkeeper .mledger .LedgerOffloader ;
6267import org .apache .bookkeeper .mledger .ManagedLedgerConfig ;
6368import org .apache .bookkeeper .util .ZkUtils ;
69+ import org .apache .commons .lang3 .StringUtils ;
6470import org .apache .pulsar .broker .BrokerTestUtil ;
6571import org .apache .pulsar .broker .admin .v1 .Namespaces ;
6672import org .apache .pulsar .broker .admin .v1 .PersistentTopics ;
7682import org .apache .pulsar .client .admin .PulsarAdminException ;
7783import org .apache .pulsar .client .admin .PulsarAdminException .NotFoundException ;
7884import org .apache .pulsar .client .admin .internal .BaseResource ;
85+ import org .apache .pulsar .client .api .ClientBuilder ;
7986import org .apache .pulsar .client .api .Consumer ;
8087import org .apache .pulsar .client .api .ConsumerBuilder ;
8188import org .apache .pulsar .client .api .Producer ;
112119import org .mockito .Mockito ;
113120import org .slf4j .Logger ;
114121import org .slf4j .LoggerFactory ;
122+ import org .testng .annotations .AfterClass ;
115123import org .testng .annotations .AfterMethod ;
116124import org .testng .annotations .BeforeClass ;
117- import org .testng .annotations .BeforeMethod ;
118125import org .testng .annotations .Test ;
119126
120127@ Test (groups = "broker-admin" )
@@ -139,8 +146,9 @@ public NamespacesTest() {
139146 super ();
140147 }
141148
149+ @ Override
142150 @ BeforeClass
143- public void initNamespace () throws Exception {
151+ public void setup () throws Exception {
144152 testLocalNamespaces = new ArrayList <>();
145153 testGlobalNamespaces = new ArrayList <>();
146154
@@ -154,11 +162,37 @@ public void initNamespace() throws Exception {
154162 uriField = PulsarWebResource .class .getDeclaredField ("uri" );
155163 uriField .setAccessible (true );
156164 uriInfo = mock (UriInfo .class );
165+
166+ initAndStartBroker ();
157167 }
158168
159169 @ Override
160- @ BeforeMethod
161- public void setup () throws Exception {
170+ @ AfterClass (alwaysRun = true )
171+ public void cleanup () throws Exception {
172+ super .internalCleanup ();
173+ conf .setClusterName (testLocalCluster );
174+ }
175+
176+ @ AfterMethod (alwaysRun = true )
177+ public void cleanupAfterMethod () throws Exception {
178+ // cleanup.
179+ Set <String > existsNsSetAferSetup = Stream .concat (testLocalNamespaces .stream (), testGlobalNamespaces .stream ())
180+ .map (Objects ::toString ).collect (Collectors .toSet ());
181+ cleanupNamespaceByPredicate (this .testTenant , v -> !existsNsSetAferSetup .contains (v ));
182+ cleanupNamespaceByPredicate (this .testOtherTenant , v -> !existsNsSetAferSetup .contains (v ));
183+ }
184+
185+ protected void customizeNewPulsarClientBuilder (ClientBuilder clientBuilder ) {
186+ // Make method "testMaxTopicsPerNamespace" run faster.
187+ clientBuilder .operationTimeout (1 , TimeUnit .SECONDS );
188+ }
189+
190+ private void resetBroker () throws Exception {
191+ cleanup ();
192+ initAndStartBroker ();
193+ }
194+
195+ private void initAndStartBroker () throws Exception {
162196 conf .setTopicLevelPoliciesEnabled (false );
163197 conf .setSystemTopicEnabled (false );
164198 conf .setClusterName (testLocalCluster );
@@ -207,13 +241,6 @@ public void setup() throws Exception {
207241 nsSvc = pulsar .getNamespaceService ();
208242 }
209243
210- @ Override
211- @ AfterMethod (alwaysRun = true )
212- public void cleanup () throws Exception {
213- super .internalCleanup ();
214- conf .setClusterName (testLocalCluster );
215- }
216-
217244 @ Test
218245 public void testCreateNamespaces () throws Exception {
219246 try {
@@ -646,6 +673,8 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {
646673 assertEquals (e .getResponse ().getStatus (), Status .PRECONDITION_FAILED .getStatusCode ());
647674 }
648675
676+ // cleanup
677+ resetBroker ();
649678 }
650679
651680 @ Test
@@ -723,6 +752,9 @@ public boolean matches(NamespaceName nsname) {
723752 assertEquals (captor .getValue ().getResponse ().getStatus (), Status .TEMPORARY_REDIRECT .getStatusCode ());
724753 assertEquals (captor .getValue ().getResponse ().getLocation ().toString (),
725754 UriBuilder .fromUri (uri ).host ("broker-usc.com" ).port (8080 ).toString ());
755+
756+ // cleanup
757+ resetBroker ();
726758 }
727759
728760 @ Test
@@ -802,6 +834,9 @@ public void testDeleteNamespaces() throws Exception {
802834 responseCaptor = ArgumentCaptor .forClass (Response .class );
803835 verify (response , timeout (5000 ).times (1 )).resume (responseCaptor .capture ());
804836 assertEquals (responseCaptor .getValue ().getStatus (), Status .NO_CONTENT .getStatusCode ());
837+
838+ // cleanup
839+ resetBroker ();
805840 }
806841
807842 @ Test
@@ -882,6 +917,9 @@ public boolean matches(NamespaceBundle bundle) {
882917 ArgumentCaptor <Response > captor2 = ArgumentCaptor .forClass (Response .class );
883918 verify (response , timeout (5000 ).times (1 )).resume (captor2 .capture ());
884919 assertEquals (captor2 .getValue ().getStatus (), Status .NO_CONTENT .getStatusCode ());
920+
921+ // cleanup
922+ resetBroker ();
885923 }
886924
887925 @ Test
@@ -901,6 +939,9 @@ public void testUnloadNamespaces() throws Exception {
901939 ArgumentCaptor <Response > captor = ArgumentCaptor .forClass (Response .class );
902940 verify (response , timeout (5000 ).times (1 )).resume (captor .capture ());
903941 assertEquals (captor .getValue ().getStatus (), Status .NO_CONTENT .getStatusCode ());
942+
943+ // cleanup
944+ resetBroker ();
904945 }
905946
906947 @ Test
@@ -940,6 +981,9 @@ public void testSplitBundles() throws Exception {
940981 } catch (RestException re ) {
941982 assertEquals (re .getResponse ().getStatus (), Status .PRECONDITION_FAILED .getStatusCode ());
942983 }
984+
985+ // cleanup
986+ resetBroker ();
943987 }
944988
945989 @ Test
@@ -967,6 +1011,9 @@ public void testSplitBundleWithUnDividedRange() throws Exception {
9671011 "0x08375b1a_0x08375b1b" , false , false , null );
9681012 ArgumentCaptor <Response > captor = ArgumentCaptor .forClass (Response .class );
9691013 verify (response , timeout (5000 ).times (1 )).resume (any (RestException .class ));
1014+
1015+ // cleanup
1016+ resetBroker ();
9701017 }
9711018
9721019 @ Test
@@ -998,6 +1045,9 @@ public void testUnloadNamespaceWithBundles() throws Exception {
9981045 namespaces .unloadNamespaceBundle (response , testTenant , testLocalCluster , bundledNsLocal , "0x00000000_0x80000000" ,
9991046 false );
10001047 verify (response , timeout (5000 ).times (1 )).resume (any (RestException .class ));
1048+
1049+ // cleanup
1050+ resetBroker ();
10011051 }
10021052
10031053 private void createBundledTestNamespaces (String property , String cluster , String namespace , BundlesData bundle )
@@ -1061,6 +1111,9 @@ public void testRetention() throws Exception {
10611111 } catch (RestException e ) {
10621112 fail ("ValidateNamespaceOwnershipWithBundles failed" );
10631113 }
1114+
1115+ // cleanup
1116+ resetBroker ();
10641117 }
10651118
10661119 @ Test
@@ -1133,6 +1186,9 @@ public void testValidateTopicOwnership() throws Exception {
11331186 topics .validateTopicName (topicName .getTenant (), topicName .getCluster (),
11341187 topicName .getNamespacePortion (), topicName .getEncodedLocalName ());
11351188 topics .validateAdminOperationOnTopic (false );
1189+
1190+ // cleanup
1191+ resetBroker ();
11361192 }
11371193
11381194 @ Test
@@ -1566,7 +1622,7 @@ public void testRetentionPolicyValidation() throws Exception {
15661622 public void testMaxTopicsPerNamespace () throws Exception {
15671623 cleanup ();
15681624 conf .setMaxTopicsPerNamespace (15 );
1569- setup ();
1625+ initAndStartBroker ();
15701626
15711627 String namespace = BrokerTestUtil .newUniqueName ("testTenant/ns1" );
15721628 TenantInfoImpl tenantInfo = new TenantInfoImpl (Set .of ("role1" , "role2" ),
@@ -1618,7 +1674,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
16181674 conf .setMaxTopicsPerNamespace (0 );
16191675 conf .setDefaultNumPartitions (3 );
16201676 conf .setAllowAutoTopicCreationType ("partitioned" );
1621- setup ();
1677+ initAndStartBroker ();
16221678
16231679 admin .tenants ().createTenant ("testTenant" , tenantInfo );
16241680 admin .namespaces ().createNamespace (namespace , Set .of ("use" ));
@@ -1647,7 +1703,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
16471703 conf .setMaxTopicsPerNamespace (0 );
16481704 conf .setDefaultNumPartitions (1 );
16491705 conf .setAllowAutoTopicCreationType ("non-partitioned" );
1650- setup ();
1706+ initAndStartBroker ();
16511707
16521708 admin .tenants ().createTenant ("testTenant" , tenantInfo );
16531709 admin .namespaces ().createNamespace (namespace , Set .of ("use" ));
@@ -1682,9 +1738,8 @@ public void testMaxTopicsPerNamespace() throws Exception {
16821738 pulsarClient .newConsumer ().topic (topic + "_c" + i ).subscriptionName ("test_sub" ).subscribe ().close ();
16831739 }
16841740
1685- conf .setMaxTopicsPerNamespace (0 );
1686- conf .setDefaultNumPartitions (1 );
1687- conf .setAllowAutoTopicCreationType ("non-partitioned" );
1741+ // cleanup
1742+ resetBroker ();
16881743 }
16891744
16901745 private void assertInvalidRetentionPolicy (String namespace , int retentionTimeInMinutes , int retentionSizeInMB ) {
@@ -1876,6 +1931,9 @@ public void testSplitBundleForMultiTimes() throws Exception{
18761931 }
18771932 BundlesData bundles = admin .namespaces ().getBundles (namespace );
18781933 assertEquals (bundles .getNumBundles (), 14 );
1934+
1935+ // cleanup
1936+ resetBroker ();
18791937 }
18801938
18811939 @ Test
@@ -1913,5 +1971,31 @@ public void testOperationSubscriptionDispatchRate() throws Exception {
19131971 assertEquals (e .getResponse ().getStatus (), Status .NOT_FOUND .getStatusCode ());
19141972 }
19151973 }
1974+ /**
1975+ * see {@link #cleanupNamespaceByNsCollection(Collection)}
1976+ */
1977+ private void cleanupNamespaceByPredicate (String tenant , Predicate <String > predicate ) throws Exception {
1978+ cleanupNamespaceByNsCollection (admin .namespaces ().getNamespaces (tenant ).stream ()
1979+ .filter (predicate ).collect (Collectors .toSet ()));
1980+ }
1981+
1982+ /**
1983+ * Remove namespaces.
1984+ */
1985+ private void cleanupNamespaceByNsCollection (Collection <String > namespaces )
1986+ throws Exception {
1987+ if (namespaces == null ){
1988+ return ;
1989+ }
1990+ boolean forceDeleteNamespaceAllowedOriginalValue = pulsar .getConfiguration ().isForceDeleteNamespaceAllowed ();
1991+ pulsar .getConfiguration ().setForceDeleteNamespaceAllowed (true );
1992+ for (String ns : namespaces ){
1993+ if (StringUtils .isEmpty (ns )){
1994+ continue ;
1995+ }
1996+ deleteNamespaceGraceFully (ns , true );
1997+ }
1998+ pulsar .getConfiguration ().setForceDeleteNamespaceAllowed (forceDeleteNamespaceAllowedOriginalValue );
1999+ }
19162000
19172001}
0 commit comments