Skip to content

Commit 26c4485

Browse files
authored
ratelimitprocessor: use up-down counter (#923)
1 parent 1f41aa1 commit 26c4485

File tree

8 files changed

+45
-82
lines changed

8 files changed

+45
-82
lines changed

processor/ratelimitprocessor/documentation.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ The following telemetry is emitted by this component.
1010

1111
Number of in-flight requests at any given time [Development]
1212

13-
| Unit | Metric Type | Value Type | Stability |
14-
| ---- | ----------- | ---------- | --------- |
15-
| {requests} | Gauge | Int | Development |
13+
| Unit | Metric Type | Value Type | Monotonic | Stability |
14+
| ---- | ----------- | ---------- | --------- | --------- |
15+
| {requests} | Sum | Int | false | Development |
1616

1717
### otelcol_ratelimit.dynamic.escalations
1818

processor/ratelimitprocessor/factory.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ func createLogsProcessor(
8282
if err != nil {
8383
return nil, err
8484
}
85-
var inflight int64
8685
return NewLogsRateLimiterProcessor(
8786
rateLimiter,
8887
set.TelemetrySettings.Logger,
@@ -92,7 +91,6 @@ func createLogsProcessor(
9291
func(ctx context.Context, ld plog.Logs) error {
9392
return nextConsumer.ConsumeLogs(ctx, ld)
9493
},
95-
&inflight,
9694
config.MetadataKeys,
9795
)
9896
}
@@ -112,7 +110,6 @@ func createMetricsProcessor(
112110
if err != nil {
113111
return nil, err
114112
}
115-
var inflight int64
116113
return NewMetricsRateLimiterProcessor(
117114
rateLimiter,
118115
set.TelemetrySettings.Logger,
@@ -122,7 +119,6 @@ func createMetricsProcessor(
122119
func(ctx context.Context, md pmetric.Metrics) error {
123120
return nextConsumer.ConsumeMetrics(ctx, md)
124121
},
125-
&inflight,
126122
config.MetadataKeys,
127123
)
128124
}
@@ -142,7 +138,6 @@ func createTracesProcessor(
142138
if err != nil {
143139
return nil, err
144140
}
145-
var inflight int64
146141
return NewTracesRateLimiterProcessor(
147142
rateLimiter,
148143
set.TelemetrySettings.Logger,
@@ -152,7 +147,6 @@ func createTracesProcessor(
152147
func(ctx context.Context, td ptrace.Traces) error {
153148
return nextConsumer.ConsumeTraces(ctx, td)
154149
},
155-
&inflight,
156150
config.MetadataKeys,
157151
)
158152
}
@@ -172,7 +166,6 @@ func createProfilesProcessor(
172166
if err != nil {
173167
return nil, err
174168
}
175-
var inflight int64
176169
return NewProfilesRateLimiterProcessor(
177170
rateLimiter,
178171
set.TelemetrySettings.Logger,
@@ -182,7 +175,6 @@ func createProfilesProcessor(
182175
func(ctx context.Context, td pprofile.Profiles) error {
183176
return nextConsumer.ConsumeProfiles(ctx, td)
184177
},
185-
&inflight,
186178
config.MetadataKeys,
187179
)
188180
}

processor/ratelimitprocessor/internal/metadata/generated_telemetry.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest.go

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/ratelimitprocessor/metadata.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ telemetry:
1919
unit: "{requests}"
2020
stability:
2121
level: development
22-
gauge:
22+
sum:
2323
value_type: int
24-
monotonic: true
24+
monotonic: false
2525
ratelimit.dynamic.escalations:
2626
enabled: true
2727
description: Total number of dynamic rate escalations (dynamic > static)

processor/ratelimitprocessor/processor.go

Lines changed: 30 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector
1919

2020
import (
2121
"context"
22-
"sync/atomic"
2322
"time"
2423

2524
"go.opentelemetry.io/collector/component"
@@ -47,7 +46,6 @@ type rateLimiterProcessor struct {
4746
telemetryBuilder *metadata.TelemetryBuilder
4847
tracerProvider trace.TracerProvider
4948
logger *zap.Logger
50-
inflight *int64
5149
strategy Strategy
5250
}
5351

@@ -82,7 +80,6 @@ func NewLogsRateLimiterProcessor(
8280
tracerProvider trace.TracerProvider,
8381
strategy Strategy,
8482
next func(ctx context.Context, logs plog.Logs) error,
85-
inflight *int64,
8683
metadataKeys []string,
8784
) (*LogsRateLimiterProcessor, error) {
8885
return &LogsRateLimiterProcessor{
@@ -92,7 +89,6 @@ func NewLogsRateLimiterProcessor(
9289
telemetryBuilder: telemetryBuilder,
9390
tracerProvider: tracerProvider,
9491
logger: logger,
95-
inflight: inflight,
9692
metadataKeys: metadataKeys,
9793
strategy: strategy,
9894
},
@@ -108,7 +104,6 @@ func NewMetricsRateLimiterProcessor(
108104
tracerProvider trace.TracerProvider,
109105
strategy Strategy,
110106
next func(ctx context.Context, metrics pmetric.Metrics) error,
111-
inflight *int64, // used to calculate concurrent requests
112107
metadataKeys []string,
113108
) (*MetricsRateLimiterProcessor, error) {
114109
return &MetricsRateLimiterProcessor{
@@ -118,7 +113,6 @@ func NewMetricsRateLimiterProcessor(
118113
telemetryBuilder: telemetryBuilder,
119114
tracerProvider: tracerProvider,
120115
logger: logger,
121-
inflight: inflight,
122116
metadataKeys: metadataKeys,
123117
strategy: strategy,
124118
},
@@ -134,7 +128,6 @@ func NewTracesRateLimiterProcessor(
134128
tracerProvider trace.TracerProvider,
135129
strategy Strategy,
136130
next func(ctx context.Context, traces ptrace.Traces) error,
137-
inflight *int64,
138131
metadataKeys []string,
139132
) (*TracesRateLimiterProcessor, error) {
140133
return &TracesRateLimiterProcessor{
@@ -144,7 +137,6 @@ func NewTracesRateLimiterProcessor(
144137
telemetryBuilder: telemetryBuilder,
145138
tracerProvider: tracerProvider,
146139
logger: logger,
147-
inflight: inflight,
148140
metadataKeys: metadataKeys,
149141
strategy: strategy,
150142
},
@@ -160,7 +152,6 @@ func NewProfilesRateLimiterProcessor(
160152
tracerProvider trace.TracerProvider,
161153
strategy Strategy,
162154
next func(ctx context.Context, profiles pprofile.Profiles) error,
163-
inflight *int64,
164155
metadataKeys []string,
165156
) (*ProfilesRateLimiterProcessor, error) {
166157
return &ProfilesRateLimiterProcessor{
@@ -170,7 +161,6 @@ func NewProfilesRateLimiterProcessor(
170161
telemetryBuilder: telemetryBuilder,
171162
tracerProvider: tracerProvider,
172163
logger: logger,
173-
inflight: inflight,
174164
metadataKeys: metadataKeys,
175165
strategy: strategy,
176166
},
@@ -216,34 +206,31 @@ func getTelemetryAttrs(attrsCommon []attribute.KeyValue, err error) (attrs []att
216206
return attrs
217207
}
218208

219-
func rateLimit(ctx context.Context,
209+
func withRateLimit[T any](ctx context.Context,
220210
hits int,
221211
rateLimit func(ctx context.Context, n int) error,
222212
metadataKeys []string,
223213
tb *metadata.TelemetryBuilder,
224214
logger *zap.Logger,
225-
inflight *int64,
215+
next func(ctx context.Context, data T) error,
216+
data T,
226217
) error {
227-
current := atomic.AddInt64(inflight, 1)
228218
attrsCommon := getAttrsFromContext(ctx, metadataKeys)
229219
attrsSet := attribute.NewSet(attrsCommon...)
230-
tb.RatelimitConcurrentRequests.Record(ctx, current,
220+
tb.RatelimitConcurrentRequests.Add(ctx, 1, metric.WithAttributeSet(attrsSet))
221+
defer tb.RatelimitConcurrentRequests.Add(ctx, -1, metric.WithAttributeSet(attrsSet))
222+
223+
start := time.Now()
224+
err := rateLimit(ctx, hits)
225+
tb.RatelimitRequestDuration.Record(ctx,
226+
time.Since(start).Seconds(),
231227
metric.WithAttributeSet(attrsSet),
232228
)
233229

234-
defer func(start time.Time) {
235-
atomic.AddInt64(inflight, -1)
236-
tb.RatelimitRequestDuration.Record(ctx, time.Since(start).Seconds(),
237-
metric.WithAttributeSet(attrsSet),
238-
)
239-
}(time.Now())
240-
241-
err := rateLimit(ctx, hits)
242230
attrRequests := getTelemetryAttrs(attrsCommon, err)
243231
attrRequestsSet := attribute.NewSet(attrRequests...)
244-
tb.RatelimitRequestSize.Record(ctx, int64(hits),
245-
metric.WithAttributeSet(attrRequestsSet),
246-
)
232+
tb.RatelimitRequestSize.Record(ctx, int64(hits), metric.WithAttributeSet(attrRequestsSet))
233+
tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet))
247234
if err != nil {
248235
// enhance error logging with metadata keys
249236
fields := make([]zap.Field, 0, len(attrsCommon)+1)
@@ -256,71 +243,61 @@ func rateLimit(ctx context.Context,
256243
fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString()))
257244
}
258245
}
259-
logger.Error("request is over the limits defined by the rate limiter", append(fields, zap.Error(err))...)
246+
logger.Error(
247+
"request is over the limits defined by the rate limiter",
248+
append(fields, zap.Error(err))...,
249+
)
250+
return err
260251
}
261-
262-
tb.RatelimitRequests.Add(ctx, 1, metric.WithAttributeSet(attrRequestsSet))
263-
return err
252+
return next(ctx, data)
264253
}
265254

266255
func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
267-
if err := rateLimit(
256+
return withRateLimit(
268257
ctx,
269258
r.count(ld),
270259
r.rl.RateLimit,
271260
r.metadataKeys,
272261
r.telemetryBuilder,
273262
r.logger,
274-
r.inflight,
275-
); err != nil {
276-
return err
277-
}
278-
return r.next(ctx, ld)
263+
r.next, ld,
264+
)
279265
}
280266

281267
func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
282-
if err := rateLimit(
268+
return withRateLimit(
283269
ctx,
284270
r.count(md),
285271
r.rl.RateLimit,
286272
r.metadataKeys,
287273
r.telemetryBuilder,
288274
r.logger,
289-
r.inflight,
290-
); err != nil {
291-
return err
292-
}
293-
return r.next(ctx, md)
275+
r.next, md,
276+
)
294277
}
295278

296279
func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
297-
if err := rateLimit(
280+
return withRateLimit(
298281
ctx,
299282
r.count(td),
300283
r.rl.RateLimit,
301284
r.metadataKeys,
302285
r.telemetryBuilder,
303286
r.logger,
304-
r.inflight,
305-
); err != nil {
306-
return err
307-
}
308-
return r.next(ctx, td)
287+
r.next, td,
288+
)
309289
}
310290

311291
func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error {
312-
if err := rateLimit(
292+
return withRateLimit(
313293
ctx,
314294
r.count(pd),
315295
r.rl.RateLimit,
316296
r.metadataKeys,
317297
r.telemetryBuilder,
318298
r.logger,
319-
r.inflight,
320-
); err != nil {
321-
return err
322-
}
323-
return r.next(ctx, pd)
299+
r.next, pd,
300+
)
324301
}
325302

326303
func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int {

0 commit comments

Comments
 (0)