Skip to content

Commit ac72d66

Browse files
committed
weightedroundrobin: Implements slow start [WiP]
Implements slow start functionality as described in [gRFC A100](grpc/proposal#498). I still need to do some work on the tests and run an e2e experiment with xDS to ensure that things are working as expected, but the core implementation is in a state that can be reviewed. At this phase I'm mainly looking for feedback on the overall approach and design, and on the test code, because I am worried that I might be missing some test utility that would make testing with `time` easier. Pending items before this is ready to be merged: - [ ] Add more unit tests for slow start config cases - [ ] Manually verify with an xDS experiment that slow start is working as expected Release Notes: * weightedroundrobin: Implements slow start functionality as described in [gRFC A100](grpc/proposal#498) Signed-off-by: sotiris <[email protected]>
1 parent 363018c commit ac72d66

File tree

7 files changed

+237
-24
lines changed

7 files changed

+237
-24
lines changed

balancer/weightedroundrobin/balancer.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ package weightedroundrobin
3030
import (
3131
"encoding/json"
3232
"fmt"
33+
"math"
3334
rand "math/rand/v2"
3435
"sync"
3536
"sync/atomic"
@@ -91,6 +92,15 @@ var (
9192
OptionalLabels: []string{"grpc.lb.locality"},
9293
Default: false,
9394
})
95+
96+
endpointsInSlowStartMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
97+
Name: "grpc.lb.wrr.endpoints_in_slow_start",
98+
Description: "EXPERIMENTAL. Number of endpoints currently in the slow start period. This is incremented when a new scheduler is created.",
99+
Unit: "{endpoint}",
100+
Labels: []string{"grpc.target"},
101+
OptionalLabels: []string{"grpc.lb.locality"},
102+
Default: false,
103+
})
94104
)
95105

96106
func init() {
@@ -132,6 +142,28 @@ func (bb) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, er
132142
return nil, fmt.Errorf("wrr: errorUtilizationPenalty must be non-negative")
133143
}
134144

145+
// Validate slow start configuration if provided
146+
if lbCfg.SlowStartConfig != nil {
147+
ssc := lbCfg.SlowStartConfig
148+
if ssc.Aggression != 0 && ssc.Aggression <= 0 {
149+
return nil, fmt.Errorf("wrr: slowStartConfig.aggression must be greater than 0.0")
150+
}
151+
if ssc.MinWeightPercent != 0 && (ssc.MinWeightPercent < 0 || ssc.MinWeightPercent > 100) {
152+
return nil, fmt.Errorf("wrr: slowStartConfig.minWeightPercent must be in range [0.0, 100.0]")
153+
}
154+
if ssc.SlowStartWindow != 0 && ssc.SlowStartWindow <= 0 {
155+
return nil, fmt.Errorf("wrr: slowStartConfig.slowStartWindow must be greater than 0")
156+
}
157+
158+
// Set defaults for slow start config
159+
if ssc.Aggression == 0 {
160+
ssc.Aggression = 1.0
161+
}
162+
if ssc.MinWeightPercent == 0 {
163+
ssc.MinWeightPercent = 10.0
164+
}
165+
}
166+
135167
// For easier comparisons later, ensure the OOB reporting period is unset
136168
// (0s) when OOB reports are disabled.
137169
if !lbCfg.EnableOOBLoadReport {
@@ -361,6 +393,7 @@ func (b *wrrBalancer) updateSubConnState(sc balancer.SubConn, state balancer.Sub
361393
ew.mu.Lock()
362394
ew.nonEmptySince = time.Time{}
363395
ew.lastUpdated = time.Time{}
396+
ew.readySince = internal.TimeNow() // Set readySince for slow start period
364397
cfg := ew.cfg
365398
ew.mu.Unlock()
366399
ew.updateORCAListener(cfg)
@@ -380,6 +413,9 @@ func (b *wrrBalancer) updateSubConnState(sc balancer.SubConn, state balancer.Sub
380413
if ew.stopORCAListener != nil {
381414
ew.stopORCAListener()
382415
}
416+
ew.mu.Lock()
417+
ew.readySince = time.Time{} // Reset readySince when going non-READY
418+
ew.mu.Unlock()
383419
ew.pickedSC = nil
384420
}
385421
}
@@ -427,7 +463,9 @@ func (p *picker) endpointWeights(recordMetrics bool) []float64 {
427463
wp := make([]float64, len(p.weightedPickers))
428464
now := internal.TimeNow()
429465
for i, wpi := range p.weightedPickers {
430-
wp[i] = wpi.weightedEndpoint.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod), recordMetrics)
466+
baseWeight := wpi.weightedEndpoint.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod), recordMetrics)
467+
slowStartScale := wpi.weightedEndpoint.slowStartScale(now, recordMetrics)
468+
wp[i] = baseWeight * slowStartScale
431469
}
432470
return wp
433471
}
@@ -519,6 +557,7 @@ type endpointWeight struct {
519557
weightVal float64
520558
nonEmptySince time.Time
521559
lastUpdated time.Time
560+
readySince time.Time // Time when endpoint transitioned to ready state (for slow start period)
522561
cfg *lbConfig
523562
}
524563

