Skip to content

Conversation

@raghu999
Copy link

Description

This PR introduces the ConcurrencyController interface and integrates it into the exporterhelper.

Changes:

  • Defines the ConcurrencyController interface.
  • Adds the necessary plumbing in exporterhelper to support dynamic concurrency adjustments.
  • Allows extensions (like the Adaptive Concurrency Extension) to interact with and regulate exporter concurrency limits dynamically.

Link to tracking issue

Fixes #14080

Testing

  • Added unit tests for the new ConcurrencyController interface.
  • Verified that existing exporterhelper tests pass to ensure no regression in current behavior.

Documentation

  • Added GoDoc comments for the new interface and methods.

@raghu999 raghu999 requested review from a team, bogdandrutu and dmitryax as code owners December 22, 2025 01:30
@raghu999 raghu999 mentioned this pull request Dec 22, 2025
@raghu999
Copy link
Author

raghu999 commented Jan 5, 2026

@axw @dmitryax @bogdandrutu gentle ping on this PR (#14318). I wanted to check if you could take a look when you have a moment.

This introduces the ConcurrencyController interface + minimal exporterhelper plumbing to allow dynamic concurrency control (intended for the ARC extension path), with unit tests included. The change should be non-invasive unless a controller is explicitly configured.

Would appreciate a review on the API shape/placement and the integration points. Happy to iterate quickly on any feedback or adjust/split if needed. Thanks!

Copy link
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @raghu999!

My main feedback so far is:

  • The interface is very narrow, and seems a bit too coupled to ARC. I'd like to see if we can change that into a more general exporter request/sender middleware interface without compromising ARC.
  • The minConsumersWithController bit feels off. I'm not convinced we should change the default due to some other setting - seems like that would be surprising. A couple of thoughts:
    • Can we for now just have the extension log a warning if the default is set low?
    • Would it make sense for the controller to be able to add consumers? Maybe as a separate extension point in asyncQueue, but referencing the same extension?

@raghu999
Copy link
Author

raghu999 commented Jan 7, 2026

Thanks @raghu999!

My main feedback so far is:

  • The interface is very narrow, and seems a bit too coupled to ARC. I'd like to see if we can change that into a more general exporter request/sender middleware interface without compromising ARC.

  • The minConsumersWithController bit feels off. I'm not convinced we should change the default due to some other setting - seems like that would be surprising. A couple of thoughts:

    • Can we for now just have the extension log a warning if the default is set low?
    • Would it make sense for the controller to be able to add consumers? Maybe as a separate extension point in asyncQueue, but referencing the same extension?

@axw Thanks for the review — I’ve updated the PR based on your feedback:

“Why 200?” / defaults & warnings
I agree that auto-changing sending_queue.num_consumers is surprising. I removed the code that forced it to 200. Now, if concurrency_controller is configured but num_consumers is still at the default (10), exporterhelper logs a warning that the worker pool may cap the middleware’s behavior, while preserving the user’s config.

No-op middleware (avoid nil checks)
I added a NoopRequestMiddleware default so the hot path doesn’t need nil checks. I also guard against the factory returning nil by keeping the no-op middleware in that case.

General middleware interface
I refactored the ARC-coupled interface into a generic RequestMiddleware / RequestMiddlewareFactory. I explored the WrapSender(... internal/request,sender ...) style, but extensioncapabilities can’t depend on exporterhelper/internal/... types due to Go internal visibility rules and it would also introduce an import cycle. Using Handle(ctx, next func(ctx) error) keeps the interface general and decoupled while letting extensions encapsulate timing/permits/error logic.

@raghu999 raghu999 force-pushed the controller-interface branch from 77cbada to 3613de8 Compare January 7, 2026 07:50
@raghu999 raghu999 force-pushed the controller-interface branch from 3613de8 to f2d39d9 Compare January 7, 2026 07:51
Copy link
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the ARC-coupled interface into a generic RequestMiddleware / RequestMiddlewareFactory. I explored the WrapSender(... internal/request,sender ...) style, but extensioncapabilities can’t depend on exporterhelper/internal/... types due to Go internal visibility rules and it would also introduce an import cycle. Using Handle(ctx, next func(ctx) error) keeps the interface general and decoupled while letting extensions encapsulate timing/permits/error logic.

The interface doesn't have to live in extensioncapabilities. For example, there's https://github.com/open-telemetry/opentelemetry-collector/tree/main/extension/extensionmiddleware for HTTP and gRPC middleware. I wouldn't recommend adding it in there, just using it as an example. Perhaps we could introduce a new package under https://github.com/open-telemetry/opentelemetry-collector/tree/main/extension/xextension?

@raghu999
Copy link
Author

raghu999 commented Jan 7, 2026

I refactored the ARC-coupled interface into a generic RequestMiddleware / RequestMiddlewareFactory. I explored the WrapSender(... internal/request,sender ...) style, but extensioncapabilities can’t depend on exporterhelper/internal/... types due to Go internal visibility rules and it would also introduce an import cycle. Using Handle(ctx, next func(ctx) error) keeps the interface general and decoupled while letting extensions encapsulate timing/permits/error logic.

The interface doesn't have to live in extensioncapabilities. For example, there's https://github.com/open-telemetry/opentelemetry-collector/tree/main/extension/extensionmiddleware for HTTP and gRPC middleware. I wouldn't recommend adding it in there, just using it as an example. Perhaps we could introduce a new package under https://github.com/open-telemetry/opentelemetry-collector/tree/main/extension/xextension?

Thanks for the review @axw! I've updated the PR to incorporate all suggested changes:

Configuration (config.go):

  1. Renamed Field: Changed RequestMiddlewareID to RequestMiddlewares.
  2. Updated Type: Changed the type to a list ([]component.ID) to be consistent with confighttp and allow multiple middlewares.
  3. Updated Tag: Switched the YAML tag to mapstructure:"request_middlewares".
  4. Documentation: Updated the code comments to reflect the list type and removed the concurrency controller documentation

Interface Location:

  • Refactoring: As suggested, I removed the RequestMiddleware and RequestMiddlewareFactory interfaces from extensioncapabilities.

  • New Location: I've moved them to a new package go.opentelemetry.io/collector/extension/xextension/extensionmiddleware. This keeps the experimental middleware capabilities separate from the stable core extension interfaces.

Ready for a re-review!

@raghu999 raghu999 requested a review from axw January 7, 2026 23:26
@raghu999 raghu999 force-pushed the controller-interface branch from ce09eef to 7466c9e Compare January 7, 2026 23:31
@raghu999
Copy link
Author

raghu999 commented Jan 8, 2026

I think all the code can be simplified if:

  • NewQueueSender just stores "next" in a new field, and references that field rather than the parameter in exportFunc
  • queueSender.Start overrides the next field with the wrapped sender

Thanks, @axw.

I agree moving the interface to exporterhelper/internal and aliasing it in xexporterhelper is the right move here. It cleanly resolves the circular dependency issues caused by xextension needing to reference exporterhelper types.

I've applied that change and also refactored queue_sender.go to use the lazy-binding pattern you suggested (storing next as a field and wrapping it in Start). This allowed me to remove the RequestMiddlewareFactory entirely, which significantly simplifies the plumbing.

The tests passed locally. Ready for another look.

@raghu999 raghu999 requested a review from axw January 8, 2026 05:01

// Start overrides the default Start to resolve the extensions and wrap the sender.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
mws := make([]requestmiddleware.RequestMiddleware, 0, len(qs.mwIDs))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the requestmiddleware code - seems you missed adding it to your commits?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@axw Apologies, yes, looks like I missed adding the requestmiddleware code. Just added, please check. I also incorporated the other review comments (removed the warning logic and the superfluous tests).

@raghu999 raghu999 force-pushed the controller-interface branch from db733c3 to 205a167 Compare January 8, 2026 06:02
Copy link
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @raghu999, it looks pretty clean to me now. I'm not sure what the Sizer change is about - that seems off.

I think it's ready for a second opinion now. @dmitryax do you have opinions on this?

@raghu999
Copy link
Author

raghu999 commented Jan 8, 2026

Thanks @raghu999, it looks pretty clean to me now. I'm not sure what the Sizer change is about - that seems off.

I think it's ready for a second opinion now. @dmitryax do you have opinions on this?

@axw I’d appreciate your validation on the SizerType alias.

When I looked at exporterhelper/internal/request/request.go, I noticed the Request interface requires:

MergeSplit(ctx context.Context, maxSize int, sizerType SizerType, req Request) ([]Request, error)

Since xexporterhelper.Request aliases that interface, any external implementation (including mocks in my extension’s tests) needs to implement MergeSplit. Because SizerType is internal, I couldn’t find a way to write that method signature externally without re-exporting it.

Could you let me know if I’m missing a different pattern here? If the alias is undesirable, do you have a preferred way external consumers are expected to satisfy the MergeSplit contract?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Adaptive Request Concurrency (ARC) for HTTP and gRPC Exporters

2 participants