Commit 3613de8 (1 parent: 324029e)

Incorporate review comments; add the middleware interface

9 files changed: +149 −180 lines
Lines changed: 4 additions & 6 deletions

@@ -1,13 +1,11 @@
 change_type: enhancement
 component: exporterhelper
 note: >
-  Add `ConcurrencyController` interface support to the queue sender. This allows
-  extensions to dynamically control the number of concurrent requests in the
-  export pipeline.
+  Add `RequestMiddleware` interface support to the queue sender. This allows
+  extensions to intercept and manage export requests (e.g., for dynamic concurrency control).
 issues: [14080]
 subtext: |
-  The `ConcurrencyControllerFactory` interface has been added to a new
+  The `RequestMiddlewareFactory` interface has been added to the
   `extension/extensioncapabilities` module. The `exporterhelper`'s sending queue
-  can now be configured with a `concurrency_controller` ID to delegate flow
-  control logic to an extension.
+  can now be configured with a `request_middleware` ID to delegate request execution logic to an extension.
 change_logs: [user, api]

exporter/exporterhelper/README.md

Lines changed: 18 additions & 37 deletions

@@ -21,15 +21,16 @@ The following configuration options can be modified:
 - `sending_queue`
   - `enabled` (default = true)
   - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
-  - `wait_for_result` (default = false): determines if incoming requests are blocked until the request is processed or not.
+  - `wait_for_result` (default = false): Determines if incoming requests are blocked until the request is processed or not.
   - `block_on_overflow` (default = false): If true, blocks the request until the queue has space otherwise rejects the data immediately; ignored if `enabled` is `false`
   - `sizer` (default = requests): How the queue and batching is measured. Available options:
     - `requests`: number of incoming batches of metrics, logs, traces (the most performant option);
     - `items`: number of the smallest parts of each signal (spans, metric data points, log records);
     - `bytes`: the size of serialized data in bytes (the least performant option).
   - `queue_size` (default = 1000): Maximum size the queue can accept. Measured in units defined by `sizer`
   - `batch`: see below.
-  - `concurrency_controller` (default = none): The ID of an extension implementing the `ConcurrencyController` interface (e.g., `adaptive_concurrency`). When configured, this extension dynamically manages the number of concurrent requests sent to the backend based on real-time signals like latency and error rates, providing adaptive backpressure to prevent downstream overload.
+  - `concurrency_controller` (default = none): The ID of an extension implementing the `RequestMiddlewareFactory` interface (e.g., `adaptive_concurrency`). When configured, exporterhelper executes export requests through the middleware, enabling logic such as adaptive concurrency, rate limiting, or circuit breaking.
+
 
 #### Sending queue batch settings
 
