Skip to content

Commit 1bb183d

Browse files
heesung-snlhotari
authored andcommitted
[improve][broker] Skip unloading when bundle throughput is zero (ExtensibleLoadManagerImpl only) (#23626)
(cherry picked from commit e8657e2)
1 parent 983b73d commit 1bb183d

File tree

4 files changed

+62
-0
lines changed

4 files changed

+62
-0
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java

+6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
7272
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
7373
for (var etr : bundleStats.entrySet()) {
7474
String bundle = etr.getKey();
75+
var stat = etr.getValue();
76+
77+
// skip zero traffic bundles
78+
if (stat.msgThroughputIn + stat.msgThroughputOut == 0) {
79+
continue;
80+
}
7581
// TODO: do not filter system topic while shedding
7682
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
7783
continue;

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java

+10
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,13 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
528528

529529
var bundleData = e.stats();
530530
double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
531+
if (maxBrokerBundleThroughput == 0) {
532+
if (debugMode) {
533+
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
534+
+ " It has zero throughput.", bundle));
535+
}
536+
continue;
537+
}
531538
boolean swap = false;
532539
List<Unload> minToMaxUnloads = new ArrayList<>();
533540
double minBrokerBundleSwapThroughput = 0.0;
@@ -549,6 +556,9 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
549556
var minBrokerBundleThroughput =
550557
minBrokerBundleData.stats().msgThroughputIn
551558
+ minBrokerBundleData.stats().msgThroughputOut;
559+
if (minBrokerBundleThroughput == 0) {
560+
continue;
561+
}
552562
var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
553563
var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
554564
if (maxBrokerNewThroughputTmp < maxBrokerThroughput

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,17 @@ public void testTopBundlesLoadData() {
8888
var topKBundles = new TopKBundles(pulsar);
8989
NamespaceBundleStats stats1 = new NamespaceBundleStats();
9090
stats1.msgRateIn = 100000;
91+
stats1.msgThroughputOut = 10;
9192
bundleStats.put(bundle1, stats1);
9293

9394
NamespaceBundleStats stats2 = new NamespaceBundleStats();
9495
stats2.msgRateIn = 500;
96+
stats2.msgThroughputOut = 10;
9597
bundleStats.put(bundle2, stats2);
9698

9799
NamespaceBundleStats stats3 = new NamespaceBundleStats();
98100
stats3.msgRateIn = 10000;
101+
stats3.msgThroughputOut = 10;
99102
bundleStats.put(bundle3, stats3);
100103

101104
NamespaceBundleStats stats4 = new NamespaceBundleStats();
@@ -118,10 +121,12 @@ public void testSystemNamespace() {
118121
var topKBundles = new TopKBundles(pulsar);
119122
NamespaceBundleStats stats1 = new NamespaceBundleStats();
120123
stats1.msgRateIn = 500;
124+
stats1.msgThroughputOut = 10;
121125
bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1);
122126

123127
NamespaceBundleStats stats2 = new NamespaceBundleStats();
124128
stats2.msgRateIn = 10000;
129+
stats2.msgThroughputOut = 10;
125130
bundleStats.put(bundle1, stats2);
126131

127132
topKBundles.update(bundleStats, 2);
@@ -131,6 +136,21 @@ public void testSystemNamespace() {
131136
assertEquals(top0.bundleName(), bundle1);
132137
}
133138

139+
@Test
140+
public void testZeroMsgThroughputBundleStats() {
141+
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
142+
var topKBundles = new TopKBundles(pulsar);
143+
NamespaceBundleStats stats1 = new NamespaceBundleStats();
144+
bundleStats.put(bundle1, stats1);
145+
146+
NamespaceBundleStats stats2 = new NamespaceBundleStats();
147+
bundleStats.put(bundle1, stats2);
148+
149+
topKBundles.update(bundleStats, 2);
150+
151+
assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0);
152+
}
153+
134154

135155
private void setAntiAffinityGroup() throws MetadataStoreException {
136156
LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
@@ -166,10 +186,12 @@ public void testIsolationPolicy() throws MetadataStoreException {
166186
var topKBundles = new TopKBundles(pulsar);
167187
NamespaceBundleStats stats1 = new NamespaceBundleStats();
168188
stats1.msgRateIn = 500;
189+
stats1.msgThroughputOut = 10;
169190
bundleStats.put(bundle1, stats1);
170191

171192
NamespaceBundleStats stats2 = new NamespaceBundleStats();
172193
stats2.msgRateIn = 10000;
194+
stats2.msgThroughputOut = 10;
173195
bundleStats.put(bundle2, stats2);
174196

175197
topKBundles.update(bundleStats, 2);
@@ -188,10 +210,12 @@ public void testAntiAffinityGroupPolicy() throws MetadataStoreException {
188210
var topKBundles = new TopKBundles(pulsar);
189211
NamespaceBundleStats stats1 = new NamespaceBundleStats();
190212
stats1.msgRateIn = 500;
213+
stats1.msgThroughputOut = 10;
191214
bundleStats.put(bundle1, stats1);
192215

193216
NamespaceBundleStats stats2 = new NamespaceBundleStats();
194217
stats2.msgRateIn = 10000;
218+
stats2.msgThroughputOut = 10;
195219
bundleStats.put(bundle2, stats2);
196220

197221
topKBundles.update(bundleStats, 2);
@@ -213,10 +237,12 @@ public void testLoadBalancerSheddingBundlesWithPoliciesEnabledConfig() throws Me
213237
var topKBundles = new TopKBundles(pulsar);
214238
NamespaceBundleStats stats1 = new NamespaceBundleStats();
215239
stats1.msgRateIn = 500;
240+
stats1.msgThroughputOut = 10;
216241
bundleStats.put(bundle1, stats1);
217242

218243
NamespaceBundleStats stats2 = new NamespaceBundleStats();
219244
stats2.msgRateIn = 10000;
245+
stats2.msgThroughputOut = 10;
220246
bundleStats.put(bundle2, stats2);
221247

222248
topKBundles.update(bundleStats, 2);

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,26 @@ public void testBundleThroughputLargerThanOffloadThreshold() {
918918
assertEquals(counter.getLoadStd(), setupLoadStd);
919919
}
920920

921+
@Test
922+
public void testZeroBundleThroughput() {
923+
UnloadCounter counter = new UnloadCounter();
924+
TransferShedder transferShedder = new TransferShedder(counter);
925+
var ctx = setupContext();
926+
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
927+
for (var e : topBundlesLoadDataStore.entrySet()) {
928+
for (var stat : e.getValue().getTopBundlesLoadData()) {
929+
stat.stats().msgThroughputOut = 0;
930+
stat.stats().msgThroughputIn = 0;
931+
932+
}
933+
}
934+
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
935+
assertTrue(res.isEmpty());
936+
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
937+
assertEquals(counter.getLoadAvg(), setupLoadAvg);
938+
assertEquals(counter.getLoadStd(), setupLoadStd);
939+
}
940+
921941

922942
@Test
923943
public void testTargetStdAfterTransfer() {

0 commit comments

Comments
 (0)