Skip to content

Commit 205a167

Browse files
committed
Incorporate review comments
1 parent ab598d1 commit 205a167

File tree

3 files changed

+34
-126
lines changed

3 files changed

+34
-126
lines changed

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121
)
2222

2323
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
24+
// By default:
25+
//
26+
// - the queue stores 1000 requests of telemetry
27+
// - is non-blocking when full
28+
// - concurrent exports limited to 10
29+
// - emits batches of 8192 items, timeout 200ms
2430
func NewDefaultQueueConfig() queuebatch.Config {
2531
return queuebatch.Config{
2632
Sizer: request.SizerTypeRequests,
@@ -89,36 +95,9 @@ func NewQueueSender(
8995
}
9096
qs.QueueBatch = qb
9197

92-
if len(qCfg.RequestMiddlewares) > 0 {
93-
warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)
94-
}
95-
9698
return qs, nil
9799
}
98100

99-
// warnIfNumConsumersMayCapMiddleware ensures sufficient worker headroom when request middleware is configured.
100-
func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
101-
if len(cfg.RequestMiddlewares) == 0 {
102-
return
103-
}
104-
105-
defaultConsumers := NewDefaultQueueConfig().NumConsumers
106-
107-
// If the user left the default worker count, warn that the worker pool might artificially cap the controller.
108-
if cfg.NumConsumers == defaultConsumers && logger != nil {
109-
logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
110-
zap.Int("num_consumers", cfg.NumConsumers),
111-
zap.Strings("request_middlewares", func() []string {
112-
ids := make([]string, len(cfg.RequestMiddlewares))
113-
for i, id := range cfg.RequestMiddlewares {
114-
ids[i] = id.String()
115-
}
116-
return ids
117-
}()),
118-
)
119-
}
120-
}
121-
122101
// Start overrides the default Start to resolve the extensions and wrap the sender.
123102
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
124103
mws := make([]requestmiddleware.RequestMiddleware, 0, len(qs.mwIDs))

exporter/exporterhelper/internal/queue_sender_test.go

Lines changed: 0 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ func TestQueueConfig_Validate(t *testing.T) {
6262
assert.NoError(t, noCfg.Validate())
6363
}
6464

65-
// --- New Tests for Request Middleware ---
66-
6765
// nopComponent satisfies component.Component but does not implement RequestMiddleware.
6866
type nopComponent struct{}
6967

@@ -299,100 +297,3 @@ func TestQueueSender_Start_Errors_And_Cleanup(t *testing.T) {
299297
})
300298
}
301299
}
302-
303-
func TestNewQueueSender_RequestMiddleware_LogsWarningOnDefaultConsumers(t *testing.T) {
304-
mwID := component.MustNewID("request_middleware")
305-
306-
tests := []struct {
307-
name string
308-
inputConfig queuebatch.Config
309-
wantConsumers int
310-
wantLog bool
311-
}{
312-
{
313-
name: "warn_on_default_consumer_count",
314-
inputConfig: func() queuebatch.Config {
315-
cfg := NewDefaultQueueConfig()
316-
cfg.RequestMiddlewares = []component.ID{mwID}
317-
cfg.NumConsumers = 10
318-
return cfg
319-
}(),
320-
wantConsumers: 10, // Should NOT change
321-
wantLog: true,
322-
},
323-
{
324-
name: "respect_high_consumer_count",
325-
inputConfig: func() queuebatch.Config {
326-
cfg := NewDefaultQueueConfig()
327-
cfg.RequestMiddlewares = []component.ID{mwID}
328-
cfg.NumConsumers = 500
329-
return cfg
330-
}(),
331-
wantConsumers: 500,
332-
wantLog: false,
333-
},
334-
{
335-
name: "ignore_if_middleware_disabled",
336-
inputConfig: func() queuebatch.Config {
337-
cfg := NewDefaultQueueConfig()
338-
cfg.RequestMiddlewares = nil
339-
cfg.NumConsumers = 10
340-
return cfg
341-
}(),
342-
wantConsumers: 10,
343-
wantLog: false,
344-
},
345-
}
346-
347-
for _, tt := range tests {
348-
t.Run(tt.name, func(t *testing.T) {
349-
logger, observed := observer.New(zap.WarnLevel)
350-
cfg := tt.inputConfig
351-
352-
warnIfNumConsumersMayCapMiddleware(&cfg, zap.New(logger))
353-
354-
assert.Equal(t, tt.wantConsumers, cfg.NumConsumers)
355-
if tt.wantLog {
356-
require.Len(t, observed.All(), 1, "Expected a warning log")
357-
assert.Contains(t, observed.All()[0].Message, "sending_queue.num_consumers is at the default")
358-
} else {
359-
assert.Len(t, observed.All(), 0, "Expected no warning logs")
360-
}
361-
})
362-
}
363-
}
364-
365-
func TestQueueSender_RequestMiddleware_Disabled_NoInteraction(t *testing.T) {
366-
// 1. Setup a host with the extension available, but do NOT use it.
367-
mwID := component.MustNewID("request_middleware")
368-
mockMw := &mockRequestMiddleware{}
369-
host := &mockHost{
370-
ext: map[component.ID]component.Component{
371-
mwID: mockMw,
372-
},
373-
}
374-
375-
// 2. do NOT configure the RequestMiddlewares in the queue settings.
376-
qSet := queuebatch.AllSettings[request.Request]{
377-
ID: component.MustNewID("otlp"),
378-
Signal: pipeline.SignalTraces,
379-
Telemetry: componenttest.NewNopTelemetrySettings(),
380-
}
381-
qCfg := NewDefaultQueueConfig()
382-
qCfg.RequestMiddlewares = nil
383-
384-
nextSender := sender.NewSender(func(ctx context.Context, req request.Request) error {
385-
return nil
386-
})
387-
388-
qs, err := NewQueueSender(qSet, qCfg, "", nextSender)
389-
require.NoError(t, err)
390-
391-
// 3. Start and Send data
392-
require.NoError(t, qs.Start(context.Background(), host))
393-
require.NoError(t, qs.Send(context.Background(), &requesttest.FakeRequest{Items: 10}))
394-
require.NoError(t, qs.Shutdown(context.Background()))
395-
396-
// 4. Verification: Assert ABSOLUTELY NO interaction with the mock middleware
397-
assert.Nil(t, mockMw.wrapper, "WrapSender should not be called when middleware is disabled")
398-
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package requestmiddleware // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requestmiddleware"
5+
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
9+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
10+
"go.opentelemetry.io/collector/pipeline"
11+
)
12+
13+
// RequestMiddleware enables extensions to intercept or modify exporter requests.
14+
// Implementations can wrap the downstream sender with additional logic (e.g., rate control, adaptive concurrency).
15+
16+
// RequestMiddlewareSettings provides context about the exporter request pipeline.
17+
type RequestMiddlewareSettings struct {
18+
Signal pipeline.Signal
19+
ID component.ID
20+
Telemetry component.TelemetrySettings
21+
}
22+
23+
// RequestMiddleware allows an extension to wrap an exporter's request sender.
24+
type RequestMiddleware interface {
25+
// WrapSender allows the middleware to wrap the next sender in the chain.
26+
// It returns a generic sender that implements the specific request type.
27+
WrapSender(set RequestMiddlewareSettings, next sender.Sender[request.Request]) (sender.Sender[request.Request], error)
28+
}

0 commit comments

Comments
 (0)