Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 212 additions & 212 deletions NOTICE-fips.txt

Large diffs are not rendered by default.

424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ require (
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.135.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.135.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.135.0
go.opentelemetry.io/collector/service v0.135.0
go.opentelemetry.io/ebpf-profiler v0.0.202536
go.uber.org/zap v1.27.0
go.yaml.in/yaml/v3 v3.0.4
Expand Down Expand Up @@ -722,6 +721,7 @@ require (
go.opentelemetry.io/collector/scraper v0.135.0 // indirect
go.opentelemetry.io/collector/scraper/scraperhelper v0.135.0 // indirect
go.opentelemetry.io/collector/semconv v0.128.1-0.20250610090210-188191247685 // indirect
go.opentelemetry.io/collector/service v0.135.0 // indirect
go.opentelemetry.io/collector/service/hostcapabilities v0.135.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 // indirect
Expand Down
25 changes: 23 additions & 2 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent/pkg/utils"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/go-ucfg"
Expand Down Expand Up @@ -126,14 +128,25 @@ func New(

otelExecMode := otelconfig.GetExecutionModeFromConfig(log, rawConfig)
isOtelExecModeSubprocess := otelExecMode == otelmanager.SubprocessExecutionMode
otelCollectorMetricsPort, err := utils.FindRandomTCPPort()
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to find a random port for otel collector metrics: %w", err)
}

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled
upgrader, err := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, cfg.Settings.Upgrade, agentInfo, new(upgrade.AgentWatcherHelper))
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo, isOtelExecModeSubprocess)
monitor := componentmonitoring.New(
isMonitoringSupported,
cfg.Settings.DownloadConfig.OS(),
cfg.Settings.MonitoringConfig,
agentInfo,
isOtelExecModeSubprocess,
otelCollectorMetricsPort,
)

runtime, err := runtime.NewManager(
log,
Expand Down Expand Up @@ -245,7 +258,15 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelExecMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
otelManager, err := otelmanager.NewOTelManager(
log.Named("otel_manager"),
logLevel, baseLogger,
otelExecMode,
agentInfo,
otelCollectorMetricsPort,
monitor.ComponentMonitoringConfig,
cfg.Settings.ProcessConfig.StopTimeout,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
Expand Down
64 changes: 20 additions & 44 deletions internal/pkg/agent/application/monitoring/component/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import (
"time"
"unicode"

"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/service"
koanfmaps "github.com/knadh/koanf/maps"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/utils"

koanfmaps "github.com/knadh/koanf/maps"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
Expand Down Expand Up @@ -91,12 +88,12 @@ var (
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
// by injecting the monitoring config into an existing fleet config
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
otelConfig *confmap.Conf
operatingSystem string
agentInfo info.Agent
isOtelRuntimeSubprocess bool
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
isOtelRuntimeSubprocess bool
otelCollectorMonitoringPort int
}

// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
Expand All @@ -115,16 +112,23 @@ type monitoringConfig struct {
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent, isOtelRuntimeSubprocess bool) *BeatsMonitor {
func New(
enabled bool,
operatingSystem string,
cfg *monitoringCfg.MonitoringConfig,
agentInfo info.Agent,
isOtelRuntimeSubprocess bool,
otelCollectorMonitoringPort int,
) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
otelConfig: otelCfg,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
isOtelRuntimeSubprocess: isOtelRuntimeSubprocess,
otelCollectorMonitoringPort: otelCollectorMonitoringPort,
}
}

Expand All @@ -149,7 +153,6 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error {
}

b.config = &newConfig
b.otelConfig = rawConfig.OTel
return nil
}

Expand Down Expand Up @@ -519,34 +522,7 @@ func (b *BeatsMonitor) monitoringNamespace() string {
}

func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
if b.otelConfig != nil {
if serviceConfig, err := b.otelConfig.Sub("service"); err == nil {
var service service.Config
if serviceConfig.Unmarshal(&service, confmap.WithIgnoreUnused()) == nil {
for _, reader := range service.Telemetry.Metrics.Readers {
if reader.Pull == nil || reader.Pull.Exporter.Prometheus == nil {
continue
}
prometheus := *reader.Pull.Exporter.Prometheus
host := "localhost"
port := 8888

if prometheus.Host != nil {
host = *prometheus.Host
}
if prometheus.Port != nil {
port = *prometheus.Port
}
if prometheus.Host != nil || prometheus.Port != nil {
return host + ":" + strconv.Itoa(port)
}
}
}
}
}

// If there is no explicit configuration, the collector publishes its telemetry on port 8888.
return "localhost:8888"
return fmt.Sprintf("localhost:%d", b.otelCollectorMonitoringPort)
}

// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ func TestMonitorReload(t *testing.T) {
monitorcfg.MonitorLogs = false
monitorcfg.MonitorMetrics = false

beatsMonitor := New(true, "", monitorcfg, nil, nil, false)
beatsMonitor := New(true, "", monitorcfg, nil, false, 0)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)

Expand Down
9 changes: 8 additions & 1 deletion internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,14 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
}
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, otelCfg, agentInfo, isOtelExecModeSubprocess)
otelMetricsPortPlaceholder := 0 // this value is randomly selected on startup, set 0 here
monitor := componentmonitoring.New(
agentCfg.Settings.V1MonitoringEnabled,
agentCfg.Settings.DownloadConfig.OS(),
agentCfg.Settings.MonitoringConfig,
agentInfo,
isOtelExecModeSubprocess,
otelMetricsPortPlaceholder)
return monitor.MonitoringConfig, nil
}

Expand Down
24 changes: 0 additions & 24 deletions internal/pkg/otel/manager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@ package manager

import (
"context"
"fmt"
"net"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
)

// for testing purposes
var netListen = net.Listen

// reportErr sends an error to the provided error channel. It first drains the channel
// to ensure that only the most recent error is kept, as intermediate errors can be safely discarded.
// This ensures the receiver always observes the latest reported error.
Expand Down Expand Up @@ -51,22 +46,3 @@ func reportCollectorStatus(ctx context.Context, statusCh chan *status.AggregateS
case statusCh <- collectorStatus:
}
}

// findRandomTCPPort finds a random available TCP port on the localhost interface.
func findRandomTCPPort() (int, error) {
l, err := netListen("tcp", "localhost:0")
if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port
err = l.Close()
if err != nil {
return 0, err
}
if port == 0 {
return 0, fmt.Errorf("failed to find random port")
}

return port, nil
}
21 changes: 0 additions & 21 deletions internal/pkg/otel/manager/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,13 @@
package manager

import (
"errors"
"net"
"path/filepath"
"testing"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"

"github.com/stretchr/testify/require"
)

func TestFindRandomPort(t *testing.T) {
port, err := findRandomTCPPort()
require.NoError(t, err)
require.NotEqual(t, 0, port)

defer func() {
netListen = net.Listen
}()

netListen = func(string, string) (net.Listener, error) {
return nil, errors.New("some error")
}
_, err = findRandomTCPPort()
require.Error(t, err, "failed to find random port")
}

