Skip to content

Commit 5b8392b

Browse files
authored
[ratelimit] Add concurrent_requests and request_duration telemetry metrics (#588)
This PR is to add two more telemetry metrics for rate limit processor: otelcol_ratelimit.concurrent_requests: records the number of requests that are being processed by the rate limiter at any given moment. otelcol_ratelimit.request_duration: Measures the duration of time taken to process each rate limiting request.
1 parent 66a496a commit 5b8392b

File tree

8 files changed

+205
-58
lines changed

8 files changed

+205
-58
lines changed

processor/ratelimitprocessor/documentation.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@
66

77
The following telemetry is emitted by this component.
88

9+
### otelcol_ratelimit.concurrent_requests
10+
11+
Number of in-flight requests at any given time
12+
13+
| Unit | Metric Type | Value Type |
14+
| ---- | ----------- | ---------- |
15+
| {requests} | Gauge | Int |
16+
17+
### otelcol_ratelimit.request_duration
18+
19+
Time(in seconds) taken to process a rate limit request
20+
21+
| Unit | Metric Type | Value Type |
22+
| ---- | ----------- | ---------- |
23+
| {seconds} | Histogram | Double |
24+
925
### otelcol_ratelimit.requests
1026

1127
Number of rate-limiting requests

processor/ratelimitprocessor/factory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,16 @@ func createLogsProcessor(
7575
if err != nil {
7676
return nil, err
7777
}
78+
79+
var inflight int64
7880
return NewLogsRateLimiterProcessor(
7981
rateLimiter,
8082
set.TelemetrySettings,
8183
config.Strategy,
8284
func(ctx context.Context, ld plog.Logs) error {
8385
return nextConsumer.ConsumeLogs(ctx, ld)
8486
},
87+
&inflight,
8588
)
8689
}
8790

@@ -96,13 +99,15 @@ func createMetricsProcessor(
9699
if err != nil {
97100
return nil, err
98101
}
102+
var inflight int64
99103
return NewMetricsRateLimiterProcessor(
100104
rateLimiter,
101105
set.TelemetrySettings,
102106
config.Strategy,
103107
func(ctx context.Context, md pmetric.Metrics) error {
104108
return nextConsumer.ConsumeMetrics(ctx, md)
105109
},
110+
&inflight,
106111
)
107112
}
108113

@@ -117,13 +122,15 @@ func createTracesProcessor(
117122
if err != nil {
118123
return nil, err
119124
}
125+
var inflight int64
120126
return NewTracesRateLimiterProcessor(
121127
rateLimiter,
122128
set.TelemetrySettings,
123129
config.Strategy,
124130
func(ctx context.Context, td ptrace.Traces) error {
125131
return nextConsumer.ConsumeTraces(ctx, td)
126132
},
133+
&inflight,
127134
)
128135
}
129136

@@ -138,12 +145,14 @@ func createProfilesProcessor(
138145
if err != nil {
139146
return nil, err
140147
}
148+
var inflight int64
141149
return NewProfilesRateLimiterProcessor(
142150
rateLimiter,
143151
set.TelemetrySettings,
144152
config.Strategy,
145153
func(ctx context.Context, td pprofile.Profiles) error {
146154
return nextConsumer.ConsumeProfiles(ctx, td)
147155
},
156+
&inflight,
148157
)
149158
}

processor/ratelimitprocessor/internal/metadata/generated_telemetry.go

Lines changed: 19 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/metadata.yaml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,21 @@ telemetry:
2121
value_type: int
2222
monotonic: true
2323
attributes: ["decision", "reason"]
24-
24+
ratelimit.request_duration:
25+
enabled: true
26+
description: Time(in seconds) taken to process a rate limit request
27+
unit: "{seconds}"
28+
histogram:
29+
value_type: double
30+
monotonic: true
31+
bucket_boundaries: [ 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0 ]
32+
ratelimit.concurrent_requests:
33+
enabled: true
34+
description: Number of in-flight requests at any given time
35+
unit: "{requests}"
36+
gauge:
37+
value_type: int
38+
monotonic: true
2539
attributes:
2640
decision:
2741
description: rate limit decision

processor/ratelimitprocessor/processor.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"sync/atomic"
25+
"time"
2426

25-
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
26-
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/telemetry"
2727
"go.opentelemetry.io/collector/component"
2828
"go.opentelemetry.io/collector/consumer"
2929
"go.opentelemetry.io/collector/pdata/plog"
@@ -34,13 +34,16 @@ import (
3434
"go.opentelemetry.io/otel/metric"
3535

3636
"github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"
37+
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
38+
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/telemetry"
3739
)
3840

3941
type rateLimiterProcessor struct {
4042
component.Component
4143
rl RateLimiter
4244
metadataKeys []string
4345
telemetryBuilder *metadata.TelemetryBuilder
46+
inflight *int64
4447
}
4548

4649
type LogsRateLimiterProcessor struct {
@@ -72,6 +75,7 @@ func NewLogsRateLimiterProcessor(
7275
telemetrySettings component.TelemetrySettings,
7376
strategy Strategy,
7477
next func(ctx context.Context, logs plog.Logs) error,
78+
inflight *int64,
7579
) (*LogsRateLimiterProcessor, error) {
7680
telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings)
7781
if err != nil {
@@ -83,6 +87,7 @@ func NewLogsRateLimiterProcessor(
8387
Component: rateLimiter,
8488
rl: rateLimiter.Unwrap(),
8589
telemetryBuilder: telemetryBuilder,
90+
inflight: inflight,
8691
},
8792
count: getLogsCountFunc(strategy),
8893
next: next,
@@ -94,6 +99,7 @@ func NewMetricsRateLimiterProcessor(
9499
telemetrySettings component.TelemetrySettings,
95100
strategy Strategy,
96101
next func(ctx context.Context, metrics pmetric.Metrics) error,
102+
inflight *int64, // used to calculate concurrent requests
97103
) (*MetricsRateLimiterProcessor, error) {
98104
telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings)
99105
if err != nil {
@@ -105,6 +111,7 @@ func NewMetricsRateLimiterProcessor(
105111
Component: rateLimiter,
106112
rl: rateLimiter.Unwrap(),
107113
telemetryBuilder: telemetryBuilder,
114+
inflight: inflight,
108115
},
109116
count: getMetricsCountFunc(strategy),
110117
next: next,
@@ -116,6 +123,7 @@ func NewTracesRateLimiterProcessor(
116123
telemetrySettings component.TelemetrySettings,
117124
strategy Strategy,
118125
next func(ctx context.Context, traces ptrace.Traces) error,
126+
inflight *int64,
119127
) (*TracesRateLimiterProcessor, error) {
120128
telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings)
121129
if err != nil {
@@ -127,6 +135,7 @@ func NewTracesRateLimiterProcessor(
127135
Component: rateLimiter,
128136
rl: rateLimiter.Unwrap(),
129137
telemetryBuilder: telemetryBuilder,
138+
inflight: inflight,
130139
},
131140
count: getTracesCountFunc(strategy),
132141
next: next,
@@ -138,6 +147,7 @@ func NewProfilesRateLimiterProcessor(
138147
telemetrySettings component.TelemetrySettings,
139148
strategy Strategy,
140149
next func(ctx context.Context, profiles pprofile.Profiles) error,
150+
inflight *int64,
141151
) (*ProfilesRateLimiterProcessor, error) {
142152
telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings)
143153
if err != nil {
@@ -149,6 +159,7 @@ func NewProfilesRateLimiterProcessor(
149159
Component: rateLimiter,
150160
rl: rateLimiter.Unwrap(),
151161
telemetryBuilder: telemetryBuilder,
162+
inflight: inflight,
152163
},
153164
count: getProfilesCountFunc(strategy),
154165
next: next,
@@ -200,7 +211,16 @@ func rateLimit(
200211
rateLimit func(ctx context.Context, n int) error,
201212
metadataKeys []string,
202213
telemetryBuilder *metadata.TelemetryBuilder,
214+
inflight *int64,
203215
) error {
216+
current := atomic.AddInt64(inflight, 1)
217+
telemetryBuilder.RatelimitConcurrentRequests.Record(ctx, current)
218+
219+
defer func(start time.Time) {
220+
atomic.AddInt64(inflight, -1)
221+
telemetryBuilder.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds())
222+
}(time.Now())
223+
204224
err := rateLimit(ctx, hits)
205225

206226
attrs := getTelemetryAttrs(ctx, metadataKeys, err)
@@ -218,6 +238,7 @@ func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs
218238
r.rl.RateLimit,
219239
r.metadataKeys,
220240
r.telemetryBuilder,
241+
r.inflight,
221242
); err != nil {
222243
return err
223244
}
@@ -234,6 +255,7 @@ func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pme
234255
r.rl.RateLimit,
235256
r.metadataKeys,
236257
r.telemetryBuilder,
258+
r.inflight,
237259
); err != nil {
238260
return err
239261
}
@@ -250,6 +272,7 @@ func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrac
250272
r.rl.RateLimit,
251273
r.metadataKeys,
252274
r.telemetryBuilder,
275+
r.inflight,
253276
); err != nil {
254277
return err
255278
}
@@ -266,6 +289,7 @@ func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd p
266289
r.rl.RateLimit,
267290
r.metadataKeys,
268291
r.telemetryBuilder,
292+
r.inflight,
269293
); err != nil {
270294
return err
271295
}

0 commit comments

Comments
 (0)