Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .chloggen/controller-interface.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
change_type: enhancement
component: exporterhelper
note: >
Add `RequestMiddleware` interface support to the queue sender. This allows
extensions to intercept and manage export requests (e.g., for dynamic concurrency control).
issues: [14080]
subtext: |
The `RequestMiddleware` interface has been added to the `exporter/exporterhelper/xexporterhelper` package.
The `exporterhelper`'s sending queue can now be configured with a list of `request_middlewares` to delegate request execution logic to extensions.
change_logs: [user, api]
2 changes: 1 addition & 1 deletion exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `wait_for_result` (default = false): determines if incoming requests are blocked until the request is processed or not.
- `wait_for_result` (default = false): Determines if incoming requests are blocked until the request is processed or not.
- `block_on_overflow` (default = false): If true, blocks the request until the queue has space otherwise rejects the data immediately; ignored if `enabled` is `false`
- `sizer` (default = requests): How the queue and batching is measured. Available options:
- `requests`: number of incoming batches of metrics, logs, traces (the most performant option);
Expand Down
154 changes: 146 additions & 8 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"fmt"
"time"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requestmiddleware"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
"go.opentelemetry.io/collector/pipeline"
)

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
// By default:
//
// - the queue stores 1000 requests of telemetry
// - is non-blocking when full
// - concurrent exports limited to 10
// - emits batches of 8192 items, timeout 200ms
func NewDefaultQueueConfig() queuebatch.Config {
return queuebatch.Config{
Sizer: request.SizerTypeRequests,
Expand All @@ -36,23 +35,162 @@ func NewDefaultQueueConfig() queuebatch.Config {
}
}

// queueSender wraps QueueBatch to add RequestMiddleware logic.
type queueSender struct {
*queuebatch.QueueBatch
mwIDs []component.ID

// next is the sender that the queue consumers will call.
// It is initialized to the downstream sender, but can be wrapped by middlewares in Start.
next sender.Sender[request.Request]

// activeMiddlewares tracks the wrappers created during Start so they can be shutdown.
activeMiddlewares []component.Component

// Cached exporter settings for Start().
id component.ID
signal pipeline.Signal
telemetry component.TelemetrySettings
}

func NewQueueSender(
qSet queuebatch.AllSettings[request.Request],
qCfg queuebatch.Config,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {

qs := &queueSender{
mwIDs: qCfg.RequestMiddlewares,
id: qSet.ID,
signal: qSet.Signal,
telemetry: qSet.Telemetry,
next: next,
}

// exportFunc is called by the queue consumers.
// It delegates to qs.next, which allows us to swap qs.next in Start() without recreating the queue.
exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
if errSend := next.Send(ctx, req); errSend != nil {

if errSend := qs.next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
if err != nil {
return nil, err
}
qs.QueueBatch = qb

if len(qCfg.RequestMiddlewares) > 0 {
warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)
}

return qs, nil
}

// warnIfNumConsumersMayCapMiddleware ensures sufficient worker headroom when request middleware is configured.
func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
if len(cfg.RequestMiddlewares) == 0 {
return
}

defaultConsumers := NewDefaultQueueConfig().NumConsumers

// If the user left the default worker count, warn that the worker pool might artificially cap the controller.
if cfg.NumConsumers == defaultConsumers && logger != nil {
logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
zap.Int("num_consumers", cfg.NumConsumers),
zap.Strings("request_middlewares", func() []string {
ids := make([]string, len(cfg.RequestMiddlewares))
for i, id := range cfg.RequestMiddlewares {
ids[i] = id.String()
}
return ids
}()),
)
}
}

// Start overrides the default Start to resolve the extensions and wrap the sender.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
mws := make([]requestmiddleware.RequestMiddleware, 0, len(qs.mwIDs))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the requestmiddleware code - seems you missed adding it to your commits?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@axw Apologies, yes, looks like I missed adding the requestmiddleware code. Just added, please check. I also incorporated the other review comments (removed the warning logic and the superfluous tests).


// 1. Resolve all extensions first to ensure they exist and implement the interface
for _, id := range qs.mwIDs {
ext, ok := host.GetExtensions()[id]
if !ok {
return fmt.Errorf("request middleware extension %q not found", id.String())
}
mw, ok := ext.(requestmiddleware.RequestMiddleware)
if !ok {
return fmt.Errorf("extension %q does not implement RequestMiddleware", id.String())
}
mws = append(mws, mw)
}

settings := requestmiddleware.RequestMiddlewareSettings{
ID: qs.id,
Signal: qs.signal,
Telemetry: qs.telemetry,
}

// 2. Wrap the sender.
// Wrap sender in reverse so first-configured middleware is outermost.
wrapped := qs.next
for i := len(mws) - 1; i >= 0; i-- {
var err error
wrapped, err = mws[i].WrapSender(settings, wrapped)
if err != nil {
_ = qs.Shutdown(ctx) // Clean up any previously started middlewares
return fmt.Errorf("failed to wrap sender for %q: %w", qs.mwIDs[i].String(), err)
}
if wrapped == nil {
_ = qs.Shutdown(ctx)
return fmt.Errorf("request middleware %q returned nil sender", qs.mwIDs[i].String())
}

// If the wrapper is a component, start it.
if err := wrapped.Start(ctx, host); err != nil {
_ = qs.Shutdown(ctx)
return err
}
qs.activeMiddlewares = append(qs.activeMiddlewares, wrapped)
}

// Update the next sender to point to the head of the chain
qs.next = wrapped

// 3. Start the queue (which starts the consumers).
// The consumers will use the updated qs.next.
if err := qs.QueueBatch.Start(ctx, host); err != nil {
// Ensure middlewares are shut down if queue fails to start.
_ = qs.Shutdown(ctx)
return err
}
return nil
}

// Shutdown ensures the middlewares are also shut down.
func (qs *queueSender) Shutdown(ctx context.Context) error {
var errs error
if qs.QueueBatch != nil {
errs = multierr.Append(errs, qs.QueueBatch.Shutdown(ctx))
}

// Shutdown active middlewares in reverse order of creation (LIFO).
// Since we appended them as we wrapped (inner to outer), we simply iterate backwards.
for i := len(qs.activeMiddlewares) - 1; i >= 0; i-- {
errs = multierr.Append(errs, qs.activeMiddlewares[i].Shutdown(ctx))
}
qs.activeMiddlewares = nil

return errs
}
Loading