@@ -119,7 +119,7 @@ public synchronized Multimap<String, String> findBundlesForUnloading(final LoadD
119
119
}
120
120
});
121
121
if (selectedBundlesCache .isEmpty () && conf .isLowerBoundarySheddingEnabled ()) {
122
- tryLowerBoundaryShedding (loadData , conf );
122
+ tryLowerBoundaryShedding (loadData , threshold , conf );
123
123
}
124
124
return selectedBundlesCache ;
125
125
}
@@ -182,25 +182,24 @@ private double updateAvgResourceUsage(String broker, LocalBrokerData localBroker
182
182
return historyUsage ;
183
183
}
184
184
185
- private void tryLowerBoundaryShedding (LoadData loadData , ServiceConfiguration conf ) {
185
+ private void tryLowerBoundaryShedding (LoadData loadData , double threshold , ServiceConfiguration conf ) {
186
186
// Select the broker with the most resource usage.
187
- final double threshold = conf .getLoadBalancerBrokerThresholdShedderPercentage () / 100.0 ;
188
187
final double avgUsage = getBrokerAvgUsage (loadData , conf .getLoadBalancerHistoryResourcePercentage (), conf );
189
188
Pair <Boolean , String > result = getMaxUsageBroker (loadData , threshold , avgUsage );
190
189
boolean hasBrokerBelowLowerBound = result .getLeft ();
191
190
String maxUsageBroker = result .getRight ();
192
- BrokerData brokerData = loadData .getBrokerData ().get (maxUsageBroker );
193
- if (brokerData == null ) {
194
- log .info ("Load data is null or bundle <=1, skipping bundle unload." );
195
- return ;
196
- }
197
191
if (!hasBrokerBelowLowerBound ) {
198
192
log .info ("No broker is below the lower bound, threshold is {}, "
199
193
+ "avgUsage usage is {}, max usage of Broker {} is {}" ,
200
194
threshold , avgUsage , maxUsageBroker ,
201
195
brokerAvgResourceUsage .getOrDefault (maxUsageBroker , 0.0 ));
202
196
return ;
203
197
}
198
+ BrokerData brokerData = loadData .getBrokerData ().get (maxUsageBroker );
199
+ if (brokerData == null ) {
200
+ log .info ("Load data is null or bundle <=1, skipping bundle unload." );
201
+ return ;
202
+ }
204
203
LocalBrokerData localData = brokerData .getLocalData ();
205
204
double brokerCurrentThroughput = localData .getMsgThroughputIn () + localData .getMsgThroughputOut ();
206
205
double minimumThroughputToOffload = brokerCurrentThroughput * threshold * LOWER_BOUNDARY_THRESHOLD_MARGIN ;
0 commit comments