Skip to content

Commit bac15f8

Browse files
authored
ratelimit: add decision attributes to request size (#876)
Adds decision and reason attributes to the request size metric to alllow breaking down the request size metric by decision and reason. Related to #874 Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent ea82cbc commit bac15f8

File tree

2 files changed

+137
-63
lines changed

2 files changed

+137
-63
lines changed

processor/ratelimitprocessor/processor.go

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -220,25 +220,34 @@ func rateLimit(ctx context.Context,
220220
hits int,
221221
rateLimit func(ctx context.Context, n int) error,
222222
metadataKeys []string,
223-
telemetryBuilder *metadata.TelemetryBuilder,
223+
tb *metadata.TelemetryBuilder,
224224
logger *zap.Logger,
225225
inflight *int64,
226226
) error {
227227
current := atomic.AddInt64(inflight, 1)
228228
attrsCommon := getAttrsFromContext(ctx, metadataKeys)
229-
telemetryBuilder.RatelimitConcurrentRequests.Record(ctx, current, metric.WithAttributes(attrsCommon...))
229+
attrsSet := attribute.NewSet(attrsCommon...)
230+
tb.RatelimitConcurrentRequests.Record(ctx, current,
231+
metric.WithAttributeSet(attrsSet),
232+
)
230233

231234
defer func(start time.Time) {
232235
atomic.AddInt64(inflight, -1)
233-
telemetryBuilder.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes(attrsCommon...))
236+
tb.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds(),
237+
metric.WithAttributeSet(attrsSet),
238+
)
234239
}(time.Now())
235240

236241
err := rateLimit(ctx, hits)
242+
attrRequests := getTelemetryAttrs(attrsCommon, err)
243+
attrRequestsSet := attribute.NewSet(attrRequests...)
244+
tb.RatelimitRequestSize.Record(ctx, int64(hits),
245+
metric.WithAttributeSet(attrRequestsSet),
246+
)
237247
if err != nil {
238248
// enhance error logging with metadata keys
239-
fields := []zap.Field{
240-
zap.Int("hits", hits),
241-
}
249+
fields := make([]zap.Field, 0, len(attrsCommon)+1)
250+
fields = append(fields, zap.Int("hits", hits))
242251
for _, kv := range attrsCommon {
243252
switch kv.Value.Type() {
244253
case attribute.STRINGSLICE:
@@ -250,26 +259,14 @@ func rateLimit(ctx context.Context,
250259
logger.Error("request is over the limits defined by the rate limiter", append(fields, zap.Error(err))...)
251260
}
252261

253-
attrRequests := getTelemetryAttrs(attrsCommon, err)
254-
telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributes(attrRequests...))
255-
262+
tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet))
256263
return err
257264
}
258265

259-
func recordRequestSize(ctx context.Context, tb *metadata.TelemetryBuilder, strategy Strategy, hits int, metadataKeys []string) {
260-
if tb != nil && strategy == StrategyRateLimitBytes {
261-
attrsCommon := getAttrsFromContext(ctx, metadataKeys)
262-
tb.RatelimitRequestSize.Record(ctx, int64(hits), metric.WithAttributes(attrsCommon...))
263-
}
264-
}
265-
266266
func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
267-
hits := r.count(ld)
268-
recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys)
269-
270267
if err := rateLimit(
271268
ctx,
272-
hits,
269+
r.count(ld),
273270
r.rl.RateLimit,
274271
r.metadataKeys,
275272
r.telemetryBuilder,
@@ -278,17 +275,13 @@ func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs
278275
); err != nil {
279276
return err
280277
}
281-
282278
return r.next(ctx, ld)
283279
}
284280

285281
func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
286-
hits := r.count(md)
287-
recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys)
288-
289282
if err := rateLimit(
290283
ctx,
291-
hits,
284+
r.count(md),
292285
r.rl.RateLimit,
293286
r.metadataKeys,
294287
r.telemetryBuilder,
@@ -297,17 +290,13 @@ func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pme
297290
); err != nil {
298291
return err
299292
}
300-
301293
return r.next(ctx, md)
302294
}
303295

304296
func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
305-
hits := r.count(td)
306-
recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys)
307-
308297
if err := rateLimit(
309298
ctx,
310-
hits,
299+
r.count(td),
311300
r.rl.RateLimit,
312301
r.metadataKeys,
313302
r.telemetryBuilder,
@@ -316,17 +305,13 @@ func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrac
316305
); err != nil {
317306
return err
318307
}
319-
320308
return r.next(ctx, td)
321309
}
322310

323311
func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error {
324-
hits := r.count(pd)
325-
recordRequestSize(ctx, r.telemetryBuilder, r.strategy, hits, r.metadataKeys)
326-
327312
if err := rateLimit(
328313
ctx,
329-
hits,
314+
r.count(pd),
330315
r.rl.RateLimit,
331316
r.metadataKeys,
332317
r.telemetryBuilder,
@@ -335,14 +320,13 @@ func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd p
335320
); err != nil {
336321
return err
337322
}
338-
339323
return r.next(ctx, pd)
340324
}
341325

342326
func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int {
343327
switch strategy {
344328
case StrategyRateLimitRequests:
345-
return func(ld plog.Logs) int {
329+
return func(plog.Logs) int {
346330
return 1
347331
}
348332
case StrategyRateLimitRecords:
@@ -355,13 +339,13 @@ func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int {
355339
return pm.LogsSize(ld)
356340
}
357341
}
358-
return nil
342+
return nil // cannot happen, prevented by config.Validate()
359343
}
360344

361345
func getMetricsCountFunc(strategy Strategy) func(md pmetric.Metrics) int {
362346
switch strategy {
363347
case StrategyRateLimitRequests:
364-
return func(md pmetric.Metrics) int {
348+
return func(pmetric.Metrics) int {
365349
return 1
366350
}
367351
case StrategyRateLimitRecords:
@@ -374,14 +358,13 @@ func getMetricsCountFunc(strategy Strategy) func(md pmetric.Metrics) int {
374358
return pm.MetricsSize(md)
375359
}
376360
}
377-
// cannot happen, prevented by config.Validate()
378-
return nil
361+
return nil // cannot happen, prevented by config.Validate()
379362
}
380363

381364
func getTracesCountFunc(strategy Strategy) func(td ptrace.Traces) int {
382365
switch strategy {
383366
case StrategyRateLimitRequests:
384-
return func(td ptrace.Traces) int {
367+
return func(ptrace.Traces) int {
385368
return 1
386369
}
387370
case StrategyRateLimitRecords:
@@ -394,14 +377,13 @@ func getTracesCountFunc(strategy Strategy) func(td ptrace.Traces) int {
394377
return pm.TracesSize(td)
395378
}
396379
}
397-
// cannot happen, prevented by config.Validate()
398-
return nil
380+
return nil // cannot happen, prevented by config.Validate()
399381
}
400382

401383
func getProfilesCountFunc(strategy Strategy) func(pd pprofile.Profiles) int {
402384
switch strategy {
403385
case StrategyRateLimitRequests:
404-
return func(pd pprofile.Profiles) int {
386+
return func(pprofile.Profiles) int {
405387
return 1
406388
}
407389
case StrategyRateLimitRecords:
@@ -414,6 +396,5 @@ func getProfilesCountFunc(strategy Strategy) func(pd pprofile.Profiles) int {
414396
return pm.ProfilesSize(pd)
415397
}
416398
}
417-
// cannot happen, prevented by config.Validate()
418-
return nil
399+
return nil // cannot happen, prevented by config.Validate()
419400
}

0 commit comments

Comments
 (0)