Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .chloggen/otlp-exporter-connection-pool.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp)
component: exporter/otlp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `connection_pool_size` configuration option to support multiple gRPC connections for high-throughput scenarios"

# One or more tracking issues or pull requests related to the change
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The OTLP exporter now supports connection pooling through the `connection_pool_size` configuration option.
This helps improve performance in high-throughput environments (10K+ spans/sec) and high-latency scenarios
by distributing load across multiple gRPC connections using round-robin load balancing.
Default value is 1 (single connection) for backward compatibility. Recommended value for high-throughput: 4-8.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
16 changes: 16 additions & 0 deletions exporter/otlpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ If a scheme of `https` is used then client transport security is enabled and ove
- `sending_queue`: see [Sending Queue](../exporterhelper/README.md#sending-queue) for the full set of available options.
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend.

The following settings are optional:

- `connection_pool_size` (default = 0, uses 1 connection): Number of gRPC connections to maintain in the connection pool.
Useful for high-throughput scenarios (10K+ spans/sec) or high-latency network connections.
The exporter distributes requests across connections using round-robin load balancing.
Recommended value for high-throughput deployments: 4-8 connections.
Maximum allowed value: 256. Setting to 0 or omitting uses the default of 1 connection.

Example:

```yaml
Expand All @@ -46,6 +54,14 @@ exporters:
endpoint: otelcol2:4317
tls:
insecure: true
# High-throughput configuration with connection pooling
otlp/high-throughput:
endpoint: otelcol-gateway:4317
connection_pool_size: 5
compression: snappy
sending_queue:
num_consumers: 100
queue_size: 2000
```

By default, `gzip` compression is enabled. See [compression comparison](../../config/configgrpc/README.md#compression-comparison) for details benchmark information. To disable, configure as follows:
Expand Down
11 changes: 11 additions & 0 deletions exporter/otlpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type Config struct {
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// ConnectionPoolSize configures the number of gRPC connections to create and maintain.
// A larger pool size helps with high-throughput scenarios by distributing load across multiple connections.
// Default is 1 (single connection). Recommended for high-throughput: 4-8 connections.
ConnectionPoolSize int `mapstructure:"connection_pool_size"`

// prevent unkeyed literal initialization
_ struct{}
}
Expand All @@ -36,6 +41,12 @@ func (c *Config) Validate() error {
if endpoint := c.sanitizedEndpoint(); endpoint == "" {
return errors.New(`requires a non-empty "endpoint"`)
}
if c.ConnectionPoolSize < 0 {
return errors.New("connection_pool_size must be >= 0")
}
if c.ConnectionPoolSize > 256 {
return errors.New("connection_pool_size must be <= 256")
}
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ func createDefaultConfig() component.Config {
clientCfg.BalancerName = ""

return &Config{
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: clientCfg,
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
ClientConfig: clientCfg,
ConnectionPoolSize: 0, // 0 means use default (1) at runtime, allowing backward compatibility
}
}

Expand Down
99 changes: 77 additions & 22 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"runtime"
"sync/atomic"

"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
Expand Down Expand Up @@ -35,14 +36,19 @@ type baseExporter struct {
// Input configuration.
config *Config

// gRPC clients and connection.
traceExporter ptraceotlp.GRPCClient
metricExporter pmetricotlp.GRPCClient
logExporter plogotlp.GRPCClient
profileExporter pprofileotlp.GRPCClient
clientConn *grpc.ClientConn
metadata metadata.MD
callOptions []grpc.CallOption
// gRPC clients and connections pool
// When connection pool size > 1, we maintain multiple connections and round-robin across them
traceExporters []ptraceotlp.GRPCClient
metricExporters []pmetricotlp.GRPCClient
logExporters []plogotlp.GRPCClient
profileExporters []pprofileotlp.GRPCClient
clientConns []*grpc.ClientConn

// Round-robin counter for load balancing across connections
roundRobinCounter atomic.Uint32

metadata metadata.MD
callOptions []grpc.CallOption

settings component.TelemetrySettings

Expand All @@ -59,17 +65,48 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter {
return &baseExporter{config: oCfg, settings: set.TelemetrySettings, userAgent: userAgent}
}

// start actually creates the gRPC connection. The client construction is deferred till this point as this
// start actually creates the gRPC connection(s). The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) {
agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent))
if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host.GetExtensions(), e.settings, agentOpt); err != nil {
return err

// Determine pool size: default to 1 if not configured or 0
poolSize := e.config.ConnectionPoolSize
if poolSize <= 0 {
poolSize = 1
}

// Initialize slices for connection pool
e.clientConns = make([]*grpc.ClientConn, poolSize)
e.traceExporters = make([]ptraceotlp.GRPCClient, poolSize)
e.metricExporters = make([]pmetricotlp.GRPCClient, poolSize)
e.logExporters = make([]plogotlp.GRPCClient, poolSize)
e.profileExporters = make([]pprofileotlp.GRPCClient, poolSize)

// Create multiple connections for the pool
for i := 0; i < poolSize; i++ {
if e.clientConns[i], err = e.config.ClientConfig.ToClientConn(ctx, host.GetExtensions(), e.settings, agentOpt); err != nil {
// Close any connections created before the error
for j := 0; j < i; j++ {
if e.clientConns[j] != nil {
_ = e.clientConns[j].Close()
}
}
return err
}
e.traceExporters[i] = ptraceotlp.NewGRPCClient(e.clientConns[i])
e.metricExporters[i] = pmetricotlp.NewGRPCClient(e.clientConns[i])
e.logExporters[i] = plogotlp.NewGRPCClient(e.clientConns[i])
e.profileExporters[i] = pprofileotlp.NewGRPCClient(e.clientConns[i])
}

if poolSize > 1 {
e.settings.Logger.Info("OTLP exporter connection pool created",
zap.Int("pool_size", poolSize),
zap.String("endpoint", e.config.ClientConfig.Endpoint),
)
}
e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn)
e.metricExporter = pmetricotlp.NewGRPCClient(e.clientConn)
e.logExporter = plogotlp.NewGRPCClient(e.clientConn)
e.profileExporter = pprofileotlp.NewGRPCClient(e.clientConn)

headers := map[string]string{}
for k, v := range e.config.ClientConfig.Headers.Iter {
headers[k] = string(v)
Expand All @@ -83,15 +120,30 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro
}

func (e *baseExporter) shutdown(context.Context) error {
if e.clientConn != nil {
return e.clientConn.Close()
var lastErr error
for _, conn := range e.clientConns {
if conn != nil {
if err := conn.Close(); err != nil {
lastErr = err
}
}
}
return nil
return lastErr
}

// getNextExporterIndex returns the next index for round-robin load balancing across connections
func (e *baseExporter) getNextExporterIndex() int {
if len(e.clientConns) == 1 {
return 0
}
counter := e.roundRobinCounter.Add(1)
return int(counter % uint32(len(e.clientConns)))
}

func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
idx := e.getNextExporterIndex()
req := ptraceotlp.NewExportRequestFromTraces(td)
resp, respErr := e.traceExporter.Export(ctx, req, e.callOptions...)
resp, respErr := e.traceExporters[idx].Export(ctx, req, e.callOptions...)
if err := processError(respErr); err != nil {
return err
}
Expand All @@ -106,8 +158,9 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
idx := e.getNextExporterIndex()
req := pmetricotlp.NewExportRequestFromMetrics(md)
resp, respErr := e.metricExporter.Export(ctx, req, e.callOptions...)
resp, respErr := e.metricExporters[idx].Export(ctx, req, e.callOptions...)
if err := processError(respErr); err != nil {
return err
}
Expand All @@ -122,8 +175,9 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
idx := e.getNextExporterIndex()
req := plogotlp.NewExportRequestFromLogs(ld)
resp, respErr := e.logExporter.Export(ctx, req, e.callOptions...)
resp, respErr := e.logExporters[idx].Export(ctx, req, e.callOptions...)
if err := processError(respErr); err != nil {
return err
}
Expand All @@ -138,8 +192,9 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
}

func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error {
idx := e.getNextExporterIndex()
req := pprofileotlp.NewExportRequestFromProfiles(td)
resp, respErr := e.profileExporter.Export(ctx, req, e.callOptions...)
resp, respErr := e.profileExporters[idx].Export(ctx, req, e.callOptions...)
if err := processError(respErr); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/pdatagen/internal/pdata/one_of_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

const oneOfAccessorTemplate = `// {{ .typeFuncName }} returns the type of the {{ .lowerOriginFieldName }} for this {{ .structName }}.
// Calling this function on zero-initialized {{ .structName }} will cause a panic.
// Calling this function on zero-initialized {{ .structName }} is invalid and will cause a panic.
func (ms {{ .structName }}) {{ .typeFuncName }}() {{ .typeName }} {
switch ms.{{ .origAccessor }}.{{ .originFieldName }}.(type) {
{{- range .values }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const oneOfMessageAccessorsTemplate = `// {{ .fieldName }} returns the {{ .lower
// Calling this function when {{ .originOneOfTypeFuncName }}() != {{ .typeName }} returns an invalid
// zero-initialized instance of {{ .returnType }}. Note that using such {{ .returnType }} instance can cause panic.
//
// Calling this function on zero-initialized {{ .structName }} will cause a panic.
// Calling this function on zero-initialized {{ .structName }} is invalid and will cause a panic.
func (ms {{ .structName }}) {{ .fieldName }}() {{ .returnType }} {
v, ok := ms.orig.Get{{ .originOneOfFieldName }}().(*internal.{{ .originStructType }})
if !ok {
Expand All @@ -28,7 +28,7 @@ func (ms {{ .structName }}) {{ .fieldName }}() {{ .returnType }} {
//
// After this, {{ .originOneOfTypeFuncName }}() function will return {{ .typeName }}".
//
// Calling this function on zero-initialized {{ .structName }} will cause a panic.
// Calling this function on zero-initialized {{ .structName }} is invalid and will cause a panic.
func (ms {{ .structName }}) SetEmpty{{ .fieldName }}() {{ .returnType }} {
ms.state.AssertMutable()
var ov *internal.{{ .originStructType }}
Expand Down
24 changes: 12 additions & 12 deletions pdata/pcommon/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (v Value) getState() *internal.State {
}

// FromRaw sets the value from the given raw value.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) FromRaw(iv any) error {
switch tv := iv.(type) {
case nil:
Expand Down Expand Up @@ -194,7 +194,7 @@ func (v Value) FromRaw(iv any) error {
}

// Type returns the type of the value for this Value.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) Type() ValueType {
switch v.getOrig().Value.(type) {
case *internal.AnyValue_StringValue:
Expand Down Expand Up @@ -277,7 +277,7 @@ func (v Value) Bytes() ByteSlice {
// it also changes the type to be ValueTypeStr.
// The shorter name is used instead of SetString to avoid implementing
// fmt.Stringer interface by the corresponding getter method.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetStr(sv string) {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -289,7 +289,7 @@ func (v Value) SetStr(sv string) {

// SetInt replaces the int64 value associated with this Value,
// it also changes the type to be ValueTypeInt.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetInt(iv int64) {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -301,7 +301,7 @@ func (v Value) SetInt(iv int64) {

// SetDouble replaces the float64 value associated with this Value,
// it also changes the type to be ValueTypeDouble.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetDouble(dv float64) {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -313,7 +313,7 @@ func (v Value) SetDouble(dv float64) {

// SetBool replaces the bool value associated with this Value,
// it also changes the type to be ValueTypeBool.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetBool(bv bool) {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -324,7 +324,7 @@ func (v Value) SetBool(bv bool) {
}

// SetEmptyBytes sets value to an empty byte slice and returns it.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetEmptyBytes() ByteSlice {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -335,7 +335,7 @@ func (v Value) SetEmptyBytes() ByteSlice {
}

// SetEmptyMap sets value to an empty map and returns it.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetEmptyMap() Map {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -347,7 +347,7 @@ func (v Value) SetEmptyMap() Map {
}

// SetEmptySlice sets value to an empty slice and returns it.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) SetEmptySlice() Slice {
v.getState().AssertMutable()
// Delete everything but the AnyValue object itself.
Expand All @@ -360,7 +360,7 @@ func (v Value) SetEmptySlice() Slice {

// MoveTo moves the Value from current overriding the destination and
// resetting the current instance to empty value.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) MoveTo(dest Value) {
v.getState().AssertMutable()
dest.getState().AssertMutable()
Expand All @@ -373,7 +373,7 @@ func (v Value) MoveTo(dest Value) {
}

// CopyTo copies the Value instance overriding the destination.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) CopyTo(dest Value) {
dest.getState().AssertMutable()
internal.CopyAnyValue(dest.getOrig(), v.getOrig())
Expand All @@ -382,7 +382,7 @@ func (v Value) CopyTo(dest Value) {
// AsString converts an OTLP Value object of any type to its equivalent string
// representation. This differs from Str which only returns a non-empty value
// if the ValueType is ValueTypeStr.
// Calling this function on zero-initialized Value will cause a panic.
// Calling this function on zero-initialized Value is invalid and will cause a panic.
func (v Value) AsString() string {
switch v.Type() {
case ValueTypeEmpty:
Expand Down
Loading