Skip to content

Commit 8e50e97

Browse files
authored
[processor/ratelimit] Use trace provider in gubernator grpc client (#848)
Use the component's trace provider in gubernator grpc client
1 parent 215dff7 commit 8e50e97

File tree

4 files changed

+56
-10
lines changed

4 files changed

+56
-10
lines changed

processor/ratelimitprocessor/factory.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.opentelemetry.io/collector/pdata/ptrace"
3030
"go.opentelemetry.io/collector/processor"
3131
"go.opentelemetry.io/collector/processor/xprocessor"
32+
"go.opentelemetry.io/otel/trace"
3233

3334
"github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"
3435
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
@@ -56,10 +57,11 @@ func getRateLimiter(
5657
config *Config,
5758
set processor.Settings,
5859
telemetryBuilder *metadata.TelemetryBuilder,
60+
tracerProvider trace.TracerProvider,
5961
) (*sharedcomponent.Component[rateLimiterComponent], error) {
6062
return rateLimiters.LoadOrStore(config, func() (rateLimiterComponent, error) {
6163
if config.Type == GubernatorRateLimiter {
62-
return newGubernatorRateLimiter(config, set.Logger, telemetryBuilder)
64+
return newGubernatorRateLimiter(config, set.Logger, telemetryBuilder, tracerProvider)
6365
}
6466
return newLocalRateLimiter(config, set)
6567
})
@@ -76,7 +78,7 @@ func createLogsProcessor(
7678
if err != nil {
7779
return nil, err
7880
}
79-
rateLimiter, err := getRateLimiter(config, set, tb)
81+
rateLimiter, err := getRateLimiter(config, set, tb, set.TracerProvider)
8082
if err != nil {
8183
return nil, err
8284
}
@@ -85,6 +87,7 @@ func createLogsProcessor(
8587
rateLimiter,
8688
set.TelemetrySettings.Logger,
8789
tb,
90+
set.TracerProvider,
8891
config.Strategy,
8992
func(ctx context.Context, ld plog.Logs) error {
9093
return nextConsumer.ConsumeLogs(ctx, ld)
@@ -105,7 +108,7 @@ func createMetricsProcessor(
105108
if err != nil {
106109
return nil, err
107110
}
108-
rateLimiter, err := getRateLimiter(config, set, tb)
111+
rateLimiter, err := getRateLimiter(config, set, tb, set.TracerProvider)
109112
if err != nil {
110113
return nil, err
111114
}
@@ -114,6 +117,7 @@ func createMetricsProcessor(
114117
rateLimiter,
115118
set.TelemetrySettings.Logger,
116119
tb,
120+
set.TracerProvider,
117121
config.Strategy,
118122
func(ctx context.Context, md pmetric.Metrics) error {
119123
return nextConsumer.ConsumeMetrics(ctx, md)
@@ -134,7 +138,7 @@ func createTracesProcessor(
134138
if err != nil {
135139
return nil, err
136140
}
137-
rateLimiter, err := getRateLimiter(config, set, tb)
141+
rateLimiter, err := getRateLimiter(config, set, tb, set.TracerProvider)
138142
if err != nil {
139143
return nil, err
140144
}
@@ -143,6 +147,7 @@ func createTracesProcessor(
143147
rateLimiter,
144148
set.TelemetrySettings.Logger,
145149
tb,
150+
set.TracerProvider,
146151
config.Strategy,
147152
func(ctx context.Context, td ptrace.Traces) error {
148153
return nextConsumer.ConsumeTraces(ctx, td)
@@ -163,7 +168,7 @@ func createProfilesProcessor(
163168
if err != nil {
164169
return nil, err
165170
}
166-
rateLimiter, err := getRateLimiter(config, set, tb)
171+
rateLimiter, err := getRateLimiter(config, set, tb, set.TracerProvider)
167172
if err != nil {
168173
return nil, err
169174
}
@@ -172,6 +177,7 @@ func createProfilesProcessor(
172177
rateLimiter,
173178
set.TelemetrySettings.Logger,
174179
tb,
180+
set.TracerProvider,
175181
config.Strategy,
176182
func(ctx context.Context, td pprofile.Profiles) error {
177183
return nextConsumer.ConsumeProfiles(ctx, td)

processor/ratelimitprocessor/gubernator.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/uptrace/opentelemetry-go-extra/otellogrus"
28+
"go.opentelemetry.io/otel/trace"
2829

2930
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3031
"go.opentelemetry.io/otel/attribute"
@@ -87,6 +88,7 @@ type gubernatorRateLimiter struct {
8788
classResolver ClassResolver
8889
windowConfigurator WindowConfigurator
8990
telemetryBuilder *metadata.TelemetryBuilder
91+
tracerProvider trace.TracerProvider
9092
}
9193

9294
func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, error) {
@@ -113,7 +115,7 @@ func newGubernatorDaemonConfig(logger *zap.Logger) (gubernator.DaemonConfig, err
113115
return conf, nil
114116
}
115117

116-
func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder *metadata.TelemetryBuilder) (*gubernatorRateLimiter, error) {
118+
func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder *metadata.TelemetryBuilder, tracerProvider trace.TracerProvider) (*gubernatorRateLimiter, error) {
117119
daemonCfg, err := newGubernatorDaemonConfig(logger)
118120
if err != nil {
119121
return nil, fmt.Errorf("failed to create gubernator daemon config: %w", err)
@@ -125,6 +127,7 @@ func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder
125127
behavior: gubernator.Behavior_BATCHING,
126128
daemonCfg: daemonCfg,
127129
telemetryBuilder: telemetryBuilder,
130+
tracerProvider: tracerProvider,
128131
classResolver: noopResolver{},
129132
windowConfigurator: defaultWindowConfigurator{multiplier: cfg.DynamicRateLimiting.DefaultWindowMultiplier},
130133
}, nil
@@ -160,7 +163,7 @@ func (r *gubernatorRateLimiter) Start(ctx context.Context, host component.Host)
160163

161164
r.clientConn, err = grpc.NewClient(r.daemonCfg.GRPCListenAddress,
162165
grpc.WithTransportCredentials(insecure.NewCredentials()),
163-
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
166+
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(r.tracerProvider))),
164167
)
165168
if err != nil {
166169
return fmt.Errorf("failed to create gRPC client connection: %w", err)

processor/ratelimitprocessor/gubernator_test.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,50 @@ import (
2727
"testing"
2828
"time"
2929

30-
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
31-
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadatatest"
3230
"github.com/gubernator-io/gubernator/v2"
3331
"github.com/gubernator-io/gubernator/v2/cluster"
3432
"github.com/stretchr/testify/assert"
3533
"github.com/stretchr/testify/require"
3634
"go.opentelemetry.io/collector/client"
3735
"go.opentelemetry.io/collector/component"
3836
"go.opentelemetry.io/collector/component/componenttest"
37+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3938
"go.opentelemetry.io/otel/attribute"
4039
"go.opentelemetry.io/otel/sdk/metric/metricdata"
4140
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
4241
"go.uber.org/zap/zaptest"
4342
"google.golang.org/grpc"
4443
"google.golang.org/grpc/credentials/insecure"
44+
45+
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata"
46+
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadatatest"
4547
)
4648

4749
func newTestGubernatorRateLimiterMetrics(t *testing.T, cfg *Config) (
4850
*gubernatorRateLimiter, *componenttest.Telemetry,
4951
) {
5052
rl := newTestGubernatorRateLimiter(t, cfg, nil)
5153
tt := componenttest.NewTelemetry()
52-
tb, err := metadata.NewTelemetryBuilder(tt.NewTelemetrySettings())
54+
telSettings := tt.NewTelemetrySettings()
55+
tb, err := metadata.NewTelemetryBuilder(telSettings)
5356
require.NoError(t, err)
5457
rl.telemetryBuilder = tb
58+
rl.tracerProvider = telSettings.TracerProvider
59+
// NOTE(carsonip): It does not test whether rate limiter is instrumenting grpc client correctly in Start
60+
// because we overwrite client and clientConn directly here, instead of calling Start.
61+
// To test Start properly it will require refactoring the tests.
62+
conn, err := grpc.NewClient(
63+
fmt.Sprintf("static:///%s", rl.daemon.PeerInfo.GRPCAddress),
64+
grpc.WithResolvers(gubernator.NewStaticBuilder()),
65+
grpc.WithTransportCredentials(insecure.NewCredentials()),
66+
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telSettings.TracerProvider))),
67+
)
68+
require.NoError(t, err)
69+
client := gubernator.NewV1Client(conn)
70+
rl.clientConn = conn
71+
rl.client = client
5572
t.Cleanup(func() {
73+
_ = conn.Close()
5674
_ = tt.Shutdown(t.Context())
5775
})
5876
return rl, tt
@@ -945,6 +963,9 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) {
945963
),
946964
},
947965
}, metricdatatest.IgnoreTimestamp())
966+
967+
spans := tt.SpanRecorder.Ended()
968+
assert.Greater(t, len(spans), 0)
948969
})
949970

950971
t.Run("dynamic_escalation_skipped_increments", func(t *testing.T) {
@@ -964,6 +985,9 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) {
964985
),
965986
},
966987
}, metricdatatest.IgnoreTimestamp())
988+
989+
spans := tt.SpanRecorder.Ended()
990+
assert.Greater(t, len(spans), 0)
967991
})
968992

969993
t.Run("gubernator_degraded_increments", func(t *testing.T) {
@@ -981,6 +1005,9 @@ func TestGubernatorRateLimiter_TelemetryCounters(t *testing.T) {
9811005
),
9821006
},
9831007
}, metricdatatest.IgnoreTimestamp())
1008+
1009+
spans := tt.SpanRecorder.Ended()
1010+
assert.Greater(t, len(spans), 0)
9841011
})
9851012
}
9861013

