Skip to content

Commit f9af424

Browse files
authored
[fix] [broker] Incorrect service name selection logic (apache#19505)
When calling the method `PulsarWebResource.getRedirectionUrl`, reuse the same `PulsarServiceNameResolver` instance.
1 parent ca0b25e commit f9af424

File tree

3 files changed

+30
-9
lines changed

3 files changed

+30
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.util.concurrent.TimeUnit.SECONDS;
2323
import static org.apache.commons.lang3.StringUtils.isBlank;
24+
import com.github.benmanes.caffeine.cache.CacheLoader;
25+
import com.github.benmanes.caffeine.cache.Caffeine;
26+
import com.github.benmanes.caffeine.cache.LoadingCache;
2427
import com.google.common.collect.BoundType;
2528
import com.google.common.collect.Range;
2629
import com.google.common.collect.Sets;
2730
import java.net.MalformedURLException;
2831
import java.net.URI;
2932
import java.net.URL;
33+
import java.time.Duration;
3034
import java.util.ArrayList;
3135
import java.util.List;
3236
import java.util.Optional;
@@ -86,6 +90,8 @@
8690
import org.apache.pulsar.common.policies.path.PolicyPath;
8791
import org.apache.pulsar.common.util.FutureUtil;
8892
import org.apache.pulsar.metadata.api.MetadataStoreException;
93+
import org.checkerframework.checker.nullness.qual.NonNull;
94+
import org.checkerframework.checker.nullness.qual.Nullable;
8995
import org.slf4j.Logger;
9096
import org.slf4j.LoggerFactory;
9197

@@ -96,6 +102,17 @@ public abstract class PulsarWebResource {
96102

97103
private static final Logger log = LoggerFactory.getLogger(PulsarWebResource.class);
98104

105+
private static final LoadingCache<String, PulsarServiceNameResolver> SERVICE_NAME_RESOLVER_CACHE =
106+
Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).build(
107+
new CacheLoader<>() {
108+
@Override
109+
public @Nullable PulsarServiceNameResolver load(@NonNull String serviceUrl) throws Exception {
110+
PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
111+
serviceNameResolver.updateServiceUrl(serviceUrl);
112+
return serviceNameResolver;
113+
}
114+
});
115+
99116
static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
100117

101118
@Context
@@ -476,17 +493,21 @@ protected void validateClusterOwnership(String cluster) throws WebApplicationExc
476493

477494
private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException {
478495
try {
479-
PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
496+
PulsarServiceNameResolver serviceNameResolver;
480497
if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
481498
&& StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
482-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
499+
serviceNameResolver = SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls());
483500
} else {
484-
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
501+
serviceNameResolver = SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
485502
}
486503
URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString());
487504
return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
488-
} catch (PulsarClientException.InvalidServiceURL exception) {
489-
throw new MalformedURLException(exception.getMessage());
505+
} catch (Exception exception) {
506+
if (exception.getCause() != null
507+
&& exception.getCause() instanceof PulsarClientException.InvalidServiceURL) {
508+
throw new MalformedURLException(exception.getMessage());
509+
}
510+
throw exception;
490511
}
491512
}
492513

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ public void activeBrokerParse() throws Exception {
233233
pulsar1.getConfiguration().setAuthorizationEnabled(true);
234234
//init clusterData
235235

236-
String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress());
236+
String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678,localhost:5677,localhost:5676",
237+
pulsar2.getWebServiceAddress());
237238
ClusterData cluster2Data = ClusterData.builder().serviceUrl(cluster2ServiceUrls).build();
238239
String cluster2 = "activeCLuster2";
239240
admin2.clusters().createCluster(cluster2, cluster2Data);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ public InetSocketAddress resolveHost() {
5252
if (list.size() == 1) {
5353
return list.get(0);
5454
} else {
55-
CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % list.size());
56-
return list.get(currentIndex);
57-
55+
int originalIndex = CURRENT_INDEX_UPDATER.getAndUpdate(this, last -> (last + 1) % list.size());
56+
return list.get((originalIndex + 1) % list.size());
5857
}
5958
}
6059

0 commit comments

Comments
 (0)