Skip to content

Commit 8a9699e

Browse files
authored
processor/ratelimitprocessor: add retry_delay to surface gRPC RetryInfo (#844)
Introduces a new `retry_delay` `1s default` top level key to communicate a suggested client backoff via gRPC `errdetails.RetryInfo` when the request is throttled. This allows upstream components to retry the request. See more details in https://opentelemetry.io/docs/specs/otlp/#otlpgrpc-throttling. --------- Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 1a728ab commit 8a9699e

File tree

6 files changed

+53
-9
lines changed

6 files changed

+53
-9
lines changed

processor/ratelimitprocessor/README.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ a in-memory rate limiter, or makes use of a [gubernator](https://github.com/gube
1515
| `burst` | Maximum number of tokens that can be consumed. | Yes | |
1616
| `throttle_behavior` | Processor behavior for when the rate limit is exceeded. Options are `error`, return an error immediately on throttle and does not send the event, and `delay`, delay the sending until it is no longer throttled. | Yes | `error` |
1717
| `throttle_interval` | Time interval for throttling. It has effects only when `type` is `gubernator`. | No | `1s` |
18+
| `retry_delay` | Suggested client retry delay included via gRPC `RetryInfo` when throttled. | No | `1s` |
1819
| `type` | Type of rate limiter. Options are `local` or `gubernator`. | No | `local` |
1920
| `overrides` | Allows customizing rate limiting parameters for specific metadata key-value pairs. Use this to apply different rate limits to different tenants, projects, or other entities identified by metadata. Each override is keyed by a metadata value and can specify custom `rate`, `burst`, and `throttle_interval` settings that take precedence over the global configuration for matching requests. | No | |
2021
| `dynamic_limits` | Holds the dynamic rate limiting configuration. This is only applicable when the rate limiter type is `gubernator`. | No | |
2122
| `classes` | Named rate limit class definitions for class-based dynamic rate limiting. Only applicable when the rate limiter type is `gubernator`. | No | |
2223
| `default_class` | Default class name to use when resolver returns unknown/empty class. Must exist in classes when set. Only applicable when the rate limiter type is `gubernator`. | No | |
24+
| `class_resolver` | Extension ID used to resolve a class name for a given unique key. Only applicable when the rate limiter type is `gubernator`. | No | |
2325

2426
### Overrides
2527

@@ -281,13 +283,13 @@ Behavior and notes:
281283

282284
* The resolver is optional. If no `class_resolver` is configured the processor skips class resolution and falls back to the top-level `rate`/`burst` values.
283285

284-
* The processor initializes the resolver during Start. If the extension is not found the processor logs a warning and proceeds without resolution.
286+
* The processor initializes the resolver during Start. If a `class_resolver` is configured but the extension is not found or fails to start, the processor fails to start with an error.
285287

286288
* When the resolver returns an unknown or empty class name, the processor treats it as "unknown" and uses the configured `default_class` (if set) or falls back to the top-level rate/burst.
287289

288290
Caching and performance:
289291

290-
* Implementations of resolver extensions should be mindful of latency; the processor assumes the resolver is reasonably fast. The processor will fall back when resolver errors occur or if the extension is absent.
292+
* Implementations of resolver extensions should be mindful of latency; the processor assumes the resolver is reasonably fast. If the resolver returns an error at runtime the processor logs a warning and falls back to default/class precedence for that request.
291293

292294
* Ideally, implementations of the `class_resolver` implement their own caching to guarantee performance if they require on an external source.
293295

@@ -298,12 +300,16 @@ Telemetry and metrics:
298300
* `rate_source`: one of `static`, `dynamic`, `fallback`, or `degraded` (indicates whether dynamic calculation was used or not)
299301
* `class`: resolved class name when applicable
300302
* `source_kind`: which precedence path was used (`override`, `class`, or `fallback`)
301-
* `result`: one of `gubernator_error`, `success` or `skipped`.
303+
* `reason`: for dynamic escalations, one of `gubernator_error`, `success`, or `skipped`
302304

303-
* Counters introduced to observe resolver and dynamic behavior include:
305+
* Metrics exposed by the processor include:
304306

305-
* `ratelimit.resolver.failures` — total number of resolver failures
306-
* `ratelimit.dynamic.escalations` — number of times dynamic rate was peeked (attributes: `class`, `source_kind`, `success`)
307+
* `otelcol_ratelimit.requests` — total number of rate limiting requests
308+
* `otelcol_ratelimit.request_duration` — histogram of request processing duration (seconds)
309+
* `otelcol_ratelimit.request_size` — histogram of bytes per request (only when strategy is `bytes`)
310+
* `otelcol_ratelimit.concurrent_requests` — current number of in-flight requests
311+
* `otelcol_ratelimit.resolver.failures` — total number of class resolver failures
312+
* `otelcol_ratelimit.dynamic.escalations` — count of dynamic rate decisions (attributes: `class`, `source_kind`, `reason`)
307313

308314
### Throttling rate based on custom logic using `window_configurator`
309315

processor/ratelimitprocessor/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ type RateLimitSettings struct {
152152
// Defaults to 1s
153153
ThrottleInterval time.Duration `mapstructure:"throttle_interval"`
154154

155+
// RetryDelay holds the time delay to return to the client through RPC
156+
// errdetails.RetryInfo. See more details of this in the documentation.
157+
// https://opentelemetry.io/docs/specs/otlp/#otlpgrpc-throttling.
158+
//
159+
// Defaults to 1s
160+
RetryDelay time.Duration `mapstructure:"retry_delay"`
161+
155162
disableDynamic bool `mapstructure:"-"`
156163
}
157164

@@ -215,6 +222,9 @@ const (
215222
// DefaultThrottleInterval is the default value for the
216223
// throttle interval.
217224
DefaultThrottleInterval time.Duration = 1 * time.Second
225+
226+
// DefaultRetryDelay is the default value for the retry delay.
227+
DefaultRetryDelay time.Duration = 1 * time.Second
218228
)
219229

220230
// ThrottleBehavior identifies the behavior when rate limit is exceeded.
@@ -249,6 +259,7 @@ func createDefaultConfig() component.Config {
249259
Strategy: StrategyRateLimitRequests,
250260
ThrottleBehavior: ThrottleBehaviorError,
251261
ThrottleInterval: DefaultThrottleInterval,
262+
RetryDelay: DefaultRetryDelay,
252263
},
253264
DynamicRateLimiting: DynamicRateLimiting{
254265
DefaultWindowMultiplier: 1.3,

processor/ratelimitprocessor/config_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func TestLoadConfig(t *testing.T) {
5555
Strategy: StrategyRateLimitRequests,
5656
ThrottleBehavior: ThrottleBehaviorError,
5757
ThrottleInterval: 1 * time.Second,
58+
RetryDelay: 1 * time.Second,
5859
},
5960
DynamicRateLimiting: defaultDynamicRateLimiting,
6061
},
@@ -69,6 +70,7 @@ func TestLoadConfig(t *testing.T) {
6970
Strategy: StrategyRateLimitBytes,
7071
ThrottleBehavior: ThrottleBehaviorError,
7172
ThrottleInterval: 1 * time.Second,
73+
RetryDelay: 1 * time.Second,
7274
},
7375
DynamicRateLimiting: defaultDynamicRateLimiting,
7476
},
@@ -83,6 +85,7 @@ func TestLoadConfig(t *testing.T) {
8385
Strategy: StrategyRateLimitRequests,
8486
ThrottleBehavior: ThrottleBehaviorError,
8587
ThrottleInterval: 1 * time.Second,
88+
RetryDelay: 1 * time.Second,
8689
},
8790
DynamicRateLimiting: defaultDynamicRateLimiting,
8891
},
@@ -97,6 +100,7 @@ func TestLoadConfig(t *testing.T) {
97100
Strategy: StrategyRateLimitRequests,
98101
ThrottleBehavior: ThrottleBehaviorError,
99102
ThrottleInterval: 1 * time.Second,
103+
RetryDelay: 1 * time.Second,
100104
},
101105
DynamicRateLimiting: defaultDynamicRateLimiting,
102106
MetadataKeys: []string{"project_id"},
@@ -112,6 +116,7 @@ func TestLoadConfig(t *testing.T) {
112116
Strategy: StrategyRateLimitBytes,
113117
ThrottleBehavior: ThrottleBehaviorError,
114118
ThrottleInterval: 1 * time.Second,
119+
RetryDelay: 1 * time.Second,
115120
},
116121
DynamicRateLimiting: defaultDynamicRateLimiting,
117122
Overrides: map[string]RateLimitOverrides{
@@ -132,6 +137,7 @@ func TestLoadConfig(t *testing.T) {
132137
Strategy: StrategyRateLimitBytes,
133138
ThrottleBehavior: ThrottleBehaviorError,
134139
ThrottleInterval: 1 * time.Second,
140+
RetryDelay: 1 * time.Second,
135141
},
136142
DynamicRateLimiting: defaultDynamicRateLimiting,
137143
Overrides: map[string]RateLimitOverrides{
@@ -151,6 +157,7 @@ func TestLoadConfig(t *testing.T) {
151157
Strategy: StrategyRateLimitBytes,
152158
ThrottleBehavior: ThrottleBehaviorError,
153159
ThrottleInterval: 1 * time.Second,
160+
RetryDelay: 1 * time.Second,
154161
},
155162
DynamicRateLimiting: defaultDynamicRateLimiting,
156163
Overrides: map[string]RateLimitOverrides{
@@ -170,6 +177,7 @@ func TestLoadConfig(t *testing.T) {
170177
Strategy: StrategyRateLimitBytes,
171178
ThrottleBehavior: ThrottleBehaviorError,
172179
ThrottleInterval: 1 * time.Second,
180+
RetryDelay: 1 * time.Second,
173181
},
174182
DynamicRateLimiting: defaultDynamicRateLimiting,
175183
Overrides: map[string]RateLimitOverrides{
@@ -190,6 +198,7 @@ func TestLoadConfig(t *testing.T) {
190198
Strategy: StrategyRateLimitBytes,
191199
ThrottleBehavior: ThrottleBehaviorError,
192200
ThrottleInterval: 1 * time.Second,
201+
RetryDelay: 1 * time.Second,
193202
},
194203
DynamicRateLimiting: defaultDynamicRateLimiting,
195204
Overrides: map[string]RateLimitOverrides{
@@ -209,6 +218,7 @@ func TestLoadConfig(t *testing.T) {
209218
Strategy: StrategyRateLimitBytes,
210219
ThrottleBehavior: ThrottleBehaviorError,
211220
ThrottleInterval: 1 * time.Second,
221+
RetryDelay: 1 * time.Second,
212222
},
213223
DynamicRateLimiting: DynamicRateLimiting{
214224
Enabled: true,

processor/ratelimitprocessor/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ require (
3232
golang.org/x/time v0.11.0
3333
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5
3434
google.golang.org/grpc v1.76.0
35+
google.golang.org/protobuf v1.36.9
3536
)
3637

3738
require (
@@ -132,7 +133,6 @@ require (
132133
golang.org/x/text v0.28.0 // indirect
133134
golang.org/x/tools v0.35.0 // indirect
134135
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
135-
google.golang.org/protobuf v1.36.9 // indirect
136136
gopkg.in/inf.v0 v0.9.1 // indirect
137137
gopkg.in/yaml.v2 v2.4.0 // indirect
138138
gopkg.in/yaml.v3 v3.0.1 // indirect

processor/ratelimitprocessor/processor_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync"
2323
"sync/atomic"
2424
"testing"
25+
"time"
2526

2627
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
2728
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadatatest"
@@ -135,6 +136,8 @@ func TestConsume_Logs(t *testing.T) {
135136
Rate: 1,
136137
Burst: 1,
137138
ThrottleBehavior: ThrottleBehaviorError,
139+
RetryDelay: 1 * time.Second,
140+
ThrottleInterval: 1 * time.Second,
138141
},
139142
})
140143
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
@@ -188,6 +191,8 @@ func TestConsume_Metrics(t *testing.T) {
188191
Rate: 1,
189192
Burst: 1,
190193
ThrottleBehavior: ThrottleBehaviorError,
194+
RetryDelay: 1 * time.Second,
195+
ThrottleInterval: 1 * time.Second,
191196
},
192197
})
193198
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
@@ -241,6 +246,8 @@ func TestConsume_Traces(t *testing.T) {
241246
Rate: 1,
242247
Burst: 1,
243248
ThrottleBehavior: ThrottleBehaviorError,
249+
RetryDelay: 1 * time.Second,
250+
ThrottleInterval: 1 * time.Second,
244251
},
245252
})
246253
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
@@ -294,6 +301,8 @@ func TestConsume_Profiles(t *testing.T) {
294301
Rate: 1,
295302
Burst: 1,
296303
ThrottleBehavior: ThrottleBehaviorError,
304+
RetryDelay: 1 * time.Second,
305+
ThrottleInterval: 1 * time.Second,
297306
},
298307
})
299308
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
@@ -348,6 +357,8 @@ func TestConcurrentRequestsTelemetry(t *testing.T) {
348357
Rate: 10,
349358
Burst: 10,
350359
ThrottleBehavior: ThrottleBehaviorError,
360+
RetryDelay: 1 * time.Second,
361+
ThrottleInterval: 1 * time.Second,
351362
},
352363
})
353364
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
@@ -490,13 +501,16 @@ func testError(t *testing.T, err error) {
490501
assert.Equal(t, codes.ResourceExhausted, st.Code())
491502
assert.Equal(t, "rpc error: code = ResourceExhausted desc = too many requests", st.Err().Error())
492503
details := st.Details()
493-
require.Len(t, details, 1, "expected 1 errorinfo detail")
504+
require.Len(t, details, 2, "expected 2 details")
494505
errorInfo, ok := details[0].(*errdetails.ErrorInfo)
495506
require.True(t, ok, "expected errorinfo detail")
496507
assert.Equal(t, "ingest.elastic.co", errorInfo.Domain)
497508
assert.Equal(t, map[string]string{
498509
"component": "ratelimitprocessor",
499510
"limit": "1",
500-
"throttle_interval": "0s",
511+
"throttle_interval": "1s",
501512
}, errorInfo.Metadata)
513+
retryInfo, ok := details[1].(*errdetails.RetryInfo)
514+
require.True(t, ok, "expected retryinfo detail")
515+
assert.Equal(t, "seconds:1", retryInfo.RetryDelay.String())
502516
}

processor/ratelimitprocessor/ratelimiter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"google.golang.org/genproto/googleapis/rpc/errdetails"
2828
"google.golang.org/grpc/codes"
2929
"google.golang.org/grpc/status"
30+
"google.golang.org/protobuf/types/known/durationpb"
3031

3132
"go.opentelemetry.io/collector/client"
3233
)
@@ -107,6 +108,8 @@ func errorWithDetails(err error, cfg RateLimitSettings) error {
107108
"limit": fmt.Sprintf("%d", cfg.Rate),
108109
"throttle_interval": cfg.ThrottleInterval.String(),
109110
},
111+
}, &errdetails.RetryInfo{
112+
RetryDelay: durationpb.New(cfg.RetryDelay),
110113
}); stErr == nil {
111114
return detailedSt.Err()
112115
}

0 commit comments

Comments
 (0)