processor/ratelimitprocessor/processor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.opentelemetry.io/collector/pdata/ptrace"
3131
"go.opentelemetry.io/otel/attribute"
3232
"go.opentelemetry.io/otel/metric"
33+
"go.opentelemetry.io/otel/trace"
3334
"go.uber.org/zap"
3435
"google.golang.org/grpc/codes"
3536
"google.golang.org/grpc/status"
@@ -44,6 +45,7 @@ type rateLimiterProcessor struct {
4445
rl RateLimiter
4546
metadataKeys []string
4647
telemetryBuilder *metadata.TelemetryBuilder
48+
tracerProvider trace.TracerProvider
4749
logger *zap.Logger
4850
inflight *int64
4951
strategy Strategy
@@ -77,6 +79,7 @@ func NewLogsRateLimiterProcessor(
7779
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
7880
logger *zap.Logger,
7981
telemetryBuilder *metadata.TelemetryBuilder,
82+
tracerProvider trace.TracerProvider,
8083
strategy Strategy,
8184
next func(ctx context.Context, logs plog.Logs) error,
8285
inflight *int64,
@@ -87,6 +90,7 @@ func NewLogsRateLimiterProcessor(
8790
Component: rateLimiter,
8891
rl: rateLimiter.Unwrap(),
8992
telemetryBuilder: telemetryBuilder,
93+
tracerProvider: tracerProvider,
9094
logger: logger,
9195
inflight: inflight,
9296
metadataKeys: metadataKeys,
@@ -101,6 +105,7 @@ func NewMetricsRateLimiterProcessor(
101105
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
102106
logger *zap.Logger,
103107
telemetryBuilder *metadata.TelemetryBuilder,
108+
tracerProvider trace.TracerProvider,
104109
strategy Strategy,
105110
next func(ctx context.Context, metrics pmetric.Metrics) error,
106111
inflight *int64, // used to calculate concurrent requests
@@ -111,6 +116,7 @@ func NewMetricsRateLimiterProcessor(
111116
Component: rateLimiter,
112117
rl: rateLimiter.Unwrap(),
113118
telemetryBuilder: telemetryBuilder,
119+
tracerProvider: tracerProvider,
114120
logger: logger,
115121
inflight: inflight,
116122
metadataKeys: metadataKeys,
@@ -125,6 +131,7 @@ func NewTracesRateLimiterProcessor(
125131
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
126132
logger *zap.Logger,
127133
telemetryBuilder *metadata.TelemetryBuilder,
134+
tracerProvider trace.TracerProvider,
128135
strategy Strategy,
129136
next func(ctx context.Context, traces ptrace.Traces) error,
130137
inflight *int64,
@@ -135,6 +142,7 @@ func NewTracesRateLimiterProcessor(
135142
Component: rateLimiter,
136143
rl: rateLimiter.Unwrap(),
137144
telemetryBuilder: telemetryBuilder,
145+
tracerProvider: tracerProvider,
138146
logger: logger,
139147
inflight: inflight,
140148
metadataKeys: metadataKeys,
@@ -149,6 +157,7 @@ func NewProfilesRateLimiterProcessor(
149157
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
150158
logger *zap.Logger,
151159
telemetryBuilder *metadata.TelemetryBuilder,
160+
tracerProvider trace.TracerProvider,
152161
strategy Strategy,
153162
next func(ctx context.Context, profiles pprofile.Profiles) error,
154163
inflight *int64,
@@ -159,6 +168,7 @@ func NewProfilesRateLimiterProcessor(
159168
Component: rateLimiter,
160169
rl: rateLimiter.Unwrap(),
161170
telemetryBuilder: telemetryBuilder,
171+
tracerProvider: tracerProvider,
162172
logger: logger,
163173
inflight: inflight,
164174
metadataKeys: metadataKeys,

0 commit comments

Comments
 (0)