Skip to content

[improve][broker] PIP-380: Support-setting-up-specific-namespaces-to-skipping-the-load-shedding #23549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2928,6 +2928,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The namespaces to be excluded from load shedding"
)
private Set<String> loadBalancerSheddingExcludedNamespaces = new HashSet<>();

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Time to wait before fixing any stuck in-flight service unit states. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.RoundRobinBrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.LookupOptions;
Expand Down Expand Up @@ -161,6 +162,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
@Getter
private final BrokerSelectionStrategy brokerSelectionStrategy;

private final BrokerSelectionStrategy sheddingExcludedNamespaceSelectionStrategy;

@Getter
private final List<BrokerFilter> brokerFilterPipeline;

Expand Down Expand Up @@ -254,6 +257,7 @@ public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
this.brokerSelectionStrategy = createBrokerSelectionStrategy();
this.sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelectionStrategy();
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
Expand Down Expand Up @@ -636,11 +640,33 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
return Optional.empty();
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();
return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
return getBrokerSelectionStrategy(bundle).select(candidateBrokers, bundle, context);
});
});
}

/**
* For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership,
* it can make the assignment more average because these will not automatically rebalance to
* another broker unless manually unloaded it.
*
* @param bundle the bundle to assign
* @return the broker selection strategy
*/
private BrokerSelectionStrategy getBrokerSelectionStrategy(ServiceUnitId bundle) {

Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();

var namespace = NamespaceBundle.getBundleNamespace(bundle.toString());
if (sheddingExcludedNamespaces.contains(namespace)) {
if (debug(conf, log)) {
log.info("Use round robin broker selector for {}", bundle);
}
return sheddingExcludedNamespaceSelectionStrategy;
}
return brokerSelectionStrategy;
}

@Override
public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
return getOwnershipAsync(topic, bundleUnit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
Expand Down Expand Up @@ -68,8 +69,10 @@ public TopKBundles(PulsarService pulsar) {
public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
arr.clear();
try {
var conf = pulsar.getConfiguration();
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
conf.isLoadBalancerSheddingBundlesWithPoliciesEnabled();
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
var stat = etr.getValue();
Expand All @@ -79,12 +82,16 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
continue;
}
// TODO: do not filter system topic while shedding
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
String namespace = NamespaceBundle.getBundleNamespace(bundle);
if (NamespaceService.isSystemServiceNamespace(namespace)) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) {
continue;
}
if (sheddingExcludedNamespaces.contains(namespace)) {
continue;
}
arr.add(etr);
}
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
}

int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
for (var e : maxBrokerTopBundlesLoadData) {
String bundle = e.bundleName();
if (channel != null && !channel.isOwner(bundle, maxBroker)) {
Expand All @@ -500,6 +501,14 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
}
continue;
}
final String namespaceName = NamespaceBundle.getBundleNamespace(bundle);
if (sheddingExcludedNamespaces.contains(namespaceName)) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " Bundle namespace has been found in sheddingExcludedNamespaces", bundle));
}
continue;
}
if (recentlyUnloadedBundles.containsKey(bundle)) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;

