Skip to content

Commit 3116abf

Browse files
authored
[fix][broker] Fix get topic policies as null during clean cache (apache#20763)
1 parent 7577956 commit 3116abf

File tree

3 files changed

+99
-7
lines changed

3 files changed

+99
-7
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import javax.annotation.Nonnull;
33+
import org.apache.commons.lang3.tuple.MutablePair;
3334
import org.apache.pulsar.broker.PulsarServerException;
3435
import org.apache.pulsar.broker.PulsarService;
3536
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -220,12 +221,25 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
220221
NamespaceName namespace = topicName.getNamespaceObject();
221222
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
222223
}
223-
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
224-
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
225-
throw new TopicPoliciesCacheNotInitException();
224+
225+
MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
226+
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
227+
if (initialized == null || !initialized) {
228+
result.setLeft(new TopicPoliciesCacheNotInitException());
229+
} else {
230+
TopicPolicies topicPolicies =
231+
isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
232+
: policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
233+
result.setRight(topicPolicies);
234+
}
235+
return initialized;
236+
});
237+
238+
if (result.getLeft() != null) {
239+
throw result.getLeft();
240+
} else {
241+
return result.getRight();
226242
}
227-
return isGlobal ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
228-
: policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
229243
}
230244

231245
@Override
@@ -388,7 +402,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
388402

389403
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
390404
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
391-
policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
405+
392406
if (cleanOwnedBundlesCount) {
393407
ownedBundlesCountPerNamespace.remove(namespace);
394408
}
@@ -399,7 +413,11 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
399413
return null;
400414
});
401415
}
402-
policyCacheInitMap.remove(namespace);
416+
417+
policyCacheInitMap.compute(namespace, (k, v) -> {
418+
policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
419+
return null;
420+
});
403421
}
404422

405423
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3015,6 +3015,7 @@ public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
30153015
});
30163016
}
30173017
}
3018+
30183019
@Test
30193020
public void testGlobalTopicPolicies() throws Exception {
30203021
final String topic = testTopic + UUID.randomUUID();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
import java.util.Set;
3535
import java.util.UUID;
3636
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.Executors;
3940
import java.util.concurrent.ScheduledExecutorService;
4041
import java.util.concurrent.TimeUnit;
42+
import lombok.extern.slf4j.Slf4j;
43+
import org.apache.commons.lang3.reflect.FieldUtils;
4144
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4245
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
4346
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -54,13 +57,16 @@
5457
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
5558
import org.apache.pulsar.common.policies.data.TopicPolicies;
5659
import org.apache.pulsar.common.util.FutureUtil;
60+
import org.assertj.core.api.Assertions;
5761
import org.awaitility.Awaitility;
62+
import org.mockito.Mockito;
5863
import org.testng.Assert;
5964
import org.testng.annotations.AfterMethod;
6065
import org.testng.annotations.BeforeMethod;
6166
import org.testng.annotations.Test;
6267

6368
@Test(groups = "broker")
69+
@Slf4j
6470
public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
6571

6672
private static final String NAMESPACE1 = "system-topic/namespace-1";
@@ -384,4 +390,71 @@ public void testHandleNamespaceBeingDeleted() throws Exception {
384390
});
385391
service.deleteTopicPoliciesAsync(TOPIC1).get();
386392
}
393+
394+
@Test
395+
public void testGetTopicPoliciesWithCleanCache() throws Exception {
396+
final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID();
397+
pulsarClient.newProducer().topic(topic).create().close();
398+
399+
SystemTopicBasedTopicPoliciesService topicPoliciesService =
400+
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
401+
402+
ConcurrentHashMap<TopicName, TopicPolicies> spyPoliciesCache = spy(new ConcurrentHashMap<TopicName, TopicPolicies>());
403+
FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true);
404+
405+
Awaitility.await().untilAsserted(() -> {
406+
Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull();
407+
});
408+
409+
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
410+
Awaitility.await().untilAsserted(() -> {
411+
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
412+
});
413+
414+
Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers =
415+
(Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>)
416+
FieldUtils.readDeclaredField(topicPoliciesService, "readerCaches", true);
417+
418+
Mockito.doAnswer(invocation -> {
419+
Thread.sleep(1000);
420+
return invocation.callRealMethod();
421+
}).when(spyPoliciesCache).get(Mockito.any());
422+
423+
CompletableFuture<Void> result = new CompletableFuture<>();
424+
Thread thread = new Thread(() -> {
425+
TopicPolicies topicPolicies;
426+
for (int i = 0; i < 10; i++) {
427+
try {
428+
topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic));
429+
Assert.assertNotNull(topicPolicies);
430+
Thread.sleep(500);
431+
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
432+
log.warn("topic policies cache not init, retry...");
433+
} catch (Throwable e) {
434+
log.error("ops: ", e);
435+
result.completeExceptionally(e);
436+
return;
437+
}
438+
}
439+
result.complete(null);
440+
});
441+
442+
Thread thread2 = new Thread(() -> {
443+
for (int i = 0; i < 10; i++) {
444+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
445+
readers.get(TopicName.get(topic).getNamespaceObject());
446+
if (readerCompletableFuture != null) {
447+
readerCompletableFuture.join().closeAsync().join();
448+
}
449+
}
450+
});
451+
452+
thread.start();
453+
thread2.start();
454+
455+
thread.join();
456+
thread2.join();
457+
458+
result.join();
459+
}
387460
}

0 commit comments

Comments
 (0)