Skip to content

Commit b02706c

Browse files
committed
split scaling policy
1 parent 1b81c7b commit b02706c

File tree

2 files changed

+74
-34
lines changed

2 files changed

+74
-34
lines changed

internal/worker/concurrency_auto_scaler.go

+43-17
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ import (
3434
)
3535

3636
const (
37-
defaultAutoScalerUpdateTick = time.Second
38-
targetPollerWaitTimeInMsLog2 = 4 // 16 ms
37+
defaultAutoScalerUpdateTick = time.Second
38+
lowerPollerWaitTime = 16 * time.Millisecond
39+
upperPollerWaitTime = 256 * time.Millisecond
3940
numberOfPollsInRollingAverage = 20
4041

4142
autoScalerEventPollerUpdate autoScalerEvent = "update-poller-limit"
@@ -206,22 +207,14 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
206207
c.logEvent(autoScalerEventPollerSkipUpdateCooldown)
207208
return
208209
}
209-
currentQuota := c.concurrency.PollerPermit.Quota()
210-
// smoothing the scaling through log2 with edge case of zero value
211-
var newQuota int
212-
if waitTime := c.pollerWaitTime.Average(); waitTime == 0 {
213-
newQuota = currentQuota * 2
210+
211+
newQuota := c.concurrency.PollerPermit.Quota()
212+
pollerWaitTime := c.pollerWaitTime.Average()
213+
if pollerWaitTime < lowerPollerWaitTime { // pollers are busy
214+
newQuota = c.scaleUpPollerPermit(pollerWaitTime)
215+
} else if pollerWaitTime > upperPollerWaitTime { // pollers are idle
216+
newQuota = c.scaleDownPollerPermit(pollerWaitTime)
214217
} else {
215-
newQuota = int(math.Round(
216-
float64(currentQuota) * targetPollerWaitTimeInMsLog2 / math.Log2(1+float64(waitTime/time.Millisecond))))
217-
}
218-
if newQuota < c.pollerMinCount {
219-
newQuota = c.pollerMinCount
220-
}
221-
if newQuota > c.pollerMaxCount {
222-
newQuota = c.pollerMaxCount
223-
}
224-
if newQuota == currentQuota {
225218
c.logEvent(autoScalerEventPollerSkipUpdateNoChange)
226219
return
227220
}
@@ -230,6 +223,39 @@ func (c *ConcurrencyAutoScaler) updatePollerPermit() {
230223
c.logEvent(autoScalerEventPollerUpdate)
231224
}
232225

226+
func (c *ConcurrencyAutoScaler) scaleUpPollerPermit(pollerWaitTime time.Duration) int {
227+
currentQuota := c.concurrency.PollerPermit.Quota()
228+
229+
// inverse scaling with edge case of 0 wait time
230+
// use logrithm to smooth the scaling to avoid drastic change
231+
newQuota := math.Round(
232+
float64(currentQuota) * smoothingFunc(lowerPollerWaitTime) / smoothingFunc(pollerWaitTime))
233+
newQuota = math.Max(
234+
float64(c.pollerMinCount),
235+
math.Min(float64(c.pollerMaxCount), newQuota),
236+
)
237+
return int(newQuota)
238+
}
239+
240+
func (c *ConcurrencyAutoScaler) scaleDownPollerPermit(pollerWaitTime time.Duration) int {
241+
currentQuota := c.concurrency.PollerPermit.Quota()
242+
243+
// inverse scaling with edge case of 0 wait time
244+
// use logrithm to smooth the scaling to avoid drastic change
245+
newQuota := math.Round(
246+
float64(currentQuota) * smoothingFunc(upperPollerWaitTime) / smoothingFunc(pollerWaitTime))
247+
newQuota = math.Max(
248+
float64(c.pollerMinCount),
249+
math.Min(float64(c.pollerMaxCount), newQuota),
250+
)
251+
return int(newQuota)
252+
}
253+
254+
// smoothingFunc is a log2 function with offset to smooth the scaling and address 0 values
255+
func smoothingFunc(x time.Duration) float64 {
256+
return math.Log2(2+ float64(x/time.Millisecond))
257+
}
258+
233259
type number interface {
234260
int64 | float64 | time.Duration
235261
}

internal/worker/concurrency_auto_scaler_test.go

+31-17
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,22 @@ func TestConcurrencyAutoScaler(t *testing.T) {
8383
{
8484
"just enough pollers",
8585
[]*shared.AutoConfigHint{
86-
{common.PtrOf(true), common.PtrOf(int64(15))}, // <- tick, in cool down
87-
{common.PtrOf(true), common.PtrOf(int64(15))}, // <- tick, no update
86+
{common.PtrOf(true), common.PtrOf(int64(16))}, // <- tick, in cool down
87+
{common.PtrOf(true), common.PtrOf(int64(16))}, // <- tick, no update
88+
},
89+
[]eventLog{
90+
{autoScalerEventStart, false, 100, "00:00:00"},
91+
{autoScalerEventEnable, true, 100, "00:00:00"},
92+
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
93+
{autoScalerEventPollerSkipUpdateNoChange, true, 100, "00:00:02"},
94+
{autoScalerEventStop, true, 100, "00:00:02"},
95+
},
96+
},
97+
{
98+
"poller slightly idle but no change",
99+
[]*shared.AutoConfigHint{
100+
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, in cool down
101+
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, no update
88102
},
89103
[]eventLog{
90104
{autoScalerEventStart, false, 100, "00:00:00"},
@@ -125,22 +139,22 @@ func TestConcurrencyAutoScaler(t *testing.T) {
125139
{
126140
"idl pollers waiting for tasks",
127141
[]*shared.AutoConfigHint{
128-
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, in cool down
129-
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, scale up
142+
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down
143+
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale up
130144
},
131145
[]eventLog{
132146
{autoScalerEventStart, false, 100, "00:00:00"},
133147
{autoScalerEventEnable, true, 100, "00:00:00"},
134148
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
135-
{autoScalerEventPollerUpdate, true, 60, "00:00:02"},
136-
{autoScalerEventStop, true, 60, "00:00:02"},
149+
{autoScalerEventPollerUpdate, true, 80, "00:00:02"},
150+
{autoScalerEventStop, true, 80, "00:00:02"},
137151
},
138152
},
139153
{
140154
"idl pollers, scale down to minimum",
141155
[]*shared.AutoConfigHint{
142-
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, in cool down
143-
{common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale up
156+
{common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, in cool down
157+
{common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, scale up
144158
},
145159
[]eventLog{
146160
{autoScalerEventStart, false, 100, "00:00:00"},
@@ -166,15 +180,15 @@ func TestConcurrencyAutoScaler(t *testing.T) {
166180
{
167181
"idl pollers but disabled scaling at a later time",
168182
[]*shared.AutoConfigHint{
169-
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, in cool down
170-
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, scale up
171-
{common.PtrOf(false), common.PtrOf(int64(100))}, // <- disable
183+
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down
184+
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale up
185+
{common.PtrOf(false), common.PtrOf(int64(1000))}, // <- disable
172186
},
173187
[]eventLog{
174188
{autoScalerEventStart, false, 100, "00:00:00"},
175189
{autoScalerEventEnable, true, 100, "00:00:00"},
176190
{autoScalerEventPollerSkipUpdateCooldown, true, 100, "00:00:01"},
177-
{autoScalerEventPollerUpdate, true, 60, "00:00:02"},
191+
{autoScalerEventPollerUpdate, true, 80, "00:00:02"},
178192
{autoScalerEventDisable, false, 100, "00:00:02"},
179193
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:03"},
180194
{autoScalerEventStop, false, 100, "00:00:03"},
@@ -183,17 +197,17 @@ func TestConcurrencyAutoScaler(t *testing.T) {
183197
{
184198
"idl pollers and enabled at a later time",
185199
[]*shared.AutoConfigHint{
186-
{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, in cool down
187-
{common.PtrOf(false), common.PtrOf(int64(100))}, // <- tick, not enabled
188-
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, enable scale up
200+
{common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, in cool down
201+
{common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, not enabled
202+
{common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, enable scale up
189203
},
190204
[]eventLog{
191205
{autoScalerEventStart, false, 100, "00:00:00"},
192206
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:01"},
193207
{autoScalerEventPollerSkipUpdateNotEnabled, false, 100, "00:00:02"},
194208
{autoScalerEventEnable, true, 100, "00:00:02"},
195-
{autoScalerEventPollerUpdate, true, 60, "00:00:03"},
196-
{autoScalerEventStop, true, 60, "00:00:03"},
209+
{autoScalerEventPollerUpdate, true, 80, "00:00:03"},
210+
{autoScalerEventStop, true, 80, "00:00:03"},
197211
},
198212
},
199213
} {

0 commit comments

Comments
 (0)