@@ -634,3 +673,37 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
634673

635674
return w.weightVal
636675
}
676+
677+
// slowStartScale returns the scaling factor for slow start.
678+
// Returns 1.0 if slow start is not configured or endpoint is outside slow start window.
679+
func (w *endpointWeight) slowStartScale(now time.Time, recordMetrics bool) float64 {
680+
w.mu.Lock()
681+
defer w.mu.Unlock()
682+
683+
// No config, slow start config, or readySince not set
684+
if w.cfg == nil || w.cfg.SlowStartConfig == nil || w.readySince.Equal(time.Time{}) {
685+
return 1.0
686+
}
687+
688+
if recordMetrics {
689+
defer func() {
690+
endpointsInSlowStartMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
691+
}()
692+
}
693+
694+
ssc := w.cfg.SlowStartConfig
695+
slowStartWindow := time.Duration(ssc.SlowStartWindow)
696+
timeSinceReady := now.Sub(w.readySince)
697+
// Outside slow start window
698+
if timeSinceReady >= slowStartWindow {
699+
return 1.0
700+
}
701+
702+
// Calculate scaling factor using the formula from gRFC A100:
703+
// scaled_weight = weight * max(min_weight_percent/100, time_factor ^ (1/aggression))
704+
// where time_factor = max(time_since_ready_seconds, 1) / slow_start_window_seconds
705+
timeFactor := math.Max(timeSinceReady.Seconds(), 1.0) / slowStartWindow.Seconds()
706+
minScale := ssc.MinWeightPercent / 100.0
707+
scale := math.Pow(timeFactor, 1.0/ssc.Aggression)
708+
return math.Max(minScale, scale)
709+
}

balancer/weightedroundrobin/balancer_test.go

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,54 @@ func stringp(s string) *string { return &s }
6767

