Skip to content

Commit 3ef0f6c

Browse files
[ratelimiter] enhance error logging with additional attributes (#692)
1 parent e7dddb0 commit 3ef0f6c

File tree

4 files changed

+63
-12
lines changed

4 files changed

+63
-12
lines changed

processor/ratelimitprocessor/gubernator.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
142142
},
143143
})
144144
if err != nil {
145-
r.set.Logger.Error("error executing gubernator rate limit request", zap.Error(err))
146-
return errRateLimitInternalError
145+
return err
147146
}
148147

149148
// Inside the gRPC response, we should have a single-item list of responses.
@@ -153,20 +152,13 @@ func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
153152
}
154153
resp := responses[0]
155154
if resp.GetError() != "" {
156-
r.set.Logger.Error("failed to get response from gubernator", zap.Error(errors.New(resp.GetError())))
157-
return errRateLimitInternalError
155+
return errors.New(resp.GetError())
158156
}
159157

160158
if resp.GetStatus() == gubernator.Status_OVER_LIMIT {
161159
// Same logic as local
162160
switch r.cfg.ThrottleBehavior {
163161
case ThrottleBehaviorError:
164-
r.set.Logger.Error(
165-
"request is over the limits defined by the rate limiter",
166-
zap.Error(errTooManyRequests),
167-
zap.String("processor_id", r.set.ID.String()),
168-
zap.Strings("metadata_keys", r.cfg.MetadataKeys),
169-
)
170162
return status.Error(codes.ResourceExhausted, errTooManyRequests.Error())
171163
case ThrottleBehaviorDelay:
172164
delay := time.Duration(resp.GetResetTime()-createdAt) * time.Millisecond

processor/ratelimitprocessor/processor.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"go.opentelemetry.io/collector/pdata/ptrace"
3232
"go.opentelemetry.io/otel/attribute"
3333
"go.opentelemetry.io/otel/metric"
34+
"go.uber.org/zap"
3435
"google.golang.org/grpc/codes"
3536
"google.golang.org/grpc/status"
3637

@@ -44,6 +45,7 @@ type rateLimiterProcessor struct {
4445
rl RateLimiter
4546
metadataKeys []string
4647
telemetryBuilder *metadata.TelemetryBuilder
48+
logger *zap.Logger
4749
inflight *int64
4850
}
4951

@@ -89,6 +91,7 @@ func NewLogsRateLimiterProcessor(
8991
Component: rateLimiter,
9092
rl: rateLimiter.Unwrap(),
9193
telemetryBuilder: telemetryBuilder,
94+
logger: telemetrySettings.Logger,
9295
inflight: inflight,
9396
metadataKeys: metadataKeys,
9497
},
@@ -115,6 +118,7 @@ func NewMetricsRateLimiterProcessor(
115118
Component: rateLimiter,
116119
rl: rateLimiter.Unwrap(),
117120
telemetryBuilder: telemetryBuilder,
121+
logger: telemetrySettings.Logger,
118122
inflight: inflight,
119123
metadataKeys: metadataKeys,
120124
},
@@ -141,6 +145,7 @@ func NewTracesRateLimiterProcessor(
141145
Component: rateLimiter,
142146
rl: rateLimiter.Unwrap(),
143147
telemetryBuilder: telemetryBuilder,
148+
logger: telemetrySettings.Logger,
144149
inflight: inflight,
145150
metadataKeys: metadataKeys,
146151
},
@@ -217,6 +222,7 @@ func rateLimit(ctx context.Context,
217222
rateLimit func(ctx context.Context, n int) error,
218223
metadataKeys []string,
219224
telemetryBuilder *metadata.TelemetryBuilder,
225+
logger *zap.Logger,
220226
inflight *int64,
221227
) error {
222228
current := atomic.AddInt64(inflight, 1)
@@ -229,6 +235,21 @@ func rateLimit(ctx context.Context,
229235
}(time.Now())
230236

231237
err := rateLimit(ctx, hits)
238+
if err != nil {
239+
// enhance error logging with metadata keys
240+
fields := []zap.Field{
241+
zap.Int("hits", hits),
242+
}
243+
for _, kv := range attrsCommon {
244+
switch kv.Value.Type() {
245+
case attribute.STRINGSLICE:
246+
fields = append(fields, zap.Strings(string(kv.Key), kv.Value.AsStringSlice()))
247+
default:
248+
fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString()))
249+
}
250+
}
251+
logger.Error("request is over the limits defined by the rate limiter", append(fields, zap.Error(err))...)
252+
}
232253

233254
attrRequests := getTelemetryAttrs(attrsCommon, err)
234255
telemetryBuilder.RatelimitRequests.Add(ctx, 1, metric.WithAttributes(attrRequests...))
@@ -245,6 +266,7 @@ func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs
245266
r.rl.RateLimit,
246267
r.metadataKeys,
247268
r.telemetryBuilder,
269+
r.logger,
248270
r.inflight,
249271
); err != nil {
250272
return err
@@ -262,6 +284,7 @@ func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pme
262284
r.rl.RateLimit,
263285
r.metadataKeys,
264286
r.telemetryBuilder,
287+
r.logger,
265288
r.inflight,
266289
); err != nil {
267290
return err
@@ -279,6 +302,7 @@ func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrac
279302
r.rl.RateLimit,
280303
r.metadataKeys,
281304
r.telemetryBuilder,
305+
r.logger,
282306
r.inflight,
283307
); err != nil {
284308
return err
@@ -296,6 +320,7 @@ func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd p
296320
r.rl.RateLimit,
297321
r.metadataKeys,
298322
r.telemetryBuilder,
323+
r.logger,
299324
r.inflight,
300325
); err != nil {
301326
return err

processor/ratelimitprocessor/processor_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ import (
3838
"go.opentelemetry.io/otel/attribute"
3939
"go.opentelemetry.io/otel/sdk/metric/metricdata"
4040
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
41+
"go.uber.org/zap"
42+
"go.uber.org/zap/zapcore"
43+
"go.uber.org/zap/zaptest/observer"
4144
)
4245

4346
var (
@@ -123,6 +126,7 @@ func TestGetCountFunc_Profiles(t *testing.T) {
123126
}
124127

125128
func TestConsume_Logs(t *testing.T) {
129+
126130
rateLimiter := newTestLocalRateLimiter(t, &Config{
127131
Type: LocalRateLimiter,
128132
RateLimitSettings: RateLimitSettings{
@@ -134,6 +138,7 @@ func TestConsume_Logs(t *testing.T) {
134138
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
135139
require.NoError(t, err)
136140

141+
observedZapCore, observedLogs := observer.New(zapcore.ErrorLevel)
137142
tt := componenttest.NewTelemetry()
138143
telemetryBuilder, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
139144
require.NoError(t, err)
@@ -142,6 +147,7 @@ func TestConsume_Logs(t *testing.T) {
142147
rl := rateLimiterProcessor{
143148
rl: rateLimiter,
144149
telemetryBuilder: telemetryBuilder,
150+
logger: zap.New(observedZapCore),
145151
inflight: &inflight,
146152
metadataKeys: []string{"x-tenant-id"},
147153
}
@@ -166,6 +172,7 @@ func TestConsume_Logs(t *testing.T) {
166172
assert.False(t, consumed)
167173
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
168174

175+
testRatelimitLogMetadata(t, observedLogs.TakeAll())
169176
testRateLimitTelemetry(t, tt)
170177
}
171178

@@ -181,6 +188,7 @@ func TestConsume_Metrics(t *testing.T) {
181188
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
182189
require.NoError(t, err)
183190

191+
observedZapCore, observedLogs := observer.New(zapcore.ErrorLevel)
184192
tt := componenttest.NewTelemetry()
185193
telemetryBuilder, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
186194
require.NoError(t, err)
@@ -189,6 +197,7 @@ func TestConsume_Metrics(t *testing.T) {
189197
rl := rateLimiterProcessor{
190198
rl: rateLimiter,
191199
telemetryBuilder: telemetryBuilder,
200+
logger: zap.New(observedZapCore),
192201
inflight: &inflight,
193202
metadataKeys: []string{"x-tenant-id"},
194203
}
@@ -213,6 +222,7 @@ func TestConsume_Metrics(t *testing.T) {
213222
assert.False(t, consumed)
214223
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
215224

225+
testRatelimitLogMetadata(t, observedLogs.TakeAll())
216226
testRateLimitTelemetry(t, tt)
217227
}
218228

@@ -228,6 +238,7 @@ func TestConsume_Traces(t *testing.T) {
228238
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
229239
require.NoError(t, err)
230240

241+
observedZapCore, observedLogs := observer.New(zapcore.ErrorLevel)
231242
tt := componenttest.NewTelemetry()
232243
telemetryBuilder, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
233244
require.NoError(t, err)
@@ -236,6 +247,7 @@ func TestConsume_Traces(t *testing.T) {
236247
rl := rateLimiterProcessor{
237248
rl: rateLimiter,
238249
telemetryBuilder: telemetryBuilder,
250+
logger: zap.New(observedZapCore),
239251
inflight: &inflight,
240252
metadataKeys: []string{"x-tenant-id"},
241253
}
@@ -260,6 +272,7 @@ func TestConsume_Traces(t *testing.T) {
260272
assert.False(t, consumed)
261273
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
262274

275+
testRatelimitLogMetadata(t, observedLogs.TakeAll())
263276
testRateLimitTelemetry(t, tt)
264277
}
265278

@@ -275,6 +288,7 @@ func TestConsume_Profiles(t *testing.T) {
275288
err := rateLimiter.Start(context.Background(), componenttest.NewNopHost())
276289
require.NoError(t, err)
277290

291+
observedZapCore, observedLogs := observer.New(zapcore.ErrorLevel)
278292
tt := componenttest.NewTelemetry()
279293
telemetryBuilder, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
280294
require.NoError(t, err)
@@ -284,6 +298,7 @@ func TestConsume_Profiles(t *testing.T) {
284298
rl := rateLimiterProcessor{
285299
rl: rateLimiter,
286300
telemetryBuilder: telemetryBuilder,
301+
logger: zap.New(observedZapCore),
287302
inflight: &inflight,
288303
metadataKeys: []string{"x-tenant-id"},
289304
}
@@ -308,6 +323,7 @@ func TestConsume_Profiles(t *testing.T) {
308323
assert.False(t, consumed)
309324
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = too many requests")
310325

326+
testRatelimitLogMetadata(t, observedLogs.TakeAll())
311327
testRateLimitTelemetry(t, tt)
312328
}
313329

@@ -420,3 +436,22 @@ func testRateLimitTelemetry(t *testing.T, tel *componenttest.Telemetry) {
420436
},
421437
}, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp())
422438
}
439+
440+
func testRatelimitLogMetadata(t *testing.T, logEntries []observer.LoggedEntry) {
441+
require.Len(t, logEntries, 1, "Expected exactly one error log entry")
442+
logEntry := logEntries[0]
443+
assert.Equal(t, zapcore.ErrorLevel, logEntry.Level)
444+
445+
fields := make(map[string]interface{})
446+
for _, field := range logEntry.Context {
447+
switch field.Type {
448+
case zapcore.StringType:
449+
fields[field.Key] = field.String
450+
case zapcore.Int64Type:
451+
fields[field.Key] = field.Integer
452+
}
453+
}
454+
455+
assert.Equal(t, "TestProjectID", fields["x-tenant-id"])
456+
assert.Equal(t, int64(1), fields["hits"])
457+
}

processor/ratelimitprocessor/ratelimiter.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import (
2828
)
2929

3030
var (
31-
errTooManyRequests = errors.New("too many requests")
32-
errRateLimitInternalError = errors.New("rate limiter failed")
31+
errTooManyRequests = errors.New("too many requests")
3332
)
3433

3534
// RateLimiter provides an interface for rate limiting by some number

0 commit comments

Comments
 (0)