Skip to content

Commit 563f929

Browse files
authored
[fix][broker] Fix inconsensus namespace policies by getPoliciesIfCached (apache#20855)
1 parent 69d7a2b commit 563f929

File tree

8 files changed

+155
-70
lines changed

8 files changed

+155
-70
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreExce
117117
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
118118
}
119119

120+
/**
121+
* Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache.
122+
*
123+
* @deprecated Since this method may introduce inconsistent namespace policies. we should use
124+
* #{@link NamespaceResources#getPoliciesAsync}
125+
*/
126+
@Deprecated
120127
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
121128
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
122129
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
22+
import static java.util.Objects.requireNonNull;
2223
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
2324
import com.google.common.base.MoreObjects;
2425
import java.util.ArrayList;
@@ -40,6 +41,7 @@
4041
import java.util.concurrent.atomic.LongAdder;
4142
import java.util.concurrent.locks.ReentrantReadWriteLock;
4243
import java.util.function.ToLongFunction;
44+
import javax.annotation.Nonnull;
4345
import lombok.Getter;
4446
import org.apache.bookkeeper.mledger.util.StatsBuckets;
4547
import org.apache.commons.collections4.CollectionUtils;
@@ -61,6 +63,7 @@
6163
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
6264
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
6365
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
66+
import org.apache.pulsar.common.naming.NamespaceName;
6467
import org.apache.pulsar.common.naming.TopicName;
6568
import org.apache.pulsar.common.policies.data.BacklogQuota;
6669
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
@@ -1128,6 +1131,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
11281131
return brokerService.getBrokerPublishRateLimiter();
11291132
}
11301133

1134+
/**
1135+
* @deprecated Avoid using the deprecated method
1136+
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
1137+
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
1138+
*/
1139+
@Deprecated
11311140
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
11321141
Policies policies;
11331142
try {
@@ -1141,17 +1150,20 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
11411150
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
11421151
policies = new Policies();
11431152
}
1153+
updateResourceGroupLimiter(policies);
1154+
}
11441155

1156+
public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
1157+
requireNonNull(namespacePolicies);
11451158
// attach the resource-group level rate limiters, if set
1146-
String rgName = policies.resource_group_name;
1159+
String rgName = namespacePolicies.resource_group_name;
11471160
if (rgName != null) {
11481161
final ResourceGroup resourceGroup =
1149-
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
1162+
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
11501163
if (resourceGroup != null) {
11511164
this.resourceGroupRateLimitingEnabled = true;
11521165
this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
1153-
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
1154-
() -> this.enableCnxAutoRead());
1166+
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead);
11551167
log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
11561168
return;
11571169
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
22+
import static java.util.Objects.requireNonNull;
2223
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
2324
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
2425
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -48,11 +49,11 @@
4849
import java.util.HashSet;
4950
import java.util.List;
5051
import java.util.Map;
51-
import java.util.Objects;
5252
import java.util.Optional;
5353
import java.util.Set;
5454
import java.util.concurrent.CancellationException;
5555
import java.util.concurrent.CompletableFuture;
56+
import java.util.concurrent.CompletionStage;
5657
import java.util.concurrent.ConcurrentLinkedQueue;
5758
import java.util.concurrent.ExecutionException;
5859
import java.util.concurrent.RejectedExecutionException;
@@ -68,6 +69,7 @@
6869
import java.util.concurrent.locks.ReentrantReadWriteLock;
6970
import java.util.function.Consumer;
7071
import java.util.function.Predicate;
72+
import javax.annotation.Nonnull;
7173
import lombok.AccessLevel;
7274
import lombok.AllArgsConstructor;
7375
import lombok.Getter;
@@ -1934,7 +1936,7 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
19341936
}
19351937

19361938
public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
1937-
Objects.requireNonNull(oldBundle);
1939+
requireNonNull(oldBundle);
19381940
try {
19391941
// retrieve all topics under existing old bundle
19401942
List<Topic> topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(),
@@ -3267,10 +3269,9 @@ public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final String top
32673269
}
32683270

