Skip to content

Commit 04561e5

Browse files
authored
Allow dynamic window multiplier via a new extension (#833)
1 parent 5e5ebb7 commit 04561e5

File tree

6 files changed

+322
-89
lines changed

6 files changed

+322
-89
lines changed

processor/ratelimitprocessor/README.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ You can override one or more of the following fields:
3737
| Field | Description | Required | Default |
3838
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------|
3939
| `enabled` | Enables the dynamic rate limiting feature. | No | `false` |
40-
| `window_multiplier` | The factor by which the previous window rate is multiplied to get the dynamic limit. | No | `1.3` |
4140
| `window_duration` | The time window duration for calculating traffic rates. | No | `2m` |
41+
| `default_window_multiplier` | The default factor by which the previous window rate is multiplied to get the dynamic limit. Can be overridden by providing a `window_configurator` extension. | No | `1.3` |
42+
| `window_configurator` | An optional extension that calculates the window multiplier dynamically based on unique keys. | No | |
4243

4344
### Dynamic Rate Limiting Deep Dive
4445

@@ -59,7 +60,14 @@ Where:
5960

6061
* `previous_rate`: The rate of traffic in the previous window (normalized per second).
6162
* `static_rate`: The configured `rate` in the main configuration.
62-
* `window_multiplier`: A factor applied to the previous window rate to determine the dynamic limit.
63+
* `window_multiplier`: A factor applied to the previous window rate to determine the dynamic limit. It is derived based on the following formula:
64+
65+
```text
66+
window_multiplier = default_window_multiplier
67+
if window_configurator is defined {
68+
window_multiplier = window_configurator::Multiplier(...)
69+
}
70+
```
6371

6472
**Important Notes:**
6573

@@ -72,7 +80,7 @@ Let's walk through a few examples to illustrate the behavior of the dynamic rate
7280
Assume the following configuration:
7381

7482
* `window_duration`: 2m
75-
* `window_multiplier`: 1.5
83+
* `default_window_multiplier`: 1.5 (with no `window_configurator` provided, so this is the effective `window_multiplier`)
7684
* `rate`: 1000 requests/second (this is our `static_rate`)
7785

7886
#### Scenario 1: Initial Traffic
@@ -296,3 +304,18 @@ Telemetry and metrics:
296304

297305
* `ratelimit.resolver.failures` — total number of resolver failures
298306
* `ratelimit.dynamic.escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `success`)
307+
308+
### Throttling rate based on custom logic using `window_configurator`
309+
310+
The `window_configurator` option configures a custom OpenTelemetry Collector extension that dynamically chooses the window multiplier for the current period. The value is the extension ID (the extension's configured name). The extension MUST implement the `WindowConfigurator` interface. The multiplier can be used to scale up the rate limit from the previous window (by returning a multiplier greater than `1`) or to scale it down (by returning a multiplier less than `1` but greater than `0`). If the extension returns a negative value, the `default_window_multiplier` is used instead. Note that the dynamic limit is always at least the configured `static_rate`, ensuring a minimum level of throughput. An example configuration including the window configurator:
311+
312+
```yaml
313+
processors:
314+
ratelimiter:
315+
dynamic_limits:
316+
enabled: true
317+
window_duration: 2m
318+
default_window_multiplier: 1.5
319+
# This is an example window configurator. It doesn't exist.
320+
window_configurator: kafkalagwindowconfiguratorextension
321+
```

processor/ratelimitprocessor/config.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,22 @@ type Config struct {
7171
type DynamicRateLimiting struct {
7272
// Enabled tells the processor to use dynamic rate limiting.
7373
Enabled bool `mapstructure:"enabled"`
74-
// WindowMultiplier is the factor by which the previous window rate is
75-
// multiplied to get the dynamic part of the limit. Defaults to 1.3.
76-
WindowMultiplier float64 `mapstructure:"window_multiplier"`
74+
7775
// WindowDuration defines the time window for which the dynamic rate limit
78-
// is calculated on.
76+
// is calculated on. Defaults to 2 minutes.
7977
WindowDuration time.Duration `mapstructure:"window_duration"`
78+
79+
// DefaultWindowMultiplier is the factor by which the previous window rate is
80+
// multiplied to get the dynamic part of the limit. Defaults to 1.3.
81+
DefaultWindowMultiplier float64 `mapstructure:"default_window_multiplier"`
82+
83+
// WindowConfigurator is the component ID of the extension to dynamically
84+
// determine the window multiplier. The extension is expected to implement
85+
// the `WindowConfigurator` interface. The window configurator is used in
86+
// the hot path so it should respond fast. The effective rate cannot go
87+
// below the configured static rate limit settings. If the configurator
88+
// returns a negative multiplier then the default multiplier will be used.
89+
WindowConfigurator component.ID `mapstructure:"window_configurator"`
8090
}
8191

8292
// Class defines a named rate limit class for class-based dynamic rate limiting.
@@ -98,8 +108,8 @@ func (d *DynamicRateLimiting) Validate() error {
98108
return nil
99109
}
100110
var errs []error
101-
if d.WindowMultiplier < 1 {
102-
errs = append(errs, errors.New("window_multiplier must be greater than or equal to 1"))
111+
if d.DefaultWindowMultiplier < 1 {
112+
errs = append(errs, errors.New("default_window_multiplier must be greater than or equal to 1"))
103113
}
104114
if d.WindowDuration <= 0 {
105115
errs = append(errs, errors.New("window_duration must be greater than zero"))
@@ -244,8 +254,8 @@ func createDefaultConfig() component.Config {
244254
ThrottleInterval: DefaultThrottleInterval,
245255
},
246256
DynamicRateLimiting: DynamicRateLimiting{
247-
WindowMultiplier: 1.3,
248-
WindowDuration: 2 * time.Minute,
257+
DefaultWindowMultiplier: 1.3,
258+
WindowDuration: 2 * time.Minute,
249259
},
250260
Classes: nil,
251261
DefaultClass: "",

processor/ratelimitprocessor/config_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ import (
3232
)
3333

3434
var defaultDynamicRateLimiting = DynamicRateLimiting{
35-
WindowMultiplier: 1.3,
36-
WindowDuration: 2 * time.Minute,
35+
DefaultWindowMultiplier: 1.3,
36+
WindowDuration: 2 * time.Minute,
3737
}
3838

3939
func TestLoadConfig(t *testing.T) {
@@ -211,9 +211,9 @@ func TestLoadConfig(t *testing.T) {
211211
ThrottleInterval: 1 * time.Second,
212212
},
213213
DynamicRateLimiting: DynamicRateLimiting{
214-
Enabled: true,
215-
WindowMultiplier: 1.5,
216-
WindowDuration: time.Minute,
214+
Enabled: true,
215+
DefaultWindowMultiplier: 1.5,
216+
WindowDuration: time.Minute,
217217
},
218218
},
219219
},

processor/ratelimitprocessor/gubernator.go

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,29 @@ type ClassResolver interface {
5151
ResolveClass(ctx context.Context, key string) (string, error)
5252
}
5353

54+
// WindowConfigurator allows adjusting the rates dynamically by configuring
55+
// the multiplier for the next calculation window.
56+
//
57+
// NOTE(lahsivjar): We may want to make the duration configurable too.
58+
type WindowConfigurator interface {
59+
// Multiplier returns the calculated multiplier for the next window.
60+
Multiplier(ctx context.Context, window time.Duration, key string) float64
61+
}
62+
5463
type noopResolver struct{}
5564

5665
func (noopResolver) ResolveClass(context.Context, string) (string, error) {
5766
return "", nil
5867
}
5968

69+
type defaultWindowConfigurator struct {
70+
multiplier float64
71+
}
72+
73+
func (d defaultWindowConfigurator) Multiplier(context.Context, time.Duration, string) float64 {
74+
return d.multiplier
75+
}
76+
6077
type gubernatorRateLimiter struct {
6178
cfg *Config
6279
logger *zap.Logger
@@ -67,8 +84,9 @@ type gubernatorRateLimiter struct {
6784
client gubernator.V1Client
6885
clientConn *grpc.ClientConn
6986
// Class resolver for class-based rate limiting
70-
classResolver ClassResolver
71-
telemetryBuilder *metadata.TelemetryBuilder
87+
classResolver ClassResolver
88+
windowConfigurator WindowConfigurator
89+
telemetryBuilder *metadata.TelemetryBuilder
7290
}
7391

7492
func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, error) {
@@ -102,12 +120,13 @@ func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder
102120
}
103121

104122
return &gubernatorRateLimiter{
105-
cfg: cfg,
106-
logger: logger,
107-
behavior: gubernator.Behavior_BATCHING,
108-
daemonCfg: daemonCfg,
109-
telemetryBuilder: telemetryBuilder,
110-
classResolver: noopResolver{},
123+
cfg: cfg,
124+
logger: logger,
125+
behavior: gubernator.Behavior_BATCHING,
126+
daemonCfg: daemonCfg,
127+
telemetryBuilder: telemetryBuilder,
128+
classResolver: noopResolver{},
129+
windowConfigurator: defaultWindowConfigurator{multiplier: cfg.DynamicRateLimiting.DefaultWindowMultiplier},
111130
}, nil
112131
}
113132

@@ -123,6 +142,17 @@ func (r *gubernatorRateLimiter) Start(ctx context.Context, host component.Host)
123142
r.classResolver = cr.(ClassResolver)
124143
}
125144

145+
if wCon := r.cfg.DynamicRateLimiting.WindowConfigurator; wCon.String() != "" {
146+
wc, ok := host.GetExtensions()[wCon]
147+
if !ok {
148+
return fmt.Errorf("window configurator %s not found", wCon)
149+
}
150+
if err := wc.Start(ctx, host); err != nil {
151+
return fmt.Errorf("failed to start window configurator %s: %w", wCon, err)
152+
}
153+
r.windowConfigurator = wc.(WindowConfigurator)
154+
}
155+
126156
r.daemon, err = gubernator.SpawnDaemon(ctx, r.daemonCfg)
127157
if err != nil {
128158
return fmt.Errorf("failed to spawn gubernator daemon: %w", err)
@@ -154,6 +184,11 @@ func (r *gubernatorRateLimiter) Shutdown(ctx context.Context) error {
154184
return fmt.Errorf("failed to shutdown class resolver: %w", err)
155185
}
156186
}
187+
if w, ok := r.windowConfigurator.(component.Component); ok {
188+
if err := w.Shutdown(ctx); err != nil {
189+
return fmt.Errorf("failed to shutdown window configurator: %w", err)
190+
}
191+
}
157192
return nil
158193
}
159194

@@ -356,20 +391,35 @@ func (r *gubernatorRateLimiter) getDynamicLimit(ctx context.Context,
356391
// reqs/events/bytes per Throttle interval may not be 1s.
357392
staticRate := float64(cfg.Rate) / r.cfg.ThrottleInterval.Seconds()
358393
drc := newDynamicRateContext(uniqueKey, now, r.cfg.DynamicRateLimiting)
359-
// Get current and previous window rates
360-
current, previous, err := r.peekRates(ctx, int64(hits), drc)
394+
// Get current and previous window rates, the current rates are without
395+
// accounting for the new hits.
396+
current, previous, err := r.peekRates(ctx, drc)
361397
if err != nil {
362398
return -1, err
363399
}
400+
windowMultiplier := r.windowConfigurator.Multiplier(
401+
ctx,
402+
drc.WindowDuration,
403+
uniqueKey,
404+
)
405+
if windowMultiplier < 0 {
406+
windowMultiplier = drc.DefaultWindowMultiplier
407+
}
364408
// Only record the incoming hits when the current rate is within the allowed
365409
// range, otherwise, do not record the hits and return the calculated rate.
366-
// The idea is to continuously increase the rate limit. MaxAllowed sets a
367-
// ceiling on it with the window duration.
410+
// MaxAllowed sets a ceiling on the rate with the window duration.
411+
//
368412
// NOTE(marclop) We may want to add a follow-up static ceiling to avoid
369413
// unbounded growth.
370-
maxAllowed := math.Max(staticRate, previous*drc.WindowMultiplier)
414+
maxAllowed := math.Max(staticRate, previous*windowMultiplier)
415+
// Normalise the current rate assuming no more events will occur during the
416+
// rest of the window. This will ensure that we record hits based on the
417+
// currently observed hits and NOT based on extrapolated data.
418+
current = current * drc.elapsed.Seconds() / drc.WindowDuration.Seconds()
371419
if current <= maxAllowed {
372-
if err := r.recordHits(ctx, drc, hits); err != nil {
420+
// Deduce how many hits to record to reach the max allowed number
421+
remainingHits := int((maxAllowed - current) * drc.WindowDuration.Seconds())
422+
if err := r.recordHits(ctx, drc, min(hits, remainingHits)); err != nil {
373423
return -1, err
374424
}
375425
}
@@ -397,9 +447,10 @@ func (r *gubernatorRateLimiter) newDynamicRequest(
397447
}
398448
}
399449

400-
// peekRates retrieves the current (including incoming hits) and previous rates
401-
// from Gubernator. All Rates are normalized per second.
402-
func (r *gubernatorRateLimiter) peekRates(ctx context.Context, hits int64,
450+
// peekRates retrieves the current and previous rates from Gubernator. All
451+
// Rates are normalized per second.
452+
func (r *gubernatorRateLimiter) peekRates(
453+
ctx context.Context,
403454
drc dynamicRateContext,
404455
) (float64, float64, error) {
405456
// ----------------------- PEEK PHASE -----------------------
@@ -417,9 +468,8 @@ func (r *gubernatorRateLimiter) peekRates(ctx context.Context, hits int64,
417468
return -1, -1, err
418469
}
419470
// Normalize the current rate based on the elapsed time (since the window
420-
// hasn't fully elapsed). Then add the current hits to it.
471+
// hasn't fully elapsed).
421472
currentRate := rateFromResponse(peekResponses[0], drc.elapsed)
422-
currentRate += float64(hits) / drc.elapsed.Seconds()
423473
// Normalize the PREVIOUS rate based on the window duration.
424474
previousRate := rateFromResponse(peekResponses[1], drc.WindowDuration)
425475
return currentRate, previousRate, nil

0 commit comments

Comments
 (0)