11 changes: 11 additions & 0 deletions .chloggen/controller-interface.yaml
@@ -0,0 +1,11 @@
change_type: enhancement
component: exporterhelper
note: >
Add `RequestMiddleware` interface support to the queue sender. This allows
extensions to intercept and manage export requests (e.g., for dynamic concurrency control).
issues: [14080]
subtext: |
The `RequestMiddlewareFactory` interface has been added to the
`extension/xextension/extensionmiddleware` package. The `exporterhelper`'s sending queue
can now be configured with a list of `request_middlewares` to delegate request execution logic to extensions.
change_logs: [user, api]
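
For orientation, a rough sketch of how this could be wired in a collector configuration, assuming a hypothetical `concurrency_controller` extension that implements `RequestMiddlewareFactory`; only the `request_middlewares` list under `sending_queue` comes from this change, the exporter and extension names are placeholders:

exporters:
  otlp:
    endpoint: backend.example.com:4317
    sending_queue:
      enabled: true
      num_consumers: 100  # raised above the default of 10 so the worker pool does not cap the middleware
      request_middlewares: [concurrency_controller]  # hypothetical extension implementing RequestMiddlewareFactory
service:
  extensions: [concurrency_controller]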
2 changes: 1 addition & 1 deletion exporter/exporterhelper/README.md
@@ -21,7 +21,7 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `wait_for_result` (default = false): determines if incoming requests are blocked until the request is processed or not.
- `wait_for_result` (default = false): Determines if incoming requests are blocked until the request is processed or not.
- `block_on_overflow` (default = false): If true, blocks the request until the queue has space otherwise rejects the data immediately; ignored if `enabled` is `false`
- `sizer` (default = requests): How the queue and batching is measured. Available options:
- `requests`: number of incoming batches of metrics, logs, traces (the most performant option);
155 changes: 149 additions & 6 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -5,14 +5,21 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"fmt"
"time"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"

// Import the new package (aliased to avoid confusion with existing extensionmiddleware)
requestmiddleware "go.opentelemetry.io/collector/extension/xextension/extensionmiddleware"
"go.opentelemetry.io/collector/pipeline"
)

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
@@ -36,23 +43,159 @@ func NewDefaultQueueConfig() queuebatch.Config {
}
}

// queueSender wraps QueueBatch to add RequestMiddleware logic.
type queueSender struct {
*queuebatch.QueueBatch
mwIDs []component.ID
mws []requestmiddleware.RequestMiddleware

// Capture these fields during creation so we can use them in Start()
// because QueueBatch does not expose them publicly.
id component.ID
signal pipeline.Signal
}

func NewQueueSender(
qSet queuebatch.AllSettings[request.Request],
qCfg queuebatch.Config,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {
if len(qCfg.RequestMiddlewares) == 0 {
exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
}

warnIfNumConsumersMayCapMiddleware(&qCfg, qSet.Telemetry.Logger)

qs := &queueSender{
mwIDs: qCfg.RequestMiddlewares,
id: qSet.ID,
signal: qSet.Signal,
mws: make([]requestmiddleware.RequestMiddleware, 0, len(qCfg.RequestMiddlewares)),
}

exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()

// 1. Define the base exporter call.
baseSender := func(ctx context.Context) error {
if errSend := next.Send(ctx, req); errSend != nil {
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
return errSend
}
return nil
}

// 2. Chain middlewares: m1(m2(base)).
// Iterate in reverse so that the first middleware in the config becomes the outermost wrapper.
nextCall := baseSender
for i := len(qs.mws) - 1; i >= 0; i-- {
mw := qs.mws[i]
currNext := nextCall
nextCall = func(ctx context.Context) error {
return mw.Handle(ctx, currNext)
}
}

// 3. Execute the chain.
return nextCall(ctx)
}

qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc)
if err != nil {
return nil, err
}
qs.QueueBatch = qb

return qs, nil
}

// warnIfNumConsumersMayCapMiddleware warns when the configured request middlewares may be capped by the default worker count.
func warnIfNumConsumersMayCapMiddleware(cfg *queuebatch.Config, logger *zap.Logger) {
if len(cfg.RequestMiddlewares) == 0 {
return
}

defaultConsumers := NewDefaultQueueConfig().NumConsumers

// If the user left the worker count at its default, warn that the worker pool might artificially cap middleware-driven concurrency.
if cfg.NumConsumers == defaultConsumers && logger != nil {
logger.Warn("sending_queue.num_consumers is at the default; request middleware may be capped by worker pool",
zap.Int("num_consumers", cfg.NumConsumers),
zap.Strings("request_middlewares", func() []string {
ids := make([]string, len(cfg.RequestMiddlewares))
for i, id := range cfg.RequestMiddlewares {
ids[i] = id.String()
}
return ids
}()),
)
}
}

// Start overrides the default Start to resolve the extensions.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
for _, id := range qs.mwIDs {
ext, ok := host.GetExtensions()[id]
if !ok {
// Clean up any middlewares successfully created before this failure
_ = qs.Shutdown(ctx)
return fmt.Errorf("request middleware extension %q not found", id.String())
}
factory, ok := ext.(requestmiddleware.RequestMiddlewareFactory)
if !ok {
// Clean up any middlewares successfully created before this failure
_ = qs.Shutdown(ctx)
return fmt.Errorf("extension %q does not implement RequestMiddlewareFactory", id.String())
}
mw, err := factory.CreateRequestMiddleware(qs.id, qs.signal)
if err != nil {
// Clean up any middlewares successfully created before this failure
_ = qs.Shutdown(ctx)
return fmt.Errorf("failed to create request middleware for %q: %w", id.String(), err)
}

if mw != nil {
qs.mws = append(qs.mws, mw)
}
}

if err := qs.QueueBatch.Start(ctx, host); err != nil {
// Ensure middlewares are shut down if queue fails to start.
_ = qs.Shutdown(ctx)
return err
}
return nil
}

// Shutdown ensures the middlewares are also shut down.
func (qs *queueSender) Shutdown(ctx context.Context) error {
var errs error
if qs.QueueBatch != nil {
errs = multierr.Append(errs, qs.QueueBatch.Shutdown(ctx))
}

// Shutdown middlewares in reverse order (LIFO)
for i := len(qs.mws) - 1; i >= 0; i-- {
errs = multierr.Append(errs, qs.mws[i].Shutdown(ctx))
}
// Clear middlewares after shutdown
qs.mws = nil

return errs
}
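
To make the extension side concrete, here is a minimal sketch of a middleware extension whose shape is inferred from the calls above (`CreateRequestMiddleware(id, signal)`, `Handle(ctx, next)`, `Shutdown(ctx)`). The interface definitions are restated locally because the canonical ones live in `extension/xextension/extensionmiddleware` and may differ; the concurrency-limiting behavior is purely an illustrative assumption:

package concurrencylimiter // hypothetical extension package

import (
	"context"

	"golang.org/x/sync/semaphore"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pipeline"
)

// RequestMiddleware and RequestMiddlewareFactory as inferred from their use in
// queue_sender.go; the canonical definitions live in extensionmiddleware.
type RequestMiddleware interface {
	Handle(ctx context.Context, next func(context.Context) error) error
	Shutdown(ctx context.Context) error
}

type RequestMiddlewareFactory interface {
	CreateRequestMiddleware(id component.ID, signal pipeline.Signal) (RequestMiddleware, error)
}

// limiter caps the number of in-flight export calls handed to it by the queue sender.
type limiter struct {
	sem *semaphore.Weighted
}

func (l *limiter) Handle(ctx context.Context, next func(context.Context) error) error {
	if err := l.sem.Acquire(ctx, 1); err != nil {
		return err // context cancelled while waiting for a slot
	}
	defer l.sem.Release(1)
	return next(ctx)
}

func (l *limiter) Shutdown(context.Context) error { return nil }

// extensionImpl would additionally implement component.Component (Start/Shutdown)
// so it can be listed under service::extensions.
type extensionImpl struct {
	maxInFlight int64
}

func (e *extensionImpl) CreateRequestMiddleware(component.ID, pipeline.Signal) (RequestMiddleware, error) {
	return &limiter{sem: semaphore.NewWeighted(e.maxInFlight)}, nil
}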