32693271
public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName) {
3270-
Optional<Policies> policies =
3271-
pulsar.getPulsarResources().getNamespaceResources()
3272-
.getPoliciesIfCached(topicName.getNamespaceObject());
3273-
return isAllowAutoTopicCreationAsync(topicName, policies);
3272+
return pulsar.getPulsarResources().getNamespaceResources()
3273+
.getPoliciesAsync(topicName.getNamespaceObject())
3274+
.thenCompose(policies -> isAllowAutoTopicCreationAsync(topicName, policies));
32743275
}
32753276

32763277
private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName,
@@ -3340,11 +3341,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t
33403341
return null;
33413342
}
33423343

3344+
/**
3345+
* @deprecated Avoid using the deprecated method
3346+
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
3347+
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
3348+
*/
3349+
@Deprecated
33433350
public boolean isAllowAutoSubscriptionCreation(final String topic) {
33443351
TopicName topicName = TopicName.get(topic);
33453352
return isAllowAutoSubscriptionCreation(topicName);
33463353
}
33473354

3355+
/**
3356+
* @deprecated Avoid using the deprecated method
3357+
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
3358+
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
3359+
*/
3360+
@Deprecated
33483361
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
33493362
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
33503363
getAutoSubscriptionCreationOverride(topicName);
@@ -3355,6 +3368,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
33553368
}
33563369
}
33573370

3371+
/**
3372+
* @deprecated Avoid using the deprecated method
3373+
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
3374+
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
3375+
*/
3376+
@Deprecated
33583377
private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
33593378
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
33603379
if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
@@ -3371,6 +3390,25 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
33713390
return null;
33723391
}
33733392

