13 changes: 13 additions & 0 deletions .chloggen/controller-interface.yaml
@@ -0,0 +1,13 @@
change_type: enhancement
component: exporterhelper
note: >
Add `ConcurrencyController` interface support to the queue sender. This allows
extensions to dynamically control the number of concurrent requests in the
export pipeline.
issues: [14080]
subtext: |
The `ConcurrencyControllerFactory` interface has been added to a new
`extension/extensioncapabilities` module. The `exporterhelper`'s sending queue
can now be configured with a `concurrency_controller` ID to delegate flow
control logic to an extension.
change_logs: [user, api]
66 changes: 66 additions & 0 deletions exporter/exporterhelper/README.md
@@ -29,6 +29,7 @@
- `bytes`: the size of serialized data in bytes (the least performant option).
- `queue_size` (default = 1000): Maximum size the queue can accept. Measured in units defined by `sizer`
- `batch`: see below.
- `concurrency_controller` (default = none): The ID of an extension implementing the `ConcurrencyController` interface (e.g., `adaptive_concurrency`). When configured, this extension dynamically manages the number of concurrent requests sent to the backend based on real-time signals like latency and error rates, providing adaptive backpressure to prevent downstream overload.

#### Sending queue batch settings

@@ -140,3 +141,68 @@
```

[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage


### Concurrency Controller (Adaptive Request Concurrency)

To use dynamic concurrency control, configure the following setting:

- `sending_queue`
- `concurrency_controller` (default = none): When set, enables adaptive backpressure by using the specified extension to dynamically manage the number of concurrent requests.

#### How it works

Traditionally, exporters use a static `num_consumers` to determine how many concurrent requests can be sent to a backend. However, static limits are difficult to tune:
- **Too high:** You risk overwhelming the downstream backend, leading to increased latency, 429 (Too Many Requests) errors, and "death spirals."
- **Too low:** You underutilize the available network and backend capacity, causing the collector's internal queue to fill up unnecessarily.

The Concurrency Controller implementation (e.g., `adaptive_concurrency`) replaces the fixed worker pool with a dynamic permit system based on the **AIMD (Additive Increase / Multiplicative Decrease)** algorithm.
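
The increase/decrease step itself is small. The following generic Go sketch illustrates the rule; `aimdUpdate` and its parameters are illustrative names, not the extension's actual code:

```go
// aimdUpdate applies one AIMD step after a request completes.
// limit is the current concurrency cap; decreaseRatio (e.g., 0.5)
// controls how aggressively the cap shrinks under backpressure.
func aimdUpdate(limit, maxLimit int, succeeded bool, decreaseRatio float64) int {
    if succeeded {
        if limit < maxLimit {
            limit++ // additive increase: gently probe for more capacity
        }
        return limit
    }
    limit = int(float64(limit) * decreaseRatio) // multiplicative decrease
    if limit < 1 {
        limit = 1 // always keep at least one request in flight
    }
    return limit
}
```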

#### The Control Loop

1. **Acquire:** Before an export attempt begins, the exporter asks the controller for a permit. If the current dynamic limit is reached, the request blocks until a slot becomes available.
2. **Measure:** The controller tracks the **Round Trip Time (RTT)** and the outcome (success or retryable error) of every request.
3. **Adapt:** At regular intervals, the controller compares the recent RTT baseline against current performance:
- **Increase:** If latency is stable and requests are succeeding, the controller increases the concurrency limit to maximize throughput.
- **Decrease:** If latency spikes or the backend returns "backpressure" signals (like HTTP 429 or gRPC `ResourceExhausted`), the controller immediately shrinks the limit to allow the backend to recover.

This feedback loop ensures the Collector automatically finds the "sweet spot" of maximum throughput without requiring manual tuning as network conditions or backend capacity change.
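
The `ConcurrencyController` and `ConcurrencyControllerFactory` definitions live in the new `extension/extensioncapabilities` module and are not shown in this diff. The sketch below infers their shape from the calls made in `queue_sender.go` (`Acquire`, `Record`, `Shutdown`, and `CreateConcurrencyController`); the `permitController` type and its fields are a toy illustration of the permit system, not the actual `adaptive_concurrency` extension:

```go
package concurrencysketch

import (
    "context"
    "sync"
    "time"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/pipeline"
)

// Shape inferred from queue_sender.go: the exporter acquires a permit,
// sends, then records the RTT and outcome.
type ConcurrencyController interface {
    Acquire(ctx context.Context) error                        // blocks at the dynamic limit
    Record(ctx context.Context, rtt time.Duration, err error) // feeds the control loop
    Shutdown(ctx context.Context) error
}

// Inferred from Start(): the configured extension acts as a factory,
// producing one controller per exporter/signal pair.
type ConcurrencyControllerFactory interface {
    CreateConcurrencyController(id component.ID, signal pipeline.Signal) (ConcurrencyController, error)
}

// permitController is a toy counted semaphore whose ceiling moves
// according to the AIMD rule sketched earlier.
type permitController struct {
    mu       sync.Mutex
    cond     *sync.Cond
    limit    int // current dynamic limit
    maxLimit int // hard ceiling (cf. max_concurrency)
    inFlight int
}

var _ ConcurrencyController = (*permitController)(nil)

func newPermitController(initial, maxLimit int) *permitController {
    c := &permitController{limit: initial, maxLimit: maxLimit}
    c.cond = sync.NewCond(&c.mu)
    return c
}

func (c *permitController) Acquire(ctx context.Context) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    for c.inFlight >= c.limit {
        // A production controller would also wake waiters on ctx
        // cancellation; this sketch only checks between wakeups.
        if err := ctx.Err(); err != nil {
            return err
        }
        c.cond.Wait()
    }
    c.inFlight++
    return nil
}

// Record releases the permit and applies one AIMD step. This toy ignores
// the RTT argument; the real controller also compares latency baselines.
func (c *permitController) Record(_ context.Context, _ time.Duration, err error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.inFlight--
    if err != nil {
        c.limit /= 2 // multiplicative decrease on failure
        if c.limit < 1 {
            c.limit = 1
        }
    } else if c.limit < c.maxLimit {
        c.limit++ // additive increase on success
    }
    c.cond.Broadcast() // a slot freed and/or the limit moved
}

func (c *permitController) Shutdown(context.Context) error { return nil }
```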

#### Interaction with `num_consumers`

When a `concurrency_controller` is configured, it acts as a gatekeeper on top of the existing queue consumers. The effective concurrency is the minimum of the controller's dynamic limit and the static `num_consumers`.

To ensure the controller has enough headroom to operate, this component applies a minimum of 200 consumers when a controller is active:

- Automatic adjustment: if `num_consumers` is left at its default, it is automatically raised to 200 to prevent an artificial bottleneck.

- Explicit low values: if you set `num_consumers` to a non-default value below 200, your value is respected, but a warning is logged because the worker pool will cap the controller's dynamic limit.

- High concurrency: if you need more than 200 concurrent requests (e.g., `num_consumers: 500`), your configured value is respected.

**Recommendation:** You generally do not need to configure `num_consumers` when using the controller; the default headroom (200) is sufficient for most use cases. Only increase it if you expect to exceed 200 concurrent requests.

#### Example Configuration

In this example, the OTLP exporter is configured to use the `adaptive_concurrency` extension. The extension will start with a small number of parallel requests and automatically scale up to 100 based on the health of the OTLP endpoint.

```yaml
exporters:
otlp:
endpoint: https://my-backend:4317
sending_queue:
enabled: true
# Link to the concurrency controller extension defined below
concurrency_controller: adaptive_concurrency/otlp_limit

extensions:
# Define the adaptive concurrency algorithm
adaptive_concurrency/otlp_limit:
initial_limit: 5 # Start with 5 concurrent requests
max_concurrency: 100 # Never exceed 100 concurrent requests
decrease_ratio: 0.5 # If backpressure is detected, cut concurrency by half

service:
extensions: [adaptive_concurrency/otlp_limit]
pipelines:
traces:
receivers: [otlp]
      exporters: [otlp]
```
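
If you expect to exceed the 200-consumer headroom described above, set `num_consumers` explicitly alongside the controller; values above 200 are respected as the worker-pool cap. A minimal fragment (values illustrative):

```yaml
exporters:
  otlp:
    sending_queue:
      enabled: true
      concurrency_controller: adaptive_concurrency/otlp_limit
      # Explicit values above the 200 minimum are respected.
      num_consumers: 500
```
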
2 changes: 2 additions & 0 deletions exporter/exporterhelper/go.mod
@@ -122,3 +122,5 @@ replace go.opentelemetry.io/collector/config/configoptional => ../../config/configoptional
replace go.opentelemetry.io/collector/confmap/xconfmap => ../../confmap/xconfmap

replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil

replace go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities
145 changes: 143 additions & 2 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -5,16 +5,22 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.opentelemetry.io/collector/pipeline"
)

const minConsumersWithController = 200

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
// By default:
//
@@ -36,23 +42,158 @@ func NewDefaultQueueConfig() queuebatch.Config {
}
}

// queueSender wraps QueueBatch to add ConcurrencyController logic.
type queueSender struct {
*queuebatch.QueueBatch
ctrlID *component.ID
ctrl extensioncapabilities.ConcurrencyController

// Capture these fields during creation so we can use them in Start()
// because QueueBatch does not expose them publicly.
id component.ID
signal pipeline.Signal
}

func NewQueueSender(
qSet queuebatch.AllSettings[request.Request],
qCfg queuebatch.Config,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {
if qCfg.ConcurrencyControllerID == nil {
exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
}

enforceMinConsumers(&qCfg, qSet.Telemetry.Logger)

qs := &queueSender{
ctrlID: qCfg.ConcurrencyControllerID,
id: qSet.ID,
signal: qSet.Signal,
}

exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()

ctrl := qs.ctrl
if ctrl == nil {
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}
// This blocks the worker if the dynamic limit is reached.
if err := ctrl.Acquire(ctx); err != nil {
return err
}

start := time.Now()
var errSend error
defer func() { ctrl.Record(ctx, time.Since(start), errSend) }()

errSend = next.Send(ctx, req)
if errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
if err != nil {
return nil, err
}
qs.QueueBatch = qb

return qs, nil
}

// enforceMinConsumers ensures sufficient worker headroom when a concurrency controller is configured.
func enforceMinConsumers(cfg *queuebatch.Config, logger *zap.Logger) {
if cfg.ConcurrencyControllerID == nil {
return
}

defaultConsumers := NewDefaultQueueConfig().NumConsumers

// If the user left the default worker count, bump it to allow the controller to be the governor.
if cfg.NumConsumers == defaultConsumers && cfg.NumConsumers < minConsumersWithController {
if logger != nil {
logger.Warn("sending_queue.num_consumers overridden because concurrency_controller is configured",
zap.Int("original", cfg.NumConsumers),
zap.Int("override", minConsumersWithController),
zap.String("controller", cfg.ConcurrencyControllerID.String()),
)
}
cfg.NumConsumers = minConsumersWithController
return
}

// If the user explicitly set num_consumers below the recommended headroom, don't override,
// but warn that the controller will be capped by the worker pool.
if cfg.NumConsumers > 0 && cfg.NumConsumers < minConsumersWithController && logger != nil {
logger.Warn("sending_queue.num_consumers caps concurrency_controller; consider increasing num_consumers for headroom",
zap.Int("num_consumers", cfg.NumConsumers),
zap.Int("recommended_min", minConsumersWithController),
zap.String("controller", cfg.ConcurrencyControllerID.String()),
)
}
}

// Start overrides the default Start to resolve the extension.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if qs.ctrlID != nil {
ext, ok := host.GetExtensions()[*qs.ctrlID]
if !ok {
return fmt.Errorf("concurrency controller extension %q not found", qs.ctrlID.String())
}
factory, ok := ext.(extensioncapabilities.ConcurrencyControllerFactory)
if !ok {
return fmt.Errorf("extension %q does not implement ConcurrencyControllerFactory", qs.ctrlID.String())
}
ctrl, err := factory.CreateConcurrencyController(qs.id, qs.signal)
if err != nil {
return fmt.Errorf("failed to create concurrency controller: %w", err)
}
qs.ctrl = ctrl
}

if err := qs.QueueBatch.Start(ctx, host); err != nil {
if qs.ctrl != nil {
_ = qs.ctrl.Shutdown(ctx)
qs.ctrl = nil
}
return err
}
return nil
}

// Shutdown ensures the controller is also shut down.
func (qs *queueSender) Shutdown(ctx context.Context) error {
errQ := qs.QueueBatch.Shutdown(ctx)

if qs.ctrl != nil {
if err := qs.ctrl.Shutdown(ctx); err != nil && errQ == nil {
return err
}
qs.ctrl = nil
}
return errQ
}