11 changes: 11 additions & 0 deletions .chloggen/controller-interface.yaml
@@ -0,0 +1,11 @@
change_type: enhancement
component: exporterhelper
note: >
Add `RequestMiddleware` interface support to the queue sender. This allows
extensions to intercept and manage export requests (e.g., for dynamic concurrency control).
issues: [14080]
subtext: |
The `RequestMiddlewareFactory` interface has been added to the
`extension/extensioncapabilities` module. The `exporterhelper`'s sending queue
can now be configured with a `concurrency_controller` extension ID to delegate request execution logic to an extension.
change_logs: [user, api]
49 changes: 48 additions & 1 deletion exporter/exporterhelper/README.md
@@ -21,14 +21,16 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `wait_for_result` (default = false): Determines whether incoming requests block until the request has been processed.
- `block_on_overflow` (default = false): If true, blocks the request until the queue has space, otherwise rejects the data immediately; ignored if `enabled` is `false`
- `sizer` (default = requests): How the queue and batch sizes are measured. Available options:
- `requests`: number of incoming batches of metrics, logs, traces (the most performant option);
- `items`: number of the smallest parts of each signal (spans, metric data points, log records);
- `bytes`: the size of serialized data in bytes (the least performant option).
- `queue_size` (default = 1000): Maximum size the queue can accept. Measured in units defined by `sizer`
- `batch`: see below.
- `concurrency_controller` (default = none): The ID of an extension implementing the `RequestMiddlewareFactory` interface (e.g., `adaptive_concurrency`). When configured, exporterhelper executes export requests through the middleware, enabling logic such as adaptive concurrency, rate limiting, or circuit breaking.


#### Sending queue batch settings

@@ -140,3 +142,48 @@ service:
```

[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage


### Request Middleware (e.g. Adaptive Concurrency)

Traditionally, exporters use a static `num_consumers` to determine how many concurrent requests can be sent to a backend. A Request Middleware implementation allows extensions to replace or augment this behavior with dynamic logic.

The middleware wraps the request execution, allowing it to:
1. **Intercept:** Acquire permits or check conditions before the request starts (e.g., rate limiting).
2. **Measure:** Track the duration and outcome of the request (e.g., adaptive concurrency).
3. **Control:** Block or fail requests based on internal logic.

#### Interaction with num_consumers

When a middleware is configured (via `concurrency_controller`), it acts as a gatekeeper on top of the existing queue consumers. The effective concurrency is the minimum of the middleware's logic and the static `num_consumers`.

`Effective_Concurrency = min(Middleware_Limit, sending_queue.num_consumers)`

- **Warning:** If you leave `num_consumers` at the default value (10) while using middleware that requires high concurrency (like Adaptive Request Concurrency), the queue sender will log a warning.
- **Recommendation:** Set `num_consumers` high enough to avoid capping the middleware’s maximum intended concurrency (for example, match the middleware’s configured max).
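
The cap described above is just a minimum of the two limits; a tiny Go sketch with hypothetical values:

```go
package main

import "fmt"

// effectiveConcurrency shows how the static worker pool caps the
// middleware: whichever limit is lower wins.
func effectiveConcurrency(middlewareLimit, numConsumers int) int {
	if middlewareLimit < numConsumers {
		return middlewareLimit
	}
	return numConsumers
}

func main() {
	// The middleware wants up to 200 concurrent requests, but the default
	// num_consumers (10) caps the effective concurrency at 10.
	fmt.Println(effectiveConcurrency(200, 10))
	// With num_consumers raised to 100, a middleware limit of 50 governs.
	fmt.Println(effectiveConcurrency(50, 100))
}
```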

#### Example Configuration

In this example, an OTLP exporter is configured to use the `adaptive_concurrency` extension (which implements the Request Middleware interface).

```yaml
exporters:
otlp:
endpoint: https://my-backend:4317
sending_queue:
enabled: true
num_consumers: 100 # Provide headroom for the middleware
# Link to the middleware extension
concurrency_controller: adaptive_concurrency/otlp_limit

extensions:
# The adaptive_concurrency extension implementation would be defined here
adaptive_concurrency/otlp_limit:
# ... extension specific config ...

service:
extensions: [adaptive_concurrency/otlp_limit]
pipelines:
traces:
receivers: [otlp]
exporters: [otlp]
3 changes: 3 additions & 0 deletions exporter/exporterhelper/go.mod
@@ -17,6 +17,7 @@ require (
go.opentelemetry.io/collector/consumer/consumertest v0.143.0
go.opentelemetry.io/collector/exporter v1.49.0
go.opentelemetry.io/collector/exporter/exportertest v0.143.0
go.opentelemetry.io/collector/extension/extensioncapabilities v1.143.0
go.opentelemetry.io/collector/extension/extensiontest v0.143.0
go.opentelemetry.io/collector/extension/xextension v0.143.0
go.opentelemetry.io/collector/featuregate v1.49.0
@@ -122,3 +123,5 @@ replace go.opentelemetry.io/collector/config/configoptional => ../../config/conf
replace go.opentelemetry.io/collector/confmap/xconfmap => ../../confmap/xconfmap

replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil

replace go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities
129 changes: 123 additions & 6 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -5,14 +5,18 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.opentelemetry.io/collector/pipeline"
)

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
@@ -36,23 +40,136 @@ func NewDefaultQueueConfig() queuebatch.Config {
}
}

// queueSender wraps QueueBatch to add RequestMiddleware logic.
type queueSender struct {
*queuebatch.QueueBatch
mwID *component.ID
mw extensioncapabilities.RequestMiddleware

// Capture these fields during creation so we can use them in Start()
// because QueueBatch does not expose them publicly.
id component.ID
signal pipeline.Signal
}

func NewQueueSender(
qSet queuebatch.AllSettings[request.Request],
qCfg queuebatch.Config,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {
if qCfg.RequestMiddlewareID == nil {
exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
}

warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)

qs := &queueSender{
mwID: qCfg.RequestMiddlewareID,
id: qSet.ID,
signal: qSet.Signal,
mw: extensioncapabilities.NoopRequestMiddleware(), // Initialize with no-op to avoid nil checks
}

exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()

// Wrap the actual export call in a closure for the middleware.
nextCall := func(ctx context.Context) error {
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

// Use the middleware to execute the request.
return qs.mw.Handle(ctx, nextCall)
}

qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
if err != nil {
return nil, err
}
qs.QueueBatch = qb

return qs, nil
}

// warnIfNumConsumersMayCapMiddleware logs a warning when the static worker pool may cap a configured concurrency controller.
func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
if cfg.RequestMiddlewareID == nil {
return
}

defaultConsumers := NewDefaultQueueConfig().NumConsumers

// If the user left the default worker count, warn that the worker pool might artificially cap the controller.
if cfg.NumConsumers == defaultConsumers && logger != nil {
logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
zap.Int("num_consumers", cfg.NumConsumers),
zap.String("concurrency_controller", cfg.RequestMiddlewareID.String()),
)
}
}

// Start overrides the default Start to resolve the extension.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if qs.mwID != nil {
ext, ok := host.GetExtensions()[*qs.mwID]
if !ok {
return fmt.Errorf("request middleware extension %q not found", qs.mwID.String())
}
factory, ok := ext.(extensioncapabilities.RequestMiddlewareFactory)
if !ok {
return fmt.Errorf("extension %q does not implement RequestMiddlewareFactory", qs.mwID.String())
}
mw, err := factory.CreateRequestMiddleware(qs.id, qs.signal)
if err != nil {
return fmt.Errorf("failed to create request middleware: %w", err)
}

// Guard against nil return from factory to avoid panic in Handle()
if mw != nil {
qs.mw = mw
}
}

if err := qs.QueueBatch.Start(ctx, host); err != nil {
// Ensure middleware is shut down if queue fails to start.
// No need to check for nil because we initialize with Noop.
_ = qs.mw.Shutdown(ctx)
qs.mw = extensioncapabilities.NoopRequestMiddleware()
return err
}
return nil
}

// Shutdown ensures the middleware is also shut down.
func (qs *queueSender) Shutdown(ctx context.Context) error {
errQ := qs.QueueBatch.Shutdown(ctx)

// No need to check for nil because we initialize with Noop.
if err := qs.mw.Shutdown(ctx); err != nil && errQ == nil {
return err
}
// Reset to no-op after shutdown
qs.mw = extensioncapabilities.NoopRequestMiddleware()

return errQ
}