3393+
public @Nonnull CompletionStage<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
3394+
requireNonNull(tpName);
3395+
// topic level policies
3396+
final var topicPolicies = getTopicPolicies(tpName);
3397+
if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
3398+
return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride()
3399+
.isAllowAutoSubscriptionCreation());
3400+
}
3401+
// namespace level policies
3402+
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
3403+
.thenApply(policies -> {
3404+
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
3405+
return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
3406+
}
3407+
// broker level policies
3408+
return pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
3409+
});
3410+
}
3411+
33743412
public boolean isSystemTopic(String topic) {
33753413
return isSystemTopic(TopicName.get(topic));
33763414
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 60 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.Optional;
5656
import java.util.Set;
5757
import java.util.concurrent.CompletableFuture;
58+
import java.util.concurrent.CompletionStage;
5859
import java.util.concurrent.Semaphore;
5960
import java.util.concurrent.TimeUnit;
6061
import java.util.regex.Pattern;
@@ -1210,39 +1211,43 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
12101211
.failedFuture(new TopicNotFoundException(
12111212
"Topic " + topicName + " does not exist"));
12121213
}
1214+
final Topic topic = optTopic.get();
1215+
return service.isAllowAutoSubscriptionCreationAsync(topicName)
1216+
.thenCompose(isAllowedAutoSubscriptionCreation -> {
1217+
boolean rejectSubscriptionIfDoesNotExist = isDurable
1218+
&& !isAllowedAutoSubscriptionCreation
1219+
&& !topic.getSubscriptions().containsKey(subscriptionName)
1220+
&& topic.isPersistent();
1221+
1222+
if (rejectSubscriptionIfDoesNotExist) {
1223+
return FutureUtil
1224+
.failedFuture(
1225+
new SubscriptionNotFoundException(
1226+
"Subscription does not exist"));
1227+
}
12131228

1214-
Topic topic = optTopic.get();
1215-
1216-
boolean rejectSubscriptionIfDoesNotExist = isDurable
1217-
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
1218-
&& !topic.getSubscriptions().containsKey(subscriptionName)
1219-
&& topic.isPersistent();
1220-
1221-
if (rejectSubscriptionIfDoesNotExist) {
1222-
return FutureUtil
1223-
.failedFuture(
1224-
new SubscriptionNotFoundException(
1225-
"Subscription does not exist"));
1226-
}
1227-
1228-
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
1229-
.subscriptionName(subscriptionName)
1230-
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
1231-
.consumerName(consumerName).isDurable(isDurable)
1232-
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
1233-
.initialPosition(initialPosition)
1234-
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
1235-
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
1236-
.subscriptionProperties(subscriptionProperties)
1237-
.consumerEpoch(consumerEpoch)
1238-
.schemaType(schema == null ? null : schema.getType())
1239-
.build();
1240-
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
1241-
return topic.addSchemaIfIdleOrCheckCompatible(schema)
1242-
.thenCompose(v -> topic.subscribe(option));
1243-
} else {
1244-
return topic.subscribe(option);
1245-
}
1229+
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
1230+
.subscriptionName(subscriptionName)
1231+
.consumerId(consumerId).subType(subType)
1232+
.priorityLevel(priorityLevel)
1233+
.consumerName(consumerName).isDurable(isDurable)
1234+
.startMessageId(startMessageId).metadata(metadata)
1235+
.readCompacted(readCompacted)
1236+
.initialPosition(initialPosition)
1237+
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
1238+
.replicatedSubscriptionStateArg(isReplicated)
1239+
.keySharedMeta(keySharedMeta)
1240+
.subscriptionProperties(subscriptionProperties)
1241+
.consumerEpoch(consumerEpoch)
1242+
.schemaType(schema == null ? null : schema.getType())
1243+
.build();
1244+
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
1245+
return topic.addSchemaIfIdleOrCheckCompatible(schema)
1246+
.thenCompose(v -> topic.subscribe(option));
1247+
} else {
1248+
return topic.subscribe(option);
1249+
}
1250+
});
12461251
})
12471252
.thenAccept(consumer -> {
12481253
if (consumerFuture.complete(consumer)) {
@@ -1461,33 +1466,38 @@ protected void handleProducer(final CommandProducer cmdProducer) {
14611466

14621467
schemaVersionFuture.thenAccept(schemaVersion -> {
14631468
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
1464-
CompletableFuture<Subscription> createInitSubFuture;
1469+
CompletionStage<Subscription> createInitSubFuture;
14651470
if (!Strings.isNullOrEmpty(initialSubscriptionName)
14661471
&& topic.isPersistent()
14671472
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
1468-
if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
1469-
String msg =
1470-
"Could not create the initial subscription due to the auto subscription "
1471-
+ "creation is not allowed.";
1472-
if (producerFuture.completeExceptionally(
1473-
new BrokerServiceException.NotAllowedException(msg))) {
1474-
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
1475-
remoteAddress, msg, initialSubscriptionName, topicName);
1476-
commandSender.sendErrorResponse(requestId,
1477-
ServerError.NotAllowedError, msg);
1478-
}
1479-
producers.remove(producerId, producerFuture);
1480-
return;
1481-
}
1482-
createInitSubFuture =
1483-
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
1484-
false, null);
1473+
createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName)
1474+
.thenCompose(isAllowAutoSubscriptionCreation -> {
1475+
if (!isAllowAutoSubscriptionCreation) {
1476+
return CompletableFuture.failedFuture(
1477+
new BrokerServiceException.NotAllowedException(
1478+
"Could not create the initial subscription due to"
1479+
+ " the auto subscription creation is not allowed."));
1480+
}
1481+
return topic.createSubscription(initialSubscriptionName,
1482+
InitialPosition.Earliest, false, null);
1483+
});
14851484
} else {
14861485
createInitSubFuture = CompletableFuture.completedFuture(null);
14871486
}
14881487

14891488
createInitSubFuture.whenComplete((sub, ex) -> {
14901489
if (ex != null) {
1490+
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
1491+
if (rc instanceof BrokerServiceException.NotAllowedException) {
1492+
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
1493+
remoteAddress, rc.getMessage(), initialSubscriptionName, topicName);
1494+
if (producerFuture.completeExceptionally(rc)) {
1495+
commandSender.sendErrorResponse(requestId,
1496+
ServerError.NotAllowedError, rc.getMessage());
1497+
}
1498+
producers.remove(producerId, producerFuture);
1499+
return;
1500+
}
14911501
String msg =
14921502
"Failed to create the initial subscription: " + ex.getCause().getMessage();
14931503
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",

0 commit comments

Comments
 (0)