Skip to content

Commit 135c55f

Browse files
committed
Add connection pooling support to OTLP exporter for high-throughput scenarios
This enhancement adds a connection_pool_size configuration option to the OTLP exporter, enabling multiple gRPC connections with round-robin load balancing. Key changes: - Add connection_pool_size config parameter (default: 0, uses 1 connection) - Implement round-robin load balancing across multiple connections - Support for 1-256 concurrent gRPC connections - Backward compatible: default behavior unchanged This resolves performance issues in high-throughput environments (10K+ spans/sec) and high-latency network scenarios where a single gRPC connection becomes a bottleneck. Also fixes unrelated service.go issue per contributor feedback on PR open-telemetry#14342.
1 parent 9b0e691 commit 135c55f

File tree

6 files changed

+140
-32
lines changed

6 files changed

+140
-32
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp)
7+
component: exporter/otlp
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add `connection_pool_size` configuration option to support multiple gRPC connections for high-throughput scenarios"
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The OTLP exporter now supports connection pooling through the `connection_pool_size` configuration option.
20+
This helps improve performance in high-throughput environments (10K+ spans/sec) and high-latency scenarios
21+
by distributing load across multiple gRPC connections using round-robin load balancing.
22+
Default value is 1 (single connection) for backward compatibility. Recommended value for high-throughput: 4-8.
23+
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