6868
var (
6969
perCallConfig = iwrr.LBConfig{
70+
EnableOOBLoadReport: boolp(false),
71+
OOBReportingPeriod: stringp("0.005s"),
72+
BlackoutPeriod: stringp("0s"),
73+
WeightExpirationPeriod: stringp("60s"),
74+
WeightUpdatePeriod: stringp(".050s"),
75+
ErrorUtilizationPenalty: float64p(0.),
76+
}
77+
perCallConfigWithSlowStart = iwrr.LBConfig{
7078
EnableOOBLoadReport: boolp(false),
7179
OOBReportingPeriod: stringp("0.005s"),
7280
BlackoutPeriod: stringp("0s"),
7381
WeightExpirationPeriod: stringp("60s"),
7482
WeightUpdatePeriod: stringp(".050s"),
7583
ErrorUtilizationPenalty: float64p(0),
84+
SlowStartConfig: &iwrr.SlowStartConfig{
85+
SlowStartWindow: stringp("30s"),
86+
Aggression: float64p(1.0),
87+
MinWeightPercent: float64p(10.0),
88+
},
7689
}
7790
oobConfig = iwrr.LBConfig{
7891
EnableOOBLoadReport: boolp(true),
7992
OOBReportingPeriod: stringp("0.005s"),
8093
BlackoutPeriod: stringp("0s"),
8194
WeightExpirationPeriod: stringp("60s"),
8295
WeightUpdatePeriod: stringp(".050s"),
83-
ErrorUtilizationPenalty: float64p(0),
96+
ErrorUtilizationPenalty: float64p(0.),
97+
}
98+
oobConfigWithSlowStart = iwrr.LBConfig{
99+
EnableOOBLoadReport: boolp(true),
100+
OOBReportingPeriod: stringp("0.005s"),
101+
BlackoutPeriod: stringp("0s"),
102+
WeightExpirationPeriod: stringp("60s"),
103+
WeightUpdatePeriod: stringp(".050s"),
104+
ErrorUtilizationPenalty: float64p(0.),
105+
SlowStartConfig: &iwrr.SlowStartConfig{
106+
SlowStartWindow: stringp("30s"),
107+
Aggression: float64p(1.0),
108+
MinWeightPercent: float64p(10.0),
109+
},
84110
}
85111
testMetricsConfig = iwrr.LBConfig{
86112
EnableOOBLoadReport: boolp(false),
87113
OOBReportingPeriod: stringp("0.005s"),
88114
BlackoutPeriod: stringp("0s"),
89115
WeightExpirationPeriod: stringp("60s"),
90116
WeightUpdatePeriod: stringp("30s"),
91-
ErrorUtilizationPenalty: float64p(0),
117+
ErrorUtilizationPenalty: float64p(0.0),
92118
}
93119
)
94120

@@ -176,8 +202,11 @@ func (s) TestBalancer_OneAddress(t *testing.T) {
176202
cfg iwrr.LBConfig
177203
}{
178204
{rt: reportNone, cfg: perCallConfig},
205+
{rt: reportCall, cfg: perCallConfigWithSlowStart},
179206
{rt: reportCall, cfg: perCallConfig},
207+
{rt: reportCall, cfg: perCallConfigWithSlowStart},
180208
{rt: reportOOB, cfg: oobConfig},
209+
{rt: reportOOB, cfg: oobConfigWithSlowStart},
181210
}
182211

183212
for _, tc := range testCases {
@@ -652,6 +681,66 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
652681
}
653682
}
654683

