Skip to content

Commit 11006a4

Browse files
committed
xds: Normalize weights before combining endpoint and locality weights
Previously, the number of endpoints in a locality would skew how much traffic was sent to that locality. Also, if endpoints in localities had wildly different weights, that would impact cross-locality weighting. For example, consider: LocalityA weight=1 endpointWeights=[100, 100, 100, 100] LocalityB weight=1 endpointWeights=[1] The endpoint in LocalityB should have an endpoint weight that is half the total sum of endpoint weights, in order to receive half the traffic. But the multiple endpoints in LocalityA would cause it to get 4x the traffic and the endpoint weights in LocalityA causes them to get 100x the traffic. See gRFC A113
1 parent c589bef commit 11006a4

File tree

3 files changed

+91
-7
lines changed

3 files changed

+91
-7
lines changed

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
2323

2424
import com.google.common.collect.ImmutableMap;
25+
import com.google.common.primitives.UnsignedInts;
2526
import com.google.errorprone.annotations.CheckReturnValue;
2627
import io.grpc.Attributes;
2728
import io.grpc.EquivalentAddressGroup;
@@ -33,6 +34,7 @@
3334
import io.grpc.NameResolver;
3435
import io.grpc.Status;
3536
import io.grpc.StatusOr;
37+
import io.grpc.internal.GrpcUtil;
3638
import io.grpc.util.GracefulSwitchLoadBalancer;
3739
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
3840
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
@@ -74,6 +76,9 @@
7476
* by a group of sub-clusters in a tree hierarchy.
7577
*/
7678
final class CdsLoadBalancer2 extends LoadBalancer {
79+
static boolean pickFirstWeightedShuffling =
80+
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true);
81+
7782
private final XdsLogger logger;
7883
private final Helper helper;
7984
private final LoadBalancerRegistry lbRegistry;
@@ -222,6 +227,26 @@ private String errorPrefix() {
222227
return "CdsLb for " + clusterName + ": ";
223228
}
224229