exporter/otlpexporter/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ If a scheme of `https` is used then client transport security is enabled and ove
3333
- `sending_queue`: see [Sending Queue](../exporterhelper/README.md#sending-queue) for the full set of available options.
3434
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend.
3535

36+
The following settings are optional:
37+
38+
- `connection_pool_size` (default = 0, uses 1 connection): Number of gRPC connections to maintain in the connection pool.
39+
Useful for high-throughput scenarios (10K+ spans/sec) or high-latency network connections.
40+
The exporter distributes requests across connections using round-robin load balancing.
41+
Recommended value for high-throughput deployments: 4-8 connections.
42+
Maximum allowed value: 256. Setting to 0 or omitting uses the default of 1 connection.
43+
3644
Example:
3745

3846
```yaml
@@ -46,6 +54,14 @@ exporters:
4654
endpoint: otelcol2:4317
4755
tls:
4856
insecure: true
57+
# High-throughput configuration with connection pooling
58+
otlp/high-throughput:
59+
endpoint: otelcol-gateway:4317
60+
connection_pool_size: 5
61+
compression: snappy
62+
sending_queue:
63+
num_consumers: 100
64+
queue_size: 2000
4965
```
5066
5167
By default, `gzip` compression is enabled. See [compression comparison](../../config/configgrpc/README.md#compression-comparison) for details benchmark information. To disable, configure as follows:

exporter/otlpexporter/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ type Config struct {
2323
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
2424
ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
2525

26+
// ConnectionPoolSize configures the number of gRPC connections to create and maintain.
27+
// A larger pool size helps with high-throughput scenarios by distributing load across multiple connections.
28+
// Default is 1 (single connection). Recommended for high-throughput: 4-8 connections.
29+
ConnectionPoolSize int `mapstructure:"connection_pool_size"`
30+
2631
// prevent unkeyed literal initialization
2732
_ struct{}
2833
}
@@ -36,6 +41,12 @@ func (c *Config) Validate() error {
3641
if endpoint := c.sanitizedEndpoint(); endpoint == "" {
3742
return errors.New(`requires a non-empty "endpoint"`)
3843
}
44+
if c.ConnectionPoolSize < 0 {
45+
return errors.New("connection_pool_size must be >= 0")
46+
}
47+
if c.ConnectionPoolSize > 256 {
48+
return errors.New("connection_pool_size must be <= 256")
49+
}
3950
return nil
4051
}
4152

exporter/otlpexporter/factory.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ func createDefaultConfig() component.Config {
4242
clientCfg.BalancerName = ""
4343

4444
return &Config{
45-
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
46-
RetryConfig: configretry.NewDefaultBackOffConfig(),
47-
QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
48-
ClientConfig: clientCfg,
45+
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
46+
RetryConfig: configretry.NewDefaultBackOffConfig(),
47+
QueueConfig: configoptional.Some(exporterhelper.NewDefaultQueueConfig()),
48+
ClientConfig: clientCfg,
49+
ConnectionPoolSize: 0, // 0 means use default (1) at runtime, allowing backward compatibility
4950
}
5051
}
5152

exporter/otlpexporter/otlp.go

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"runtime"
10+
"sync/atomic"
1011

1112
"go.uber.org/zap"
1213
"google.golang.org/genproto/googleapis/rpc/errdetails"
@@ -35,14 +36,19 @@ type baseExporter struct {
3536
// Input configuration.
3637
config *Config
3738

38-
// gRPC clients and connection.
39-
traceExporter ptraceotlp.GRPCClient
40-
metricExporter pmetricotlp.GRPCClient
41-
logExporter plogotlp.GRPCClient
42-
profileExporter pprofileotlp.GRPCClient
43-
clientConn *grpc.ClientConn
44-
metadata metadata.MD
45-
callOptions []grpc.CallOption
39+
// gRPC clients and connections pool
40+
// When connection pool size > 1, we maintain multiple connections and round-robin across them
41+
traceExporters []ptraceotlp.GRPCClient
42+
metricExporters []pmetricotlp.GRPCClient
43+
logExporters []plogotlp.GRPCClient
44+
profileExporters []pprofileotlp.GRPCClient
45+
clientConns []*grpc.ClientConn
46+
47+
// Round-robin counter for load balancing across connections
48+
roundRobinCounter atomic.Uint32
49+
50+
metadata metadata.MD
51+
callOptions []grpc.CallOption
4652

4753
settings component.TelemetrySettings
4854

@@ -59,17 +65,48 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter {
5965
return &baseExporter{config: oCfg, settings: set.TelemetrySettings, userAgent: userAgent}
6066
}
6167

62-
// start actually creates the gRPC connection. The client construction is deferred till this point as this
68+
// start actually creates the gRPC connection(s). The client construction is deferred till this point as this
6369
// is the only place we get hold of Extensions which are required to construct auth round tripper.
6470
func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) {
6571
agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent))
66-
if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host.GetExtensions(), e.settings, agentOpt); err != nil {
67-
return err
72+
73+
// Determine pool size: default to 1 if not configured or 0
74+
poolSize := e.config.ConnectionPoolSize
75+
if poolSize <= 0 {
76+
poolSize = 1
77+
}
78+
79+
// Initialize slices for connection pool
80+
e.clientConns = make([]*grpc.ClientConn, poolSize)
81+
e.traceExporters = make([]ptraceotlp.GRPCClient, poolSize)
82+
e.metricExporters = make([]pmetricotlp.GRPCClient, poolSize)
83+
e.logExporters = make([]plogotlp.GRPCClient, poolSize)
84+
e.profileExporters = make([]pprofileotlp.GRPCClient, poolSize)
85+
86+
// Create multiple connections for the pool
87+
for i := 0; i < poolSize; i++ {
88+
if e.clientConns[i], err = e.config.ClientConfig.ToClientConn(ctx, host.GetExtensions(), e.settings, agentOpt); err != nil {
89+
// Close any connections created before the error
90+
for j := 0; j < i; j++ {
91+
if e.clientConns[j] != nil {
92+
_ = e.clientConns[j].Close()
93+
}
94+
}
95+
return err
96+
}
97+
e.traceExporters[i] = ptraceotlp.NewGRPCClient(e.clientConns[i])
98+
e.metricExporters[i] = pmetricotlp.NewGRPCClient(e.clientConns[i])
99+
e.logExporters[i] = plogotlp.NewGRPCClient(e.clientConns[i])
100+
e.profileExporters[i] = pprofileotlp.NewGRPCClient(e.clientConns[i])
101+
}
102+
103+
if poolSize > 1 {
104+
e.settings.Logger.Info("OTLP exporter connection pool created",
105+
zap.Int("pool_size", poolSize),
106+
zap.String("endpoint", e.config.ClientConfig.Endpoint),
107+
)
68108
}
69-
e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn)
70-
e.metricExporter = pmetricotlp.NewGRPCClient(e.clientConn)
71-
e.logExporter = plogotlp.NewGRPCClient(e.clientConn)
72-
e.profileExporter = pprofileotlp.NewGRPCClient(e.clientConn)
109+
73110
headers := map[string]string{}
74111
for k, v := range e.config.ClientConfig.Headers.Iter {
75112
headers[k] = string(v)
@@ -83,15 +120,30 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro
83120
}
84121