@@ -143,62 +144,42 @@ service:
 [filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage
 
 
-### Concurrency Controller (Adaptive Request Concurrency)
-
-To use dynamic concurrency control, the following setting needs to be set:
-
-- `sending_queue`
-  - `concurrency_controller` (default = none): When set, enables adaptive backpressure by using the specified extension to dynamically manage the number of concurrent requests.
-
-#### How it works
-
-Traditionally, exporters use a static `num_consumers` to determine how many concurrent requests can be sent to a backend. However, static limits are difficult to tune:
-- **Too high:** You risk overwhelming the downstream backend, leading to increased latency, 429 (Too Many Requests) errors, and "death spirals."
-- **Too low:** You underutilize the available network and backend capacity, causing the collector's internal queue to fill up unnecessarily.
-
-The Concurrency Controller implementation (e.g., `adaptive_concurrency`) replaces the fixed worker pool with a dynamic permit system based on the **AIMD (Additive Increase / Multiplicative Decrease)** algorithm.
-
-#### The Control Loop
+### Request Middleware (e.g. Adaptive Concurrency)
 
-1. **Acquire:** Before an export attempt begins, the exporter asks the controller for a permit. If the current dynamic limit is reached, the request blocks until a slot becomes available.
-2. **Measure:** The controller tracks the **Round Trip Time (RTT)** and the outcome (success or retryable error) of every request.
-3. **Adapt:** At regular intervals, the controller compares the recent RTT baseline against current performance:
-   - **Increase:** If latency is stable and requests are succeeding, the controller increases the concurrency limit to maximize throughput.
-   - **Decrease:** If latency spikes or the backend returns "backpressure" signals (like HTTP 429 or gRPC `ResourceExhausted`), the controller immediately shrinks the limit to allow the backend to recover.
+Traditionally, exporters use a static `num_consumers` to determine how many concurrent requests can be sent to a backend. A Request Middleware implementation allows extensions to replace or augment this behavior with dynamic logic.
 
-This feedback loop ensures the Collector automatically finds the "sweet spot" of maximum throughput without requiring manual tuning as network conditions or backend capacity change.
+The middleware wraps the request execution, allowing it to:
+1. **Intercept:** Acquire permits or check conditions before the request starts (e.g., rate limiting).
+2. **Measure:** Track the duration and outcome of the request (e.g., adaptive concurrency).
+3. **Control:** Block or fail requests based on internal logic.
 
 #### Interaction with num_consumers
 
-When a concurrency_controller is configured, it acts as a gatekeeper on top of the existing queue consumers. The effective concurrency is the minimum of the controller's dynamic limit and the static num_consumers.
-
-To ensure the controller has enough headroom to operate, this component enforces a minimum of 200 consumers when a controller is active.
-
-- Automatic Adjustment: If you explicitly set num_consumers to a low value (e.g., 10), it will be automatically increased to 200 to prevent artificial bottlenecks.
+When a middleware is configured (via `concurrency_controller`), it acts as a gatekeeper on top of the existing queue consumers. The effective concurrency is the minimum of the middleware's logic and the static `num_consumers`.
 
-- High Concurrency: If you need more than 200 concurrent requests (e.g., num_consumers: 500), your configured value will be respected.
+`Effective_Concurrency = min(Middleware_Limit, sending_queue.num_consumers)`
 
-**Recommendation:** generally, you do not need to configure num_consumers when using the controller; the default headroom (200) is sufficient for most use cases. Only increase it if you expect to exceed 200 concurrent requests.
+- **Warning:** If you leave `num_consumers` at the default value (10) while using middleware that requires high concurrency (like Adaptive Request Concurrency), the queue sender will log a warning.
+- **Recommendation:** Set `num_consumers` high enough to avoid capping the middleware’s maximum intended concurrency (for example, match the middleware’s configured max).
 
 #### Example Configuration
 
-In this example, the OTLP exporter is configured to use the `adaptive_concurrency` extension. The extension will start with a small number of parallel requests and automatically scale up to 100 based on the health of the OTLP endpoint.
+In this example, an OTLP exporter is configured to use the `adaptive_concurrency` extension (which implements the Request Middleware interface).
 
 ```yaml
 exporters:
   otlp:
     endpoint: https://my-backend:4317
     sending_queue:
       enabled: true
-      # Link to the concurrency controller extension defined below
+      num_consumers: 100 # Provide headroom for the middleware
+      # Link to the middleware extension
       concurrency_controller: adaptive_concurrency/otlp_limit
 
 extensions:
-  # Define the adaptive concurrency algorithm
+  # The adaptive_concurrency extension implementation would be defined here
   adaptive_concurrency/otlp_limit:
-    initial_limit: 5 # Start with 5 concurrent requests
-    max_concurrency: 100 # Never exceed 100 concurrent requests
-    decrease_ratio: 0.5 # If backpressure is detected, cut concurrency by half
+    # ... extension specific config ...
 
 service:
   extensions: [adaptive_concurrency/otlp_limit]

exporter/exporterhelper/go.mod

Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@ require (
 	go.opentelemetry.io/collector/consumer/consumertest v0.143.0
 	go.opentelemetry.io/collector/exporter v1.49.0
 	go.opentelemetry.io/collector/exporter/exportertest v0.143.0
+	go.opentelemetry.io/collector/extension/extensioncapabilities v1.149.0
 	go.opentelemetry.io/collector/extension/extensiontest v0.143.0
 	go.opentelemetry.io/collector/extension/xextension v0.143.0
 	go.opentelemetry.io/collector/featuregate v1.49.0

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 39 additions & 62 deletions

@@ -19,8 +19,6 @@ import (
 	"go.opentelemetry.io/collector/pipeline"
 )
 
-const minConsumersWithController = 200
-
 // NewDefaultQueueConfig returns the default config for queuebatch.Config.
 // By default:
 //
@@ -42,11 +40,11 @@ func NewDefaultQueueConfig() queuebatch.Config {
 	}
 }
 
-// queueSender wraps QueueBatch to add ConcurrencyController logic.
+// queueSender wraps QueueBatch to add RequestMiddleware logic.
 type queueSender struct {
 	*queuebatch.QueueBatch
-	ctrlID *component.ID
-	ctrl   extensioncapabilities.ConcurrencyController
+	mwID *component.ID
+	mw   extensioncapabilities.RequestMiddleware
 
 	// Capture these fields during creation so we can use them in Start()
 	// because QueueBatch does not expose them publicly.
@@ -60,7 +58,7 @@ func NewQueueSender(
 	exportFailureMessage string,
 	next sender.Sender[request.Request],
 ) (sender.Sender[request.Request], error) {
-	if qCfg.ConcurrencyControllerID == nil {
+	if qCfg.RequestMiddlewareID == nil {
 		exportFunc := func(ctx context.Context, req request.Request) error {
 			// Have to read the number of items before sending the request since the request can
 			// be modified by the downstream components like the batcher.
@@ -76,44 +74,32 @@ func NewQueueSender(
 		return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
 	}
 
-	enforceMinConsumers(&qCfg, qSet.Telemetry.Logger)
+	warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)
 
 	qs := &queueSender{
-		ctrlID: qCfg.ConcurrencyControllerID,
+		mwID:   qCfg.RequestMiddlewareID,
 		id:     qSet.ID,
 		signal: qSet.Signal,
+		mw:     extensioncapabilities.NoopRequestMiddleware(), // Initialize with no-op to avoid nil checks
 	}
 
 	exportFunc := func(ctx context.Context, req request.Request) error {
 		// Have to read the number of items before sending the request since the request can
 		// be modified by the downstream components like the batcher.
 		itemsCount := req.ItemsCount()
 
-		ctrl := qs.ctrl
-		if ctrl == nil {
+		// Wrap the actual export call in a closure for the middleware
+		nextCall := func(ctx context.Context) error {
 			if errSend := next.Send(ctx, req); errSend != nil {
 				qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
 					zap.Error(errSend), zap.Int("dropped_items", itemsCount))
 				return errSend
 			}
 			return nil
 		}
-		// This blocks the worker if the dynamic limit is reached.
-		if err := ctrl.Acquire(ctx); err != nil {
-			return err
-		}
 
-		start := time.Now()
-		var errSend error
-		defer func() { ctrl.Record(ctx, time.Since(start), errSend) }()
-
-		errSend = next.Send(ctx, req)
-		if errSend != nil {
-			qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
-				zap.Error(errSend), zap.Int("dropped_items", itemsCount))
-			return errSend
-		}
-		return nil
+		// Use the middleware to execute the request
+		return qs.mw.Handle(ctx, nextCall)
 	}
 
 	qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
@@ -125,75 +111,66 @@ func NewQueueSender(
 	return qs, nil
 }
 
-// enforceMinConsumers ensures sufficient worker headroom when a concurrency controller is configured.
-func enforceMinConsumers(cfg *queuebatch.Config, logger *zap.Logger) {
-	if cfg.ConcurrencyControllerID == nil {
+// warnIfNumConsumersMayCapMiddleware ensures sufficient worker headroom when a concurrency controller is configured.
+func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
+	if cfg.RequestMiddlewareID == nil {
 		return
 	}
 
 	defaultConsumers := NewDefaultQueueConfig().NumConsumers
 
-	// If the user left the default worker count, bump it to allow the controller to be the governor.
-	if cfg.NumConsumers == defaultConsumers && cfg.NumConsumers < minConsumersWithController {
-		if logger != nil {
-			logger.Warn("sending_queue.num_consumers overridden because concurrency_controller is configured",
-				zap.Int("original", cfg.NumConsumers),
-				zap.Int("override", minConsumersWithController),
-				zap.String("controller", cfg.ConcurrencyControllerID.String()),
-			)
-		}
-		cfg.NumConsumers = minConsumersWithController
-		return
-	}
-
-	// If the user explicitly set num_consumers below the recommended headroom, don't override,
-	// but warn that the controller will be capped by the worker pool.
-	if cfg.NumConsumers > 0 && cfg.NumConsumers < minConsumersWithController && logger != nil {
-		logger.Warn("sending_queue.num_consumers caps concurrency_controller; consider increasing num_consumers for headroom",
+	// If the user left the default worker count, warn that the worker pool might artificially cap the controller.
+	if cfg.NumConsumers == defaultConsumers && logger != nil {
+		logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
 			zap.Int("num_consumers", cfg.NumConsumers),
-			zap.Int("recommended_min", minConsumersWithController),
-			zap.String("controller", cfg.ConcurrencyControllerID.String()),
+			zap.String("concurrency_controller", cfg.RequestMiddlewareID.String()),
 		)
 	}
 }
 
 // Start overrides the default Start to resolve the extension.
 func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	if qs.ctrlID != nil {
-		ext, ok := host.GetExtensions()[*qs.ctrlID]
+	if qs.mwID != nil {
+		ext, ok := host.GetExtensions()[*qs.mwID]
 		if !ok {
-			return fmt.Errorf("concurrency controller extension %q not found", qs.ctrlID.String())
+			return fmt.Errorf("request middleware extension %q not found", qs.mwID.String())
 		}
-		factory, ok := ext.(extensioncapabilities.ConcurrencyControllerFactory)
+		factory, ok := ext.(extensioncapabilities.RequestMiddlewareFactory)
 		if !ok {
-			return fmt.Errorf("extension %q does not implement ConcurrencyControllerFactory", qs.ctrlID.String())
+			return fmt.Errorf("extension %q does not implement RequestMiddlewareFactory", qs.mwID.String())
 		}
-		ctrl, err := factory.CreateConcurrencyController(qs.id, qs.signal)
+		mw, err := factory.CreateRequestMiddleware(qs.id, qs.signal)
 		if err != nil {
-			return fmt.Errorf("failed to create concurrency controller: %w", err)
+			return fmt.Errorf("failed to create request middleware: %w", err)
+		}
+
+		// Guard against nil return from factory to avoid panic in Handle()
+		if mw != nil {
+			qs.mw = mw
 		}
-		qs.ctrl = ctrl
 	}
 
 	if err := qs.QueueBatch.Start(ctx, host); err != nil {
-		if qs.ctrl != nil {
-			_ = qs.ctrl.Shutdown(ctx)
-			qs.ctrl = nil
+		if qs.mw != nil {
+			_ = qs.mw.Shutdown(ctx)
+			// Reset to no-op on failure cleanup
+			qs.mw = extensioncapabilities.NoopRequestMiddleware()
 		}
 		return err
 	}
 	return nil
 }
 
-// Shutdown ensures the controller is also shut down.
+// Shutdown ensures the middleware is also shut down.
func (qs *queueSender) Shutdown(ctx context.Context) error {
 	errQ := qs.QueueBatch.Shutdown(ctx)
 
-	if qs.ctrl != nil {
-		if err := qs.ctrl.Shutdown(ctx); err != nil && errQ == nil {
+	if qs.mw != nil {
+		if err := qs.mw.Shutdown(ctx); err != nil && errQ == nil {
 			return err
 		}
-		qs.ctrl = nil
+		// Reset to no-op after shutdown
+		qs.mw = extensioncapabilities.NoopRequestMiddleware()
 	}
 	return errQ
 }