230+
/**
231+
* The number of bits assigned to the fractional part of fixed-point values. We normalize weights
232+
* to a fixed-point number between 0 and 1, representing that item's proportion of traffic (1 ==
233+
* 100% of traffic). We reserve at least one bit for the whole number so that we don't need to
234+
* special case a single item, and so that we can round up very low values without risking uint32
235+
* overflow of the sum of weights.
236+
*/
237+
private static final int FIXED_POINT_FRACTIONAL_BITS = 31;
238+
239+
/** Divide two uint32s and produce a fixed-point uint32 result. */
240+
private static long fractionToFixedPoint(long numerator, long denominator) {
241+
long one = 1L << FIXED_POINT_FRACTIONAL_BITS;
242+
return numerator * one / denominator;
243+
}
244+
245+
/** Multiply two uint32 fixed-point numbers, returning a uint32 fixed-point. */
246+
private static long fixedPointMultiply(long a, long b) {
247+
return (a * b) >> FIXED_POINT_FRACTIONAL_BITS;
248+
}
249+
225250
private static StatusOr<EdsUpdate> getEdsUpdate(XdsConfig xdsConfig, String cluster) {
226251
StatusOr<XdsClusterConfig> clusterConfig = xdsConfig.getClusters().get(cluster);
227252
if (clusterConfig == null) {
@@ -286,17 +311,61 @@ StatusOr<ClusterResolutionResult> edsUpdateToResult(
286311
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
287312
List<String> sortedPriorityNames =
288313
generatePriorityNames(clusterName, localityLbEndpoints);
314+
Map<String, Long> priorityLocalityWeightSums;
315+
if (pickFirstWeightedShuffling) {
316+
priorityLocalityWeightSums = new HashMap<>(sortedPriorityNames.size() * 2);
317+
for (Locality locality : localityLbEndpoints.keySet()) {
318+
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
319+
String priorityName = localityPriorityNames.get(locality);
320+
Long sum = priorityLocalityWeightSums.get(priorityName);
321+
if (sum == null) {
322+
sum = 0L;
323+
}
324+
long weight = UnsignedInts.toLong(localityLbInfo.localityWeight());
325+
priorityLocalityWeightSums.put(priorityName, sum + weight);
326+
}
327+
} else {
328+
priorityLocalityWeightSums = null;
329+
}
330+
289331
for (Locality locality : localityLbEndpoints.keySet()) {
290332
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
291333
String priorityName = localityPriorityNames.get(locality);
292334
boolean discard = true;
335+
// These sums _should_ fit in uint32, but XdsEndpointResource isn't actually verifying that
336+
// is true today. Since we are using long to avoid signedness trouble, the math happens to
337+
// still work if it turns out the sums exceed uint32.
338+
long localityWeightSum = 0;
339+
long endpointWeightSum = 0;
340+
if (pickFirstWeightedShuffling) {
341+
localityWeightSum = priorityLocalityWeightSums.get(priorityName);
342+
for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
343+
if (endpoint.isHealthy()) {
344+
endpointWeightSum += UnsignedInts.toLong(endpoint.loadBalancingWeight());
345+
}
346+
}
347+
}
293348
for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
294349
if (endpoint.isHealthy()) {
295350
discard = false;
296-
long weight = localityLbInfo.localityWeight();
297-
if (endpoint.loadBalancingWeight() != 0) {
298-
weight *= endpoint.loadBalancingWeight();
351+
long weight;
352+
if (pickFirstWeightedShuffling) {
353+
// Combine locality and endpoint weights as defined by gRFC A113
354+
long localityWeight = fractionToFixedPoint(
355+
UnsignedInts.toLong(localityLbInfo.localityWeight()), localityWeightSum);
356+
long endpointWeight = fractionToFixedPoint(
357+
UnsignedInts.toLong(endpoint.loadBalancingWeight()), endpointWeightSum);
358+
weight = fixedPointMultiply(localityWeight, endpointWeight);
359+
if (weight == 0) {
360+
weight = 1;
361+
}
362+
} else {
363+
weight = localityLbInfo.localityWeight();
364+
if (endpoint.loadBalancingWeight() != 0) {
365+
weight *= endpoint.loadBalancingWeight();
366+
}
299367
}
368+
300369
String localityName = localityName(locality);
301370
Attributes attr =
302371
endpoint.eag().getAttributes().toBuilder()

xds/src/main/java/io/grpc/xds/Endpoints.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ abstract static class LbEndpoint {
5959
// The endpoint address to be connected to.
6060
abstract EquivalentAddressGroup eag();
6161

62-
// Endpoint's weight for load balancing. If unspecified, value of 0 is returned.
62+
// Endpoint's weight for load balancing. Guaranteed not to be 0.
6363
abstract int loadBalancingWeight();
6464

6565
// Whether the endpoint is healthy.
@@ -71,6 +71,9 @@ abstract static class LbEndpoint {
7171

7272
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
7373
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
74+
if (loadBalancingWeight == 0) {
75+
loadBalancingWeight = 1;
76+
}
7477
return new AutoValue_Endpoints_LbEndpoint(
7578
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
7679
}

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,18 @@ public void tearDown() throws Exception {
260260
assertThat(fakeClock.getPendingTasks()).isEmpty();
261261
}
262262

263+
@Test
264+
public void edsClustersWithRingHashEndpointLbPolicy_oppositePickFirstWeightedShuffling()
265+
throws Exception {
266+
boolean original = CdsLoadBalancer2.pickFirstWeightedShuffling;
267+
CdsLoadBalancer2.pickFirstWeightedShuffling = !CdsLoadBalancer2.pickFirstWeightedShuffling;
268+
try {
269+
edsClustersWithRingHashEndpointLbPolicy();
270+
} finally {
271+
CdsLoadBalancer2.pickFirstWeightedShuffling = original;
272+
}
273+
}
274+
263275
@Test
264276
public void edsClustersWithRingHashEndpointLbPolicy() throws Exception {
265277
boolean originalVal = LoadStatsManager2.isEnabledOrcaLrsPropagation;
@@ -306,15 +318,15 @@ public void edsClustersWithRingHashEndpointLbPolicy() throws Exception {
306318
assertThat(addr1.getAddresses())
307319
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080)));
308320
assertThat(addr1.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
309-
.isEqualTo(10);
321+
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
310322
assertThat(addr2.getAddresses())
311323
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.2", 8080)));
312324
assertThat(addr2.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
313-
.isEqualTo(10);
325+
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
314326
assertThat(addr3.getAddresses())
315327
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.1.1", 8080)));
316328
assertThat(addr3.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
317-
.isEqualTo(50 * 60);
329+
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x6AAAAAAA /* 5/6 */ : 50 * 60);
318330
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
319331
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
320332
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER + "[child1]");

0 commit comments

Comments
 (0)