import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Simple Round Robin Broker Selection Strategy.
*/
public class RoundRobinBrokerSelectionStrategy implements BrokerSelectionStrategy {
private final RoundRobinBrokerSelector selector = new RoundRobinBrokerSelector();

@Override
public Optional<String> select(Set<String> brokers, ServiceUnitId bundle, LoadManagerContext context) {
return selector.selectBroker(brokers, null, null, context.brokerConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Strategy used to determine where new topics should be placed.
private ModularLoadManagerStrategy placementStrategy;

private ModularLoadManagerStrategy sheddingExcludedNamespaceSelectionStrategy;

// Policies used to determine which brokers are available for particular namespaces.
private SimpleResourceAllocationPolicies policies;

Expand Down Expand Up @@ -251,6 +253,7 @@ public void initialize(final PulsarService pulsar) {
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;

placementStrategy = ModularLoadManagerStrategy.create(conf);
sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelector();
policies = new SimpleResourceAllocationPolicies(pulsar);
filterPipeline.add(new BrokerLoadManagerClassFilter());
filterPipeline.add(new BrokerVersionFilter());
Expand Down Expand Up @@ -630,13 +633,21 @@ public synchronized void doLoadShedding() {
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);

Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
final Multimap<String, String> bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf);

bundlesToUnload.asMap().forEach((broker, bundles) -> {
AtomicBoolean unloadBundleForBroker = new AtomicBoolean(false);
bundles.forEach(bundle -> {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
if (sheddingExcludedNamespaces.contains(namespaceName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping load shedding for namespace {}",
loadSheddingStrategy.getClass().getSimpleName(), namespaceName);
}
return;
}
if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) {
return;
}
Expand Down Expand Up @@ -914,8 +925,22 @@ Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
brokerTopicLoadingPredicate);
}

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
Optional<String> broker;
// For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership,
// it can make the assignment more average because these will not automatically rebalance to
// another broker unless manually unloaded it.
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
String namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
if (sheddingExcludedNamespaces.contains(namespaceNameFromBundleName)) {
if (log.isDebugEnabled()) {
log.debug("Use round robin broker selector for {}", bundle);
}
broker = sheddingExcludedNamespaceSelectionStrategy
.selectBroker(brokerCandidateCache, data, loadData, conf);
} else {
// Choose a broker among the potentially smaller filtered list, when possible
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}
Expand Down Expand Up @@ -1122,7 +1147,15 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
*/
private int selectTopKBundle() {
bundleArr.clear();
bundleArr.addAll(loadData.getBundleData().entrySet());
Set<String> sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces();
for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
String bundle = entry.getKey();
String namespace = NamespaceBundle.getBundleNamespace(bundle);
if (sheddingExcludedNamespaces.contains(namespace)) {
continue;
}
bundleArr.add(entry);
}

int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
.getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,41 @@ public void testAssign() throws Exception {
assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl());
}

// Test that the load manager will use round-robin assignment
// if the namespace is in loadBalancerSheddingExcludedNamespaces.
@Test
public void testSelectBrokerForSheddingExcludedNamespaces() throws Exception {
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of(defaultTestNamespace));
try {
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-topic" + UUID.randomUUID());
NamespaceBundle bundle1 = topicAndBundle.getRight();
Optional<BrokerLookupData> brokerLookupData1 = primaryLoadManager.assign(Optional.empty(), bundle1,
LookupOptions.builder().build()).get();
assertTrue(brokerLookupData1.isPresent());
log.info("Assign the bundle1 {} to {}", bundle1, brokerLookupData1);

String webServiceUrl1 = brokerLookupData1.get().getWebServiceUrl();

Pair<TopicName, NamespaceBundle> topicAndBundle2 =
getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());

while (topicAndBundle2.getRight().toString().equals(topicAndBundle.getRight().toString())
|| primaryLoadManager.checkOwnershipAsync(Optional.empty(), topicAndBundle2.getRight()).get()) {
topicAndBundle2 = getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());
}
NamespaceBundle bundle2 = topicAndBundle2.getRight();
Optional<BrokerLookupData> brokerLookupData2 = primaryLoadManager.assign(Optional.empty(), bundle2,
LookupOptions.builder().build()).get();
assertTrue(brokerLookupData2.isPresent());
log.info("Assign the bundle2 {} to {}", bundle2, brokerLookupData2);
String webServiceUrl2 = brokerLookupData2.get().getWebServiceUrl();
assertNotEquals(webServiceUrl1, webServiceUrl2);
} finally {
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of());
}
}

@Test
public void testLookupOptions() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
Expand Down Expand Up @@ -123,6 +124,25 @@
stats1.msgRateIn = 500;
stats1.msgThroughputOut = 10;
bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
bundleStats.put(bundle1, stats2);

topKBundles.update(bundleStats, 2);

assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 1);

Check failure on line 133 in pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 1

TopKBundlesTest.testSystemNamespace

expected [1] but found [0]
var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0);
assertEquals(top0.bundleName(), bundle1);
}

@Test
public void testSheddingExcludedNamespaces() {
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
var topKBundles = new TopKBundles(pulsar);
pulsar.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of("my-tenant/my-namespace2"));
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
bundleStats.put("my-tenant/my-namespace2/0x00000000_0x0FFFFFFF", stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,25 @@ public void testRecentlyUnloadedBundles() {
assertEquals(counter.getLoadStd(), setupLoadStd);
}

@Test
public void testSheddingExcludedNamespaces() {
UnloadCounter counter = new UnloadCounter();
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
ctx.brokerConfiguration().setLoadBalancerSheddingExcludedNamespaces(
Set.of("my-tenant/my-namespaceE", "my-tenant/my-namespaceD"));

var res = transferShedder.findBundlesForUnloading(ctx, new HashMap<>(), Map.of());
var expected = new HashSet<UnloadDecision>();
expected.add(new UnloadDecision(new Unload("broker3:8080",
"my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
}

@Test
public void testGetAvailableBrokersFailed() {
UnloadCounter counter = new UnloadCounter();
Expand Down
Loading