func testComponent(componentId string) component.Component {
fileStreamConfig := map[string]any{
"id": "test",
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/otel/manager/execution_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"go.opentelemetry.io/collector/component"
"gopkg.in/yaml.v3"

"github.com/elastic/elastic-agent/pkg/utils"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/confmap"
Expand Down Expand Up @@ -87,7 +89,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
return nil, fmt.Errorf("cannot access collector path: %w", err)
}

httpHealthCheckPort, err := findRandomTCPPort()
httpHealthCheckPort, err := utils.FindRandomTCPPort()
if err != nil {
return nil, fmt.Errorf("could not find port for http health check: %w", err)
}
Expand Down
54 changes: 52 additions & 2 deletions internal/pkg/otel/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type OTelManager struct {
// the mergedCollectorCfg is nil then the collector is not running.
mergedCollectorCfg *confmap.Conf
mergedCollectorCfgHash []byte
collectorMetricsPort int

currentCollectorStatus *status.AggregateStatus
currentComponentStates map[string]runtime.ComponentComponentState
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewOTelManager(
baseLogger *logger.Logger,
mode ExecutionMode,
agentInfo info.Agent,
collectorMetricsPort int,
beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter,
stopTimeout time.Duration,
) (*OTelManager, error) {
Expand Down Expand Up @@ -157,6 +159,7 @@ func NewOTelManager(
errCh: make(chan error, 1), // holds at most one error
collectorStatusCh: make(chan *status.AggregateStatus, 1),
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
collectorMetricsPort: collectorMetricsPort,
updateCh: make(chan configUpdate, 1),
doneChan: make(chan struct{}),
execution: exec,
Expand Down Expand Up @@ -280,7 +283,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
// and reset the retry count
m.recoveryTimer.Stop()
m.recoveryRetries.Store(0)
mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger)
mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.collectorMetricsPort, m.baseLogger)
if err != nil {
reportErr(ctx, m.errCh, err)
continue
Expand Down Expand Up @@ -328,7 +331,13 @@ func (m *OTelManager) Errors() <-chan error {
}

// buildMergedConfig combines collector configuration with component-derived configuration.
func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringConfigGetter translate.BeatMonitoringConfigGetter, logger *logp.Logger) (*confmap.Conf, error) {
func buildMergedConfig(
cfgUpdate configUpdate,
agentInfo info.Agent,
monitoringConfigGetter translate.BeatMonitoringConfigGetter,
collectorMetricsPort int,
logger *logp.Logger,
) (*confmap.Conf, error) {
mergedOtelCfg := confmap.New()

// Generate component otel config if there are components
Expand Down Expand Up @@ -363,6 +372,10 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC
}
}

if err := addCollectorMetricsPort(mergedOtelCfg, collectorMetricsPort); err != nil {
return nil, fmt.Errorf("failed to add random collector metrics port: %w", err)
}

if err := injectDiagnosticsExtension(mergedOtelCfg); err != nil {
return nil, fmt.Errorf("failed to inject diagnostics: %w", err)
}
Expand Down Expand Up @@ -617,3 +630,40 @@ func calculateConfmapHash(conf *confmap.Conf) ([]byte, error) {

return h.Sum(nil), nil
}

func addCollectorMetricsPort(conf *confmap.Conf, port int) error {
// We operate on untyped maps instead of otel config structs because the otel collector has an elaborate
// configuration resolution system, and we can't reproduce it fully here. It's possible some of the values won't
// be valid for unmarshalling, because they're supposed to be loaded from environment variables, and so on.
metricReadersUntyped := conf.Get("service::telemetry::metrics::readers")
if metricReadersUntyped == nil {
metricReadersUntyped = []any{}
}
metricsReadersList, ok := metricReadersUntyped.([]any)
if !ok {
return fmt.Errorf("couldn't convert value of service::telemetry::metrics::readers to a list: %v", metricReadersUntyped)
}

metricsReader := map[string]any{
"pull": map[string]any{
"exporter": map[string]any{
"prometheus": map[string]any{
"host": "localhost",
"port": port,
// this is the default configuration from the otel collector
"without_scope_info": true,
"without_units": true,
"without_type_suffix": true,
},
},
},
}
metricsReadersList = append(metricsReadersList, metricsReader)
confMap := map[string]any{
"service::telemetry::metrics::readers": metricsReadersList,
}
if mergeErr := conf.Merge(confmap.NewFromStringMap(confMap)); mergeErr != nil {
return fmt.Errorf("failed to merge config: %w", mergeErr)
}
return nil
}
Loading