Skip to content

Commit 3f63768

Browse files
[fix][broker] Avoid infinite bundle unloading (apache#20822)
1 parent 563f929 commit 3f63768

File tree

2 files changed

+166
-99
lines changed

2 files changed

+166
-99
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 117 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.impl;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.collect.Multimap;
2223
import com.google.common.collect.Sets;
2324
import java.util.ArrayList;
@@ -659,11 +660,24 @@ public synchronized void doLoadShedding() {
659660
if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
660661
return;
661662
}
663+
NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
664+
Optional<String> destBroker = this.selectBroker(bundleToUnload);
665+
if (!destBroker.isPresent()) {
666+
log.info("[{}] No broker available to unload bundle {} from broker {}",
667+
strategy.getClass().getSimpleName(), bundle, broker);
668+
return;
669+
}
670+
if (destBroker.get().equals(broker)) {
671+
log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}",
672+
strategy.getClass().getSimpleName(), destBroker.get(), bundle);
673+
return;
674+
}
662675

663-
log.info("[{}] Unloading bundle: {} from broker {}",
664-
strategy.getClass().getSimpleName(), bundle, broker);
676+
log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
677+
strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
665678
try {
666-
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
679+
pulsar.getAdminClient().namespaces()
680+
.unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());
667681
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
668682
} catch (PulsarServerException | PulsarAdminException e) {
669683
log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
@@ -837,99 +851,119 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
837851
// If the given bundle is already in preallocated, return the selected broker.
838852
return Optional.of(preallocatedBundleToBroker.get(bundle));
839853
}
840-
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
841-
key -> getBundleDataOrDefault(bundle));
842-
brokerCandidateCache.clear();
843-
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
844-
getAvailableBrokers(),
845-
brokerTopicLoadingPredicate);
846854

847-
// filter brokers which owns topic higher than threshold
848-
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
849-
conf.getLoadBalancerBrokerMaxTopics());
850-
851-
// distribute namespaces to domain and brokers according to anti-affinity-group
852-
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
853-
brokerCandidateCache,
854-
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
855-
856-
// distribute bundles evenly to candidate-brokers if enable
857-
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
858-
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
859-
brokerCandidateCache,
860-
brokerToNamespaceToBundleRange);
861-
if (log.isDebugEnabled()) {
862-
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
863-
brokerCandidateCache.size());
864-
}
855+
Optional<String> broker = selectBroker(serviceUnit);
856+
if (!broker.isPresent()) {
857+
// If no broker is selected, return empty.
858+
return broker;
865859
}
866-
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
860+
// Add new bundle to preallocated.
861+
preallocateBundle(bundle, broker.get());
862+
return broker;
863+
}
864+
} finally {
865+
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
866+
}
867+
}
867868

868-
// Use the filter pipeline to finalize broker candidates.
869-
try {
870-
for (BrokerFilter filter : filterPipeline) {
871-
filter.filter(brokerCandidateCache, data, loadData, conf);
872-
}
873-
} catch (BrokerFilterException x) {
874-
// restore the list of brokers to the full set
875-
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
876-
getAvailableBrokers(),
877-
brokerTopicLoadingPredicate);
878-
}
869+
private void preallocateBundle(String bundle, String broker) {
870+
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
871+
key -> getBundleDataOrDefault(bundle));
872+
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
873+
preallocatedBundleToBroker.put(bundle, broker);
874+
875+
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
876+
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
877+
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
878+
brokerToNamespaceToBundleRange
879+
.computeIfAbsent(broker,
880+
k -> ConcurrentOpenHashMap.<String,
881+
ConcurrentOpenHashSet<String>>newBuilder()
882+
.build());
883+
synchronized (namespaceToBundleRange) {
884+
namespaceToBundleRange.computeIfAbsent(namespaceName,
885+
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
886+
.add(bundleRange);
887+
}
888+
}
879889

880-
if (brokerCandidateCache.isEmpty()) {
881-
// restore the list of brokers to the full set
882-
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
883-
getAvailableBrokers(),
884-
brokerTopicLoadingPredicate);
885-
}
890+
@VisibleForTesting
891+
Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
892+
synchronized (brokerCandidateCache) {
893+
final String bundle = serviceUnit.toString();
894+
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
895+
key -> getBundleDataOrDefault(bundle));
896+
brokerCandidateCache.clear();
897+
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
898+
getAvailableBrokers(),
899+
brokerTopicLoadingPredicate);
900+
901+
// filter brokers which owns topic higher than threshold
902+
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
903+
conf.getLoadBalancerBrokerMaxTopics());
886904

887-
// Choose a broker among the potentially smaller filtered list, when possible
888-
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
905+
// distribute namespaces to domain and brokers according to anti-affinity-group
906+
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, bundle,
907+
brokerCandidateCache,
908+
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
909+
910+
// distribute bundles evenly to candidate-brokers if enable
911+
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
912+
LoadManagerShared.removeMostServicingBrokersForNamespace(bundle,
913+
brokerCandidateCache,
914+
brokerToNamespaceToBundleRange);
889915
if (log.isDebugEnabled()) {
890-
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
916+
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
917+
brokerCandidateCache.size());
891918
}
919+
}
892920

893-
if (!broker.isPresent()) {
894-
// No brokers available
895-
return broker;
896-
}
921+
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
897922

898-
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
899-
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
900-
if (maxUsage > overloadThreshold) {
901-
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
902-
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
903-
getAvailableBrokers(),
904-
brokerTopicLoadingPredicate);
905-
Optional<String> brokerTmp =
906-
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
907-
if (brokerTmp.isPresent()) {
908-
broker = brokerTmp;
909-
}
923+
// Use the filter pipeline to finalize broker candidates.
924+
try {
925+
for (BrokerFilter filter : filterPipeline) {
926+
filter.filter(brokerCandidateCache, data, loadData, conf);
910927
}
928+
} catch (BrokerFilterException x) {
929+
// restore the list of brokers to the full set
930+
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
931+
getAvailableBrokers(),
932+
brokerTopicLoadingPredicate);
933+
}
911934

912-
// Add new bundle to preallocated.
913-
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
914-
preallocatedBundleToBroker.put(bundle, broker.get());
915-
916-
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
917-
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
918-
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
919-
brokerToNamespaceToBundleRange
920-
.computeIfAbsent(broker.get(),
921-
k -> ConcurrentOpenHashMap.<String,
922-
ConcurrentOpenHashSet<String>>newBuilder()
923-
.build());
924-
synchronized (namespaceToBundleRange) {
925-
namespaceToBundleRange.computeIfAbsent(namespaceName,
926-
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
927-
.add(bundleRange);
928-
}
935+
if (brokerCandidateCache.isEmpty()) {
936+
// restore the list of brokers to the full set
937+
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
938+
getAvailableBrokers(),
939+
brokerTopicLoadingPredicate);
940+
}
941+
942+
// Choose a broker among the potentially smaller filtered list, when possible
943+
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
944+
if (log.isDebugEnabled()) {
945+
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
946+
}
947+
948+
if (!broker.isPresent()) {
949+
// No brokers available
929950
return broker;
930951
}
931-
} finally {
932-
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
952+
953+
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
954+
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
955+
if (maxUsage > overloadThreshold) {
956+
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
957+
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
958+
getAvailableBrokers(),
959+
brokerTopicLoadingPredicate);
960+
Optional<String> brokerTmp =
961+
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
962+
if (brokerTmp.isPresent()) {
963+
broker = brokerTmp;
964+
}
965+
}
966+
return broker;
933967
}
934968
}
935969

0 commit comments

Comments
 (0)