Skip to content

Commit 7466c9e

Browse files
committed
Incorporate review suggestions
1 parent 33d5428 commit 7466c9e

File tree

12 files changed

+269
-145
lines changed

12 files changed

+269
-145
lines changed

.chloggen/controller-interface.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ note: >
66
issues: [14080]
77
subtext: |
88
The `RequestMiddlewareFactory` interface has been added to the
9-
`extension/extensioncapabilities` module. The `exporterhelper`'s sending queue
10-
can now be configured with a `request_middleware` ID to delegate request execution logic to an extension.
9+
`extension/xextension/extensionmiddleware` package. The `exporterhelper`'s sending queue
10+
can now be configured with a list of `request_middlewares` to delegate request execution logic to extensions.
1111
change_logs: [user, api]

exporter/exporterhelper/README.md

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ The following configuration options can be modified:
2929
- `bytes`: the size of serialized data in bytes (the least performant option).
3030
- `queue_size` (default = 1000): Maximum size the queue can accept. Measured in units defined by `sizer`
3131
- `batch`: see below.
32-
- `concurrency_controller` (default = none): The ID of an extension implementing the `RequestMiddlewareFactory` interface (e.g., `adaptive_concurrency`). When configured, exporterhelper executes export requests through the middleware, enabling logic such as adaptive concurrency, rate limiting, or circuit breaking.
33-
3432

3533
#### Sending queue batch settings
3634

@@ -142,48 +140,3 @@ service:
142140
```
143141

144142
[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage
145-
146-
147-
### Request Middleware (e.g. Adaptive Concurrency)
148-
149-
Traditionally, exporters use a static `num_consumers` to determine how many concurrent requests can be sent to a backend. A Request Middleware implementation allows extensions to replace or augment this behavior with dynamic logic.
150-
151-
The middleware wraps the request execution, allowing it to:
152-
1. **Intercept:** Acquire permits or check conditions before the request starts (e.g., rate limiting).
153-
2. **Measure:** Track the duration and outcome of the request (e.g., adaptive concurrency).
154-
3. **Control:** Block or fail requests based on internal logic.
155-
156-
#### Interaction with num_consumers
157-
158-
When a middleware is configured (via `concurrency_controller`), it acts as a gatekeeper on top of the existing queue consumers. The effective concurrency is the minimum of the middleware's logic and the static `num_consumers`.
159-
160-
`Effective_Concurrency = min(Middleware_Limit, sending_queue.num_consumers)`
161-
162-
- **Warning:** If you leave `num_consumers` at the default value (10) while using middleware that requires high concurrency (like Adaptive Request Concurrency), the queue sender will log a warning.
163-
- **Recommendation:** Set `num_consumers` high enough to avoid capping the middleware’s maximum intended concurrency (for example, match the middleware’s configured max).
164-
165-
#### Example Configuration
166-
167-
In this example, an OTLP exporter is configured to use the `adaptive_concurrency` extension (which implements the Request Middleware interface).
168-
169-
```yaml
170-
exporters:
171-
otlp:
172-
endpoint: https://my-backend:4317
173-
sending_queue:
174-
enabled: true
175-
num_consumers: 100 # Provide headroom for the middleware
176-
# Link to the middleware extension
177-
concurrency_controller: adaptive_concurrency/otlp_limit
178-
179-
extensions:
180-
# The adaptive_concurrency extension implementation would be defined here
181-
adaptive_concurrency/otlp_limit:
182-
# ... extension specific config ...
183-
184-
service:
185-
extensions: [adaptive_concurrency/otlp_limit]
186-
pipelines:
187-
traces:
188-
receivers: [otlp]
189-
exporters: [otlp]

