Skip to content

[9.0] (backport #8031) Allow using beats receivers for self-monitoring #8101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,19 +414,19 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
componentInfo{
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
BinaryName: metricBeatName,
RuntimeManager: component.DefaultRuntimeManager,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
},
componentInfo{
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
BinaryName: metricBeatName,
RuntimeManager: component.DefaultRuntimeManager,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
if b.config.C.MonitorLogs {
componentInfos = append(componentInfos, componentInfo{
ID: monitoringFilesUnitsID,
BinaryName: fileBeatName,
RuntimeManager: component.DefaultRuntimeManager,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
// sort the components to ensure a consistent order of inputs in the configuration
Expand All @@ -444,15 +444,19 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo

streams = append(streams, b.getServiceComponentFilestreamStreams(componentInfos)...)

inputs := []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
useOutputKey: monitoringOutput,
"streams": streams,
},
input := map[string]interface{}{
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
useOutputKey: monitoringOutput,
"streams": streams,
}
// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
input["_runtime_experimental"] = b.config.C.RuntimeManager
}

inputs := []any{input}
inputsNode, found := cfg[inputsKey]
if !found {
return fmt.Errorf("no inputs in config")
Expand Down Expand Up @@ -518,6 +522,14 @@ func (b *BeatsMonitor) injectMetricsInput(
},
}

// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
for _, input := range inputs {
inputMap := input.(map[string]interface{})
inputMap["_runtime_experimental"] = b.config.C.RuntimeManager
}
}

// add system/process metrics for services that can't be monitored via json/beats metrics
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
componentInfos, metricsCollectionIntervalString)...)
Expand Down
73 changes: 71 additions & 2 deletions internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestMonitoringFull(t *testing.T) {
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: true,
},
RuntimeManager: monitoringcfg.DefaultRuntimeManager,
},
},
agentInfo: agentInfo,
Expand Down Expand Up @@ -863,6 +864,7 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.DefaultRuntimeManager,
},
}

Expand Down Expand Up @@ -914,7 +916,9 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
Streams []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
Streams []struct {
Path string `mapstructure:"path"`
}
}
Expand All @@ -931,7 +935,72 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
}
}
}
assert.Len(t, streamsForInputMetrics, 2) // we have two filebeats running: filestream-process and monitoring
assert.Len(t, streamsForInputMetrics, 2)
}

func TestMonitoringWithOtelRuntime(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

cfg := &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorLogs: true,
MonitorMetrics: true,
Namespace: "test",
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
RuntimeManager: monitoringcfg.OtelRuntimeManager,
},
}

policy := map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"logs": false,
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
}

b := &BeatsMonitor{
enabled: true,
config: cfg,
agentInfo: agentInfo,
}

components := []component.Component{
{
ID: "filestream-receiver",
InputSpec: &component.InputRuntimeSpec{
Spec: component.InputSpec{
Command: &component.CommandSpec{
Name: "filebeat",
},
},
},
RuntimeManager: component.OtelRuntimeManager,
},
}
monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
require.NoError(t, err)

// Verify that if we're using filebeat receiver, there's no filebeat input
var monitoringCfg struct {
Inputs []struct {
ID string
RuntimeManager string `mapstructure:"_runtime_experimental"`
}
}
err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
require.NoError(t, err)
for _, input := range monitoringCfg.Inputs {
assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
}
}

func TestEnrichArgs(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions internal/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ const (
defaultNamespace = "default"

// DefaultHost is used when host is not defined or empty
DefaultHost = "localhost"
DefaultHost = "localhost"
ProcessRuntimeManager = "process"
OtelRuntimeManager = "otel"
DefaultRuntimeManager = ProcessRuntimeManager
)

// MonitoringConfig describes a configuration of a monitoring
Expand All @@ -33,6 +36,7 @@ type MonitoringConfig struct {
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
RuntimeManager string `yaml:"_runtime_experimental,omitempty" config:"_runtime_experimental,omitempty"`
}

// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
Expand Down Expand Up @@ -118,9 +122,10 @@ func DefaultConfig() *MonitoringConfig {
Host: DefaultHost,
Port: defaultPort,
},
Namespace: defaultNamespace,
APM: defaultAPMConfig(),
Diagnostics: defaultDiagnostics(),
Namespace: defaultNamespace,
APM: defaultAPMConfig(),
Diagnostics: defaultDiagnostics(),
RuntimeManager: DefaultRuntimeManager,
}
}

Expand Down
Loading