85122
func (e *baseExporter) shutdown(context.Context) error {
86-
if e.clientConn != nil {
87-
return e.clientConn.Close()
123+
var lastErr error
124+
for _, conn := range e.clientConns {
125+
if conn != nil {
126+
if err := conn.Close(); err != nil {
127+
lastErr = err
128+
}
129+
}
88130
}
89-
return nil
131+
return lastErr
132+
}
133+
134+
// getNextExporterIndex returns the next index for round-robin load balancing across connections
135+
func (e *baseExporter) getNextExporterIndex() int {
136+
if len(e.clientConns) == 1 {
137+
return 0
138+
}
139+
counter := e.roundRobinCounter.Add(1)
140+
return int(counter % uint32(len(e.clientConns)))
90141
}
91142

92143
func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
144+
idx := e.getNextExporterIndex()
93145
req := ptraceotlp.NewExportRequestFromTraces(td)
94-
resp, respErr := e.traceExporter.Export(ctx, req, e.callOptions...)
146+
resp, respErr := e.traceExporters[idx].Export(ctx, req, e.callOptions...)
95147
if err := processError(respErr); err != nil {
96148
return err
97149
}
@@ -106,8 +158,9 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
106158
}
107159

108160
func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
161+
idx := e.getNextExporterIndex()
109162
req := pmetricotlp.NewExportRequestFromMetrics(md)
110-
resp, respErr := e.metricExporter.Export(ctx, req, e.callOptions...)
163+
resp, respErr := e.metricExporters[idx].Export(ctx, req, e.callOptions...)
111164
if err := processError(respErr); err != nil {
112165
return err
113166
}
@@ -122,8 +175,9 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
122175
}
123176

124177
func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
178+
idx := e.getNextExporterIndex()
125179
req := plogotlp.NewExportRequestFromLogs(ld)
126-
resp, respErr := e.logExporter.Export(ctx, req, e.callOptions...)
180+
resp, respErr := e.logExporters[idx].Export(ctx, req, e.callOptions...)
127181
if err := processError(respErr); err != nil {
128182
return err
129183
}
@@ -138,8 +192,9 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
138192
}
139193

140194
func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error {
195+
idx := e.getNextExporterIndex()
141196
req := pprofileotlp.NewExportRequestFromProfiles(td)
142-
resp, respErr := e.profileExporter.Export(ctx, req, e.callOptions...)
197+
resp, respErr := e.profileExporters[idx].Export(ctx, req, e.callOptions...)
143198
if err := processError(respErr); err != nil {
144199
return err
145200
}

service/service.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"go.opentelemetry.io/collector/service/internal/proctelemetry"
3535
"go.opentelemetry.io/collector/service/internal/status"
3636
"go.opentelemetry.io/collector/service/telemetry"
37-
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
3837
)
3938

4039
// This feature gate is deprecated and will be removed in 1.40.0. Views can now be configured.
@@ -235,11 +234,8 @@ func New(ctx context.Context, set Settings, cfg Config) (_ *Service, resultErr e
235234
return nil, err
236235
}
237236

238-
// Only register process metrics if metrics telemetry is enabled
239-
if telemetryCfg, ok := cfg.Telemetry.(*otelconftelemetry.Config); !ok || telemetryCfg.Metrics.Level != configtelemetry.LevelNone {
240-
if err := proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
241-
return nil, fmt.Errorf("failed to register process metrics: %w", err)
242-
}
237+
if err := proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
238+
return nil, fmt.Errorf("failed to register process metrics: %w", err)
243239
}
244240
return srv, nil
245241
}

0 commit comments

Comments
 (0)