Skip to content

Commit 7cab962

Browse files
swiatekmblakerouse
authored andcommitted
Use a random port for otel collector monitoring endpoint (#10240)
* Use a random port for otel collector monitoring endpoint * mage notice * Use an env variable * Fix linter warnings * Fix random port determination for the embedded otel collector * Drop the ports functions from the utils package * fixup! Fix random port determination for the embedded otel collector * Ensure no port conflicts * Clean up port assignment * Verify that returned ports are unique * Add port conflict test * Fix docstring typo * More comments * Add comments explaining the port conflict test * Update internal/pkg/otel/manager/execution_subprocess.go Co-authored-by: Blake Rouse <[email protected]> --------- Co-authored-by: Blake Rouse <[email protected]> (cherry picked from commit 5cb8c31)
1 parent d59dd9a commit 7cab962

File tree

13 files changed

+976
-516
lines changed

13 files changed

+976
-516
lines changed

NOTICE-fips.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

NOTICE.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ require (
124124
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.135.0
125125
go.opentelemetry.io/collector/receiver/nopreceiver v0.135.0
126126
go.opentelemetry.io/collector/receiver/otlpreceiver v0.135.0
127-
go.opentelemetry.io/collector/service v0.135.0
128127
go.opentelemetry.io/ebpf-profiler v0.0.202536
129128
go.uber.org/zap v1.27.0
130129
go.yaml.in/yaml/v3 v3.0.4
@@ -722,6 +721,7 @@ require (
722721
go.opentelemetry.io/collector/scraper v0.135.0 // indirect
723722
go.opentelemetry.io/collector/scraper/scraperhelper v0.135.0 // indirect
724723
go.opentelemetry.io/collector/semconv v0.128.1-0.20250610090210-188191247685 // indirect
724+
go.opentelemetry.io/collector/service v0.135.0 // indirect
725725
go.opentelemetry.io/collector/service/hostcapabilities v0.135.0 // indirect
726726
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
727727
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect

internal/pkg/agent/application/application.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,13 @@ func New(
133133
if err != nil {
134134
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
135135
}
136-
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo, isOtelExecModeSubprocess)
136+
monitor := componentmonitoring.New(
137+
isMonitoringSupported,
138+
cfg.Settings.DownloadConfig.OS(),
139+
cfg.Settings.MonitoringConfig,
140+
agentInfo,
141+
isOtelExecModeSubprocess,
142+
)
137143

138144
runtime, err := runtime.NewManager(
139145
log,
@@ -245,7 +251,16 @@ func New(
245251
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
246252
}
247253

248-
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
254+
otelManager, err := otelmanager.NewOTelManager(
255+
log.Named("otel_manager"),
256+
logLevel, baseLogger,
257+
otelExecMode,
258+
agentInfo,
259+
0, // TODO: make this configurable in a follow-up
260+
0, // TODO: make this configurable in a follow-up
261+
monitor.ComponentMonitoringConfig,
262+
cfg.Settings.ProcessConfig.StopTimeout,
263+
)
249264
if err != nil {
250265
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
251266
}

internal/pkg/agent/application/monitoring/component/v1_monitor.go

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,11 @@ import (
2121
"time"
2222
"unicode"
2323

24-
"go.opentelemetry.io/collector/confmap"
25-
"go.opentelemetry.io/collector/service"
24+
koanfmaps "github.com/knadh/koanf/maps"
2625

2726
"github.com/elastic/elastic-agent/pkg/component"
2827
"github.com/elastic/elastic-agent/pkg/utils"
2928

30-
koanfmaps "github.com/knadh/koanf/maps"
31-
3229
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
3330
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
3431
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -38,6 +35,12 @@ import (
3835
)
3936

4037
const (
38+
// OtelCollectorMetricsPortEnvVarName is the name of the environment variable used to pass the collector metrics
39+
// port to the managed EDOT collector. It exists because by default we use a random port for this, and we want to
40+
// determine it as late as possible. However, the monitoring manager is instantiated early in the application
41+
// startup process, so instead we rely on this variable. The OTel manager is required to set it whenever it starts
42+
// a collector.
43+
OtelCollectorMetricsPortEnvVarName = "EDOT_COLLECTOR_METRICS_PORT"
4144
// args: data path, pipeline name, application name
4245
logFileFormat = "%s/logs/%s"
4346
// args: data path, install path, pipeline name, application name
@@ -93,7 +96,6 @@ var (
9396
type BeatsMonitor struct {
9497
enabled bool // feature flag disabling whole v1 monitoring story
9598
config *monitoringConfig
96-
otelConfig *confmap.Conf
9799
operatingSystem string
98100
agentInfo info.Agent
99101
isOtelRuntimeSubprocess bool
@@ -115,13 +117,12 @@ type monitoringConfig struct {
115117
}
116118

117119
// New creates a new BeatsMonitor instance.
118-
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
120+
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
119121
return &BeatsMonitor{
120122
enabled: enabled,
121123
config: &monitoringConfig{
122124
C: cfg,
123125
},
124-
otelConfig: otelCfg,
125126
operatingSystem: operatingSystem,
126127
agentInfo: agentInfo,
127128
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
@@ -149,7 +150,6 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error {
149150
}
150151

151152
b.config = &newConfig
152-
b.otelConfig = rawConfig.OTel
153153
return nil
154154
}
155155

@@ -519,34 +519,9 @@ func (b *BeatsMonitor) monitoringNamespace() string {
519519
}
520520

521521
func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
522-
if b.otelConfig != nil {
523-
if serviceConfig, err := b.otelConfig.Sub("service"); err == nil {
524-
var service service.Config
525-
if serviceConfig.Unmarshal(&service, confmap.WithIgnoreUnused()) == nil {
526-
for _, reader := range service.Telemetry.Metrics.Readers {
527-
if reader.Pull == nil || reader.Pull.Exporter.Prometheus == nil {
528-
continue
529-
}
530-
prometheus := *reader.Pull.Exporter.Prometheus
531-
host := "localhost"
532-
port := 8888
533-
534-
if prometheus.Host != nil {
535-
host = *prometheus.Host
536-
}
537-
if prometheus.Port != nil {
538-
port = *prometheus.Port
539-
}
540-
if prometheus.Host != nil || prometheus.Port != nil {
541-
return host + ":" + strconv.Itoa(port)
542-
}
543-
}
544-
}
545-
}
546-
}
547-
548-
// If there is no explicit configuration, the collector publishes its telemetry on port 8888.
549-
return "localhost:8888"
522+
// The OTel manager is required to set the environment variable. See comment at the constant definition for more
523+
// information.
524+
return fmt.Sprintf("localhost:${env:%s}", OtelCollectorMetricsPortEnvVarName)
550525
}
551526

552527
// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.

internal/pkg/agent/application/monitoring/component/v1_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
11021102
monitorcfg.MonitorLogs = false
11031103
monitorcfg.MonitorMetrics = false
11041104

1105-
beatsMonitor := New(true, "", monitorcfg, nil, nil, false)
1105+
beatsMonitor := New(true, "", monitorcfg, nil, false)
11061106
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
11071107
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
11081108

internal/pkg/agent/cmd/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
414414
}
415415
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
416416
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
417-
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, otelCfg, agentInfo, isOtelExecModeSubprocess)
417+
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
418418
return monitor.MonitoringConfig, nil
419419
}
420420

internal/pkg/otel/manager/common.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package manager
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"net"
1112

@@ -52,21 +53,30 @@ func reportCollectorStatus(ctx context.Context, statusCh chan *status.AggregateS
5253
}
5354
}
5455

55-
// findRandomTCPPort finds a random available TCP port on the localhost interface.
56-
func findRandomTCPPort() (int, error) {
57-
l, err := netListen("tcp", "localhost:0")
58-
if err != nil {
59-
return 0, err
60-
}
56+
// findRandomTCPPorts finds count random available TCP ports on the localhost interface.
57+
func findRandomTCPPorts(count int) (ports []int, err error) {
58+
ports = make([]int, 0, count)
59+
listeners := make([]net.Listener, 0, count)
60+
defer func() {
61+
for _, listener := range listeners {
62+
if closeErr := listener.Close(); closeErr != nil {
63+
err = errors.Join(err, fmt.Errorf("error closing listener: %w", closeErr))
64+
}
65+
}
66+
}()
67+
for range count {
68+
l, err := netListen("tcp", "localhost:0")
69+
if err != nil {
70+
return nil, err
71+
}
72+
listeners = append(listeners, l)
6173

62-
port := l.Addr().(*net.TCPAddr).Port
63-
err = l.Close()
64-
if err != nil {
65-
return 0, err
66-
}
67-
if port == 0 {
68-
return 0, fmt.Errorf("failed to find random port")
74+
port := l.Addr().(*net.TCPAddr).Port
75+
if port == 0 {
76+
return nil, fmt.Errorf("failed to find random port")
77+
}
78+
ports = append(ports, port)
6979
}
7080

71-
return port, nil
81+
return ports, err
7282
}

internal/pkg/otel/manager/common_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88
"errors"
99
"net"
1010
"path/filepath"
11+
"slices"
1112
"testing"
1213

14+
"github.com/stretchr/testify/assert"
15+
1316
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1417
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
1518
"github.com/elastic/elastic-agent/pkg/component"
@@ -18,9 +21,15 @@ import (
1821
)
1922

2023
func TestFindRandomPort(t *testing.T) {
21-
port, err := findRandomTCPPort()
24+
portCount := 2
25+
ports, err := findRandomTCPPorts(portCount)
2226
require.NoError(t, err)
23-
require.NotEqual(t, 0, port)
27+
require.Len(t, ports, portCount)
28+
for _, port := range ports {
29+
assert.NotEqual(t, 0, port)
30+
}
31+
slices.Sort(ports)
32+
require.Len(t, slices.Compact(ports), portCount, "returned ports should be unique")
2433

2534
defer func() {
2635
netListen = net.Listen
@@ -29,8 +38,8 @@ func TestFindRandomPort(t *testing.T) {
2938
netListen = func(string, string) (net.Listener, error) {
3039
return nil, errors.New("some error")
3140
}
32-
_, err = findRandomTCPPort()
33-
require.Error(t, err, "failed to find random port")
41+
_, err = findRandomTCPPorts(portCount)
42+
assert.Error(t, err, "failed to find random port")
3443
}
3544

3645
func testComponent(componentId string) component.Component {

internal/pkg/otel/manager/execution_embedded.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package manager
66

77
import (
88
"context"
9+
"os"
10+
"strconv"
911
"time"
1012

1113
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -14,6 +16,8 @@ import (
1416
"go.uber.org/zap"
1517
"go.uber.org/zap/zapcore"
1618

19+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
20+
1721
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
1822
"github.com/elastic/elastic-agent/internal/pkg/otel"
1923
"github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider"
@@ -22,11 +26,14 @@ import (
2226
"github.com/elastic/elastic-agent/pkg/core/logger"
2327
)
2428

25-
func newExecutionEmbedded() *embeddedExecution {
26-
return &embeddedExecution{}
29+
// newExecutionEmbedded creates a new execution which runs the otel collector in a goroutine. A metricsPort of 0 will
30+
// result in a random port being used.
31+
func newExecutionEmbedded(metricsPort int) *embeddedExecution {
32+
return &embeddedExecution{collectorMetricsPort: metricsPort}
2733
}
2834

2935
type embeddedExecution struct {
36+
collectorMetricsPort int
3037
}
3138

3239
// startCollector starts the collector in a new goroutine.
@@ -41,6 +48,10 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
4148
extConf := map[string]any{
4249
"endpoint": paths.DiagnosticsExtensionSocket(),
4350
}
51+
collectorMetricsPort, err := r.getCollectorMetricsPort()
52+
if err != nil {
53+
return nil, err
54+
}
4455
// NewForceExtensionConverterFactory is used to ensure that the agent_status extension is always enabled.
4556
// It is required for the Elastic Agent to extract the status out of the OTel collector.
4657
settings := otel.NewSettings(
@@ -59,13 +70,41 @@ func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.L
5970
return nil, err
6071
}
6172
go func() {
73+
// Set the environment variable for the collector metrics port. See comment at the constant definition for more information.
74+
setErr := os.Setenv(componentmonitoring.OtelCollectorMetricsPortEnvVarName, strconv.Itoa(collectorMetricsPort))
75+
defer func() {
76+
unsetErr := os.Unsetenv(componentmonitoring.OtelCollectorMetricsPortEnvVarName)
77+
if unsetErr != nil {
78+
logger.Errorf("couldn't unset environment variable %s: %v", componentmonitoring.OtelCollectorMetricsPortEnvVarName, unsetErr)
79+
}
80+
}()
81+
if setErr != nil {
82+
reportErr(ctx, errCh, setErr)
83+
return
84+
}
6285
runErr := svc.Run(collectorCtx)
6386
close(ctl.collectorDoneCh)
6487
reportErr(ctx, errCh, runErr)
6588
}()
6689
return ctl, nil
6790
}
6891

92+
// getCollectorPorts returns the metrics port used by the OTel collector. If the port set in the execution struct is 0,
93+
// a random port is returned instead.
94+
func (r *embeddedExecution) getCollectorMetricsPort() (metricsPort int, err error) {
95+
// if the port is defined (non-zero), use it
96+
if r.collectorMetricsPort > 0 {
97+
return r.collectorMetricsPort, nil
98+
}
99+
100+
// get a random port
101+
ports, err := findRandomTCPPorts(1)
102+
if err != nil {
103+
return 0, err
104+
}
105+
return ports[0], nil
106+
}
107+
69108
type ctxHandle struct {
70109
collectorDoneCh chan struct{}
71110
cancel context.CancelFunc

0 commit comments

Comments
 (0)