-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: Add ConcurrencyController interface for ARC in exporterhelper #14318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 16 commits
496a75d
e509643
1714ea5
10032bf
071d699
a5622d0
4be8fd7
324029e
f2d39d9
33d5428
7466c9e
c9c2a38
20d3981
af76d73
ab598d1
205a167
f456b13
0b13828
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| change_type: enhancement | ||
| component: exporterhelper | ||
| note: > | ||
| Add `RequestMiddleware` interface support to the queue sender. This allows | ||
| extensions to intercept and manage export requests (e.g., for dynamic concurrency control). | ||
| issues: [14080] | ||
| subtext: | | ||
| The `RequestMiddleware` interface has been added to the `exporter/exporterhelper/xexporterhelper` package. | ||
| The `exporterhelper`'s sending queue can now be configured with a list of `request_middlewares` to delegate request execution logic to extensions. | ||
| change_logs: [user, api] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,14 +5,19 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe | |
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "go.uber.org/multierr" | ||
| "go.uber.org/zap" | ||
|
|
||
| "go.opentelemetry.io/collector/component" | ||
| "go.opentelemetry.io/collector/config/configoptional" | ||
| "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" | ||
| "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" | ||
| "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requestmiddleware" | ||
| "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" | ||
| "go.opentelemetry.io/collector/pipeline" | ||
| ) | ||
|
|
||
| // NewDefaultQueueConfig returns the default config for queuebatch.Config. | ||
|
|
@@ -36,23 +41,135 @@ func NewDefaultQueueConfig() queuebatch.Config { | |
| } | ||
| } | ||
|
|
||
| // queueSender wraps QueueBatch to add RequestMiddleware logic. | ||
| type queueSender struct { | ||
| *queuebatch.QueueBatch | ||
| mwIDs []component.ID | ||
|
|
||
| // next is the sender that the queue consumers will call. | ||
| // It is initialized to the downstream sender, but can be wrapped by middlewares in Start. | ||
| next sender.Sender[request.Request] | ||
|
|
||
| // activeMiddlewares tracks the wrappers created during Start so they can be shutdown. | ||
| activeMiddlewares []component.Component | ||
|
|
||
| // Cached exporter settings for Start(). | ||
| id component.ID | ||
| signal pipeline.Signal | ||
| telemetry component.TelemetrySettings | ||
| } | ||
|
|
||
| func NewQueueSender( | ||
| qSet queuebatch.AllSettings[request.Request], | ||
| qCfg queuebatch.Config, | ||
| exportFailureMessage string, | ||
| next sender.Sender[request.Request], | ||
| ) (sender.Sender[request.Request], error) { | ||
|
|
||
| qs := &queueSender{ | ||
| mwIDs: qCfg.RequestMiddlewares, | ||
| id: qSet.ID, | ||
| signal: qSet.Signal, | ||
| telemetry: qSet.Telemetry, | ||
| next: next, | ||
| } | ||
|
|
||
| // exportFunc is called by the queue consumers. | ||
| // It delegates to qs.next, which allows us to swap qs.next in Start() without recreating the queue. | ||
| exportFunc := func(ctx context.Context, req request.Request) error { | ||
| // Have to read the number of items before sending the request since the request can | ||
| // be modified by the downstream components like the batcher. | ||
| itemsCount := req.ItemsCount() | ||
| if errSend := next.Send(ctx, req); errSend != nil { | ||
|
|
||
| if errSend := qs.next.Send(ctx, req); errSend != nil { | ||
| qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, | ||
| zap.Error(errSend), zap.Int("dropped_items", itemsCount)) | ||
| return errSend | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| return queuebatch.NewQueueBatch(qSet, qCfg, exportFunc) | ||
| qb, err := queuebatch.NewQueueBatch(qSet, qCfg, exportFunc) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| qs.QueueBatch = qb | ||
|
|
||
| return qs, nil | ||
| } | ||
|
|
||
| // Start overrides the default Start to resolve the extensions and wrap the sender. | ||
| func (qs *queueSender) Start(ctx context.Context, host component.Host) error { | ||
| mws := make([]requestmiddleware.RequestMiddleware, 0, len(qs.mwIDs)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see the requestmiddleware code - seems you missed adding it to your commits?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @axw Apologies, yes, looks like I missed adding the requestmiddleware code. Just added, please check. I also incorporated the other review comments (removed the warning logic and the superfluous tests). |
||
|
|
||
| // 1. Resolve all extensions first to ensure they exist and implement the interface | ||
| for _, id := range qs.mwIDs { | ||
| ext, ok := host.GetExtensions()[id] | ||
| if !ok { | ||
| return fmt.Errorf("request middleware extension %q not found", id.String()) | ||
| } | ||
| mw, ok := ext.(requestmiddleware.RequestMiddleware) | ||
| if !ok { | ||
| return fmt.Errorf("extension %q does not implement RequestMiddleware", id.String()) | ||
| } | ||
| mws = append(mws, mw) | ||
| } | ||
|
|
||
| settings := requestmiddleware.RequestMiddlewareSettings{ | ||
| ID: qs.id, | ||
| Signal: qs.signal, | ||
| Telemetry: qs.telemetry, | ||
| } | ||
|
|
||
| // 2. Wrap the sender. | ||
| // Wrap sender in reverse so first-configured middleware is outermost. | ||
| wrapped := qs.next | ||
| for i := len(mws) - 1; i >= 0; i-- { | ||
| var err error | ||
| wrapped, err = mws[i].WrapSender(settings, wrapped) | ||
| if err != nil { | ||
| _ = qs.Shutdown(ctx) // Clean up any previously started middlewares | ||
| return fmt.Errorf("failed to wrap sender for %q: %w", qs.mwIDs[i].String(), err) | ||
| } | ||
| if wrapped == nil { | ||
| _ = qs.Shutdown(ctx) | ||
| return fmt.Errorf("request middleware %q returned nil sender", qs.mwIDs[i].String()) | ||
| } | ||
raghu999 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // If the wrapper is a component, start it. | ||
| if err := wrapped.Start(ctx, host); err != nil { | ||
| _ = qs.Shutdown(ctx) | ||
| return err | ||
| } | ||
| qs.activeMiddlewares = append(qs.activeMiddlewares, wrapped) | ||
| } | ||
|
|
||
| // Update the next sender to point to the head of the chain | ||
| qs.next = wrapped | ||
|
|
||
| // 3. Start the queue (which starts the consumers). | ||
| // The consumers will use the updated qs.next. | ||
| if err := qs.QueueBatch.Start(ctx, host); err != nil { | ||
| // Ensure middlewares are shut down if queue fails to start. | ||
| _ = qs.Shutdown(ctx) | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Shutdown ensures the middlewares are also shut down. | ||
| func (qs *queueSender) Shutdown(ctx context.Context) error { | ||
| var errs error | ||
| if qs.QueueBatch != nil { | ||
| errs = multierr.Append(errs, qs.QueueBatch.Shutdown(ctx)) | ||
| } | ||
|
|
||
| // Shutdown active middlewares in reverse order of creation (LIFO). | ||
| // Since we appended them as we wrapped (inner to outer), we simply iterate backwards. | ||
| for i := len(qs.activeMiddlewares) - 1; i >= 0; i-- { | ||
| errs = multierr.Append(errs, qs.activeMiddlewares[i].Shutdown(ctx)) | ||
| } | ||
| qs.activeMiddlewares = nil | ||
|
|
||
| return errs | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.