684+
func (s) TestBalancer_SlowStart(t *testing.T) {
685+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
686+
defer cancel()
687+
688+
srv1 := startServer(t, reportBoth)
689+
srv2 := startServer(t, reportBoth)
690+
srv3 := startServer(t, reportBoth)
691+
692+
// srv1, srv2, srv3 all start loaded equally.
693+
srv1.oobMetrics.SetQPS(10.0)
694+
srv1.oobMetrics.SetApplicationUtilization(1.0)
695+
696+
srv2.oobMetrics.SetQPS(10.0)
697+
srv2.oobMetrics.SetApplicationUtilization(1.0)
698+
699+
srv3.oobMetrics.SetQPS(10.0)
700+
srv3.oobMetrics.SetApplicationUtilization(1.0)
701+
702+
slowStartPeriod := 2 * time.Second
703+
slowStartConfig := &iwrr.SlowStartConfig{
704+
SlowStartWindow: stringp("2s"),
705+
Aggression: float64p(1.0),
706+
MinWeightPercent: float64p(10.0),
707+
}
708+
cfg := oobConfig
709+
cfg.BlackoutPeriod = stringp("0.1s")
710+
cfg.SlowStartConfig = slowStartConfig
711+
sc := svcConfig(t, cfg)
712+
713+
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
714+
t.Fatalf("Error starting client: %v", err)
715+
}
716+
717+
addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
718+
srv1.R.UpdateState(resolver.State{Addresses: addrs})
719+
720+
// Call each backend once to ensure the weights have been received.
721+
ensureReached(ctx, t, srv1.Client, 2)
722+
time.Sleep(slowStartPeriod + weightUpdatePeriod)
723+
checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 10})
724+
725+
// Add backend 3
726+
addrs = append(addrs, resolver.Address{Addr: srv3.Address})
727+
srv1.R.UpdateState(resolver.State{Addresses: addrs})
728+
ensureReached(ctx, t, srv1.Client, 3)
729+
730+
// validate that srv3 is in slow start
731+
time.Sleep(weightUpdatePeriod)
732+
checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 10}, srvWeight{srv3, 5})
733+
734+
// validate that srv3 exits slow start
735+
time.Sleep(slowStartPeriod + weightUpdatePeriod)
736+
checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 10}, srvWeight{srv3, 10})
737+
fmt.Printf("Test log ---------------------> End\n")
738+
if srv1.CC != nil {
739+
srv1.CC.Close()
740+
time.Sleep(weightUpdatePeriod) // Wait for cleanup
741+
}
742+
}
743+
655744
// Tests that the weight expiration period causes backends to use 0 as their
656745
// weight (meaning to use the average weight) once the expiration period
657746
// elapses.
@@ -789,7 +878,9 @@ func (s) TestBalancer_AddressesChanging(t *testing.T) {
789878
func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) {
790879
t.Helper()
791880
reached := make(map[string]struct{})
881+
i := 0
792882
for len(reached) != n {
883+
i++
793884
var peer peer.Peer
794885
if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
795886
t.Fatalf("Error from EmptyCall: %v", err)

balancer/weightedroundrobin/config.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,35 @@ type lbConfig struct {
5656
// The multiplier used to adjust endpoint weights with the error rate
5757
// calculated as eps/qps. Default is 1.0.
5858
ErrorUtilizationPenalty float64 `json:"errorUtilizationPenalty,omitempty"`
59+
60+
// Configuration for slow start feature
61+
SlowStartConfig *SlowStartConfig `json:"slowStartConfig,omitempty"`
62+
}
63+
64+
// SlowStartConfig contains configuration for the slow start feature.
65+
type SlowStartConfig struct {
66+
// SlowStartWindow is the length of the slow start window.
67+
//
68+
// An endpoint remains in slow start mode from the time it becomes READY
69+
// for the duration of the slow start window.
70+
SlowStartWindow iserviceconfig.Duration `json:"slowStartWindow,omitempty"`
71+
72+
// This parameter controls the speed of traffic increase over the slow start window. Defaults to 1.0
73+
// (also used when the value is 0), so that the endpoint gets a linearly increasing amount of traffic.
74+
// When increasing the value for this parameter, the speed of traffic ramp-up increases non-linearly.
75+
// The value of the aggression parameter must be greater than 0.0.
76+
// By tuning the parameter, it is possible to achieve a polynomial or exponential shape of the ramp-up curve.
77+
//
78+
// During the slow start window, the effective weight of an endpoint is scaled with time factor and aggression:
79+
// ``new_weight = weight * max(min_weight_percent / 100, time_factor ^ (1 / aggression))``,
80+
// where ``time_factor = max(time_since_start_seconds, 1) / slow_start_window_seconds``.
81+
//
82+
// As time progresses, more and more traffic is sent to an endpoint that is in its slow start window.
83+
// Once the endpoint exits slow start, time_factor and aggression no longer affect its weight.
84+
Aggression float64 `json:"aggression,omitempty"`
85+
86+
// Configures the minimum percentage of the original weight that will be used for an endpoint
87+
// in slow start. This helps to avoid a scenario in which endpoints receive no traffic during the
88+
// slow start window. Valid range is [0.0, 100.0]. If the value is not specified (or is 0), the default is 10%.
89+
MinWeightPercent float64 `json:"minWeightPercent,omitempty"`
5990
}

balancer/weightedroundrobin/internal/internal.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,19 @@ var AllowAnyWeightUpdatePeriod bool
3131
// LBConfig allows tests to produce a JSON form of the config from the struct
3232
// instead of using a string.
3333
type LBConfig struct {
34-
EnableOOBLoadReport *bool `json:"enableOobLoadReport,omitempty"`
35-
OOBReportingPeriod *string `json:"oobReportingPeriod,omitempty"`
36-
BlackoutPeriod *string `json:"blackoutPeriod,omitempty"`
37-
WeightExpirationPeriod *string `json:"weightExpirationPeriod,omitempty"`
38-
WeightUpdatePeriod *string `json:"weightUpdatePeriod,omitempty"`
39-
ErrorUtilizationPenalty *float64 `json:"errorUtilizationPenalty,omitempty"`
34+
EnableOOBLoadReport *bool `json:"enableOobLoadReport,omitempty"`
35+
OOBReportingPeriod *string `json:"oobReportingPeriod,omitempty"`
36+
BlackoutPeriod *string `json:"blackoutPeriod,omitempty"`
37+
WeightExpirationPeriod *string `json:"weightExpirationPeriod,omitempty"`
38+
WeightUpdatePeriod *string `json:"weightUpdatePeriod,omitempty"`
39+
ErrorUtilizationPenalty *float64 `json:"errorUtilizationPenalty,omitempty"`
40+
SlowStartConfig *SlowStartConfig `json:"slowStartConfig,omitempty"`
41+
}
42+
43+
// SlowStartConfig allows tests to produce a JSON form of the slow start
// config from the struct instead of using a string. It is the type of the
// SlowStartConfig field of LBConfig; every field is a pointer and is tagged
// omitempty, so nil fields are left out of the JSON.
type SlowStartConfig struct {
44+
SlowStartWindow *string `json:"slowStartWindow,omitempty"`
45+
Aggression *float64 `json:"aggression,omitempty"`
46+
MinWeightPercent *float64 `json:"minWeightPercent,omitempty"`
4047
}
4148

4249
// TimeNow can be overridden by tests to return a different value for the

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ go 1.24.0
44

55
require (
66
github.com/cespare/xxhash/v2 v2.3.0
7-
github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f
8-
github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329
9-
github.com/envoyproxy/go-control-plane/envoy v1.35.0
7+
github.com/cncf/xds/go v0.0.0-20251014123835-2ee22ca58382
8+
github.com/envoyproxy/go-control-plane v0.13.4
9+
github.com/envoyproxy/go-control-plane/envoy v1.35.1-0.20251015221300-4138018a492b
1010
github.com/golang/glog v1.2.5
1111
github.com/golang/protobuf v1.5.4
1212
github.com/google/go-cmp v0.7.0

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0
66
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0=
77
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
9-
github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f h1:Y8xYupdHxryycyPlc9Y+bSQAYZnetRJ70VMVKm5CKI0=
10-
github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4=
9+
github.com/cncf/xds/go v0.0.0-20251014123835-2ee22ca58382 h1:5IeUoAZvqwF6LcCnV99NbhrGKN6ihZgahJv5jKjmZ3k=
10+
github.com/cncf/xds/go v0.0.0-20251014123835-2ee22ca58382/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4=
1111
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
1212
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
13-
github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329 h1:K+fnvUM0VZ7ZFJf0n4L/BRlnsb9pL/GuDG6FqaH+PwM=
14-
github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329/go.mod h1:Alz8LEClvR7xKsrq3qzoc4N0guvVNSS8KmSChGYr9hs=
15-
github.com/envoyproxy/go-control-plane/envoy v1.35.0 h1:ixjkELDE+ru6idPxcHLj8LBVc2bFP7iBytj353BoHUo=
16-
github.com/envoyproxy/go-control-plane/envoy v1.35.0/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs=
13+
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
14+
github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
15+
github.com/envoyproxy/go-control-plane/envoy v1.35.1-0.20251015221300-4138018a492b h1:rDyuvoLwwYo/TeYQF/6Ag7BG/LYbrnLp3qvXzzF5JRk=
16+
github.com/envoyproxy/go-control-plane/envoy v1.35.1-0.20251015221300-4138018a492b/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98=
1717
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
1818
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
1919
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=

0 commit comments

Comments
 (0)