exporter/exporterhelper/go.mod

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ require (
1717
go.opentelemetry.io/collector/consumer/consumertest v0.143.0
1818
go.opentelemetry.io/collector/exporter v1.49.0
1919
go.opentelemetry.io/collector/exporter/exportertest v0.143.0
20-
go.opentelemetry.io/collector/extension/extensioncapabilities v1.143.0
2120
go.opentelemetry.io/collector/extension/extensiontest v0.143.0
2221
go.opentelemetry.io/collector/extension/xextension v0.143.0
2322
go.opentelemetry.io/collector/featuregate v1.49.0
@@ -123,5 +122,3 @@ replace go.opentelemetry.io/collector/config/configoptional => ../../config/conf
123122
replace go.opentelemetry.io/collector/confmap/xconfmap => ../../confmap/xconfmap
124123

125124
replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil
126-
127-
replace go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@ import (
88
"fmt"
99
"time"
1010

11+
"go.uber.org/multierr"
1112
"go.uber.org/zap"
1213

1314
"go.opentelemetry.io/collector/component"
1415
"go.opentelemetry.io/collector/config/configoptional"
1516
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
18-
"go.opentelemetry.io/collector/extension/extensioncapabilities"
19+
20+
// Import the new package (aliased to avoid confusion with existing extensionmiddleware)
21+
requestmiddleware "go.opentelemetry.io/collector/extension/xextension/extensionmiddleware"
1922
"go.opentelemetry.io/collector/pipeline"
2023
)
2124

@@ -43,8 +46,8 @@ func NewDefaultQueueConfig() queuebatch.Config {
4346
// queueSender wraps QueueBatch to add RequestMiddleware logic.
4447
type queueSender struct {
4548
*queuebatch.QueueBatch
46-
mwID *component.ID
47-
mw extensioncapabilities.RequestMiddleware
49+
mwIDs []component.ID
50+
mws []requestmiddleware.RequestMiddleware
4851

4952
// Capture these fields during creation so we can use them in Start()
5053
// because QueueBatch does not expose them publicly.
@@ -58,7 +61,7 @@ func NewQueueSender(
5861
exportFailureMessage string,
5962
next sender.Sender[request.Request],
6063
) (sender.Sender[request.Request], error) {
61-
if qCfg.RequestMiddlewareID == nil {
64+
if len(qCfg.RequestMiddlewares) == 0 {
6265
exportFunc := func(ctx context.Context, req request.Request) error {
6366
// Have to read the number of items before sending the request since the request can
6467
// be modified by the downstream components like the batcher.
@@ -77,19 +80,19 @@ func NewQueueSender(
7780
warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)
7881

7982
qs := &queueSender{
80-
mwID: qCfg.RequestMiddlewareID,
83+
mwIDs: qCfg.RequestMiddlewares,
8184
id: qSet.ID,
8285
signal: qSet.Signal,
83-
mw: extensioncapabilities.NoopRequestMiddleware(), // Initialize with no-op to avoid nil checks
86+
mws: make([]requestmiddleware.RequestMiddleware, 0, len(qCfg.RequestMiddlewares)),
8487
}
8588

8689
exportFunc := func(ctx context.Context, req request.Request) error {
8790
// Have to read the number of items before sending the request since the request can
8891
// be modified by the downstream components like the batcher.
8992
itemsCount := req.ItemsCount()
9093

91-
// Wrap the actual export call in a closure for the middleware
92-
nextCall := func(ctx context.Context) error {
94+
// 1. Define the base exporter call
95+
baseSender := func(ctx context.Context) error {
9396
if errSend := next.Send(ctx, req); errSend != nil {
9497
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
9598
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
@@ -98,8 +101,19 @@ func NewQueueSender(
98101
return nil
99102
}
100103

101-
// Use the middleware to execute the request
102-
return qs.mw.Handle(ctx, nextCall)
104+
// 2. Chain middlewares: m1(m2(base))
105+
// We iterate in reverse so that the first middleware in the config is the outer-most wrapper.
106+
nextCall := baseSender
107+
for i := len(qs.mws) - 1; i >= 0; i-- {
108+
mw := qs.mws[i]
109+
currNext := nextCall
110+
nextCall = func(ctx context.Context) error {
111+
return mw.Handle(ctx, currNext)
112+
}
113+
}
114+
115+
// 3. Execute the chain
116+
return nextCall(ctx)
103117
}
104118

105119
qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
@@ -111,9 +125,9 @@ func NewQueueSender(
111125
return qs, nil
112126
}
113127

114-
// warnIfNumConsumersMayCapMiddleware ensures sufficient worker headroom when a concurrency controller is configured.
128+
// warnIfNumConsumersMayCapMiddleware ensures sufficient worker headroom when request middleware is configured.
115129
func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
116-
if cfg.RequestMiddlewareID == nil {
130+
if len(cfg.RequestMiddlewares) == 0 {
117131
return
118132
}
119133

@@ -123,53 +137,65 @@ func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logg
123137
if cfg.NumConsumers == defaultConsumers && logger != nil {
124138
logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
125139
zap.Int("num_consumers", cfg.NumConsumers),
126-
zap.String("concurrency_controller", cfg.RequestMiddlewareID.String()),
140+
zap.Strings("request_middlewares", func() []string {
141+
ids := make([]string, len(cfg.RequestMiddlewares))
142+
for i, id := range cfg.RequestMiddlewares {
143+
ids[i] = id.String()
144+
}
145+
return ids
146+
}()),
127147
)
128148
}
129149
}
130150

131-
// Start overrides the default Start to resolve the extension.
151+
// Start overrides the default Start to resolve the extensions.
132152
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
133-
if qs.mwID != nil {
134-
ext, ok := host.GetExtensions()[*qs.mwID]
153+
for _, id := range qs.mwIDs {
154+
ext, ok := host.GetExtensions()[id]
135155
if !ok {
136-
return fmt.Errorf("request middleware extension %q not found", qs.mwID.String())
156+
// Cleanup any middlewares successfully created before this failure
157+
_ = qs.Shutdown(ctx)
158+
return fmt.Errorf("request middleware extension %q not found", id.String())
137159
}
138-
factory, ok := ext.(extensioncapabilities.RequestMiddlewareFactory)
160+
factory, ok := ext.(requestmiddleware.RequestMiddlewareFactory)
139161
if !ok {
140-
return fmt.Errorf("extension %q does not implement RequestMiddlewareFactory", qs.mwID.String())
162+
// Cleanup any middlewares successfully created before this failure
163+
_ = qs.Shutdown(ctx)
164+
return fmt.Errorf("extension %q does not implement RequestMiddlewareFactory", id.String())
141165
}
142166
mw, err := factory.CreateRequestMiddleware(qs.id, qs.signal)
143167
if err != nil {
144-
return fmt.Errorf("failed to create request middleware: %w", err)
168+
// Cleanup any middlewares successfully created before this failure
169+
_ = qs.Shutdown(ctx)
170+
return fmt.Errorf("failed to create request middleware for %q: %w", id.String(), err)
145171
}
146172

147-
// Guard against nil return from factory to avoid panic in Handle()
148173
if mw != nil {
149-
qs.mw = mw
174+
qs.mws = append(qs.mws, mw)
150175
}
151176
}
152177

153178
if err := qs.QueueBatch.Start(ctx, host); err != nil {
154-
// Ensure middleware is shut down if queue fails to start.
155-
// No need to check for nil because we initialize with Noop.
156-
_ = qs.mw.Shutdown(ctx)
157-
qs.mw = extensioncapabilities.NoopRequestMiddleware()
179+
// Ensure middlewares are shut down if queue fails to start.
180+
_ = qs.Shutdown(ctx)
158181
return err
159182
}
160183
return nil
161184
}
162185

163-
// Shutdown ensures the middleware is also shut down.
186+
// Shutdown ensures the middlewares are also shut down.
164187
func (qs *queueSender) Shutdown(ctx context.Context) error {
165-
errQ := qs.QueueBatch.Shutdown(ctx)
188+
var errs error
189+
if qs.QueueBatch != nil {
190+
errs = multierr.Append(errs, qs.QueueBatch.Shutdown(ctx))
191+
}
166192

167-
// No need to check for nil because we initialize with Noop.
168-
if err := qs.mw.Shutdown(ctx); err != nil && errQ == nil {
169-
return err
193+
// Shutdown middlewares in reverse order (LIFO)
194+
for i := len(qs.mws) - 1; i >= 0; i-- {
195+
errs = multierr.Append(errs, qs.mws[i].Shutdown(ctx))
170196
}
171-
// Reset to no-op after shutdown
172-
qs.mw = extensioncapabilities.NoopRequestMiddleware()
197+
// Clear middlewares after shutdown
198+
qs.mws = nil
173199

174-
return errQ
200+
return errs
175201
}

0 commit comments

Comments
 (0)