Skip to content

otel: support system/metrics input with experimental otel runtime #8148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
12ea09f
otel: support system/metrics input with experimental otel runtime
mauri870 May 13, 2025
e981c64
use EventuallyWithTf
mauri870 May 19, 2025
467debd
Merge branch 'main' into otel-system-metrics
mauri870 May 19, 2025
ddafd05
add tests comparing final documents
mauri870 May 19, 2025
04c8423
trim test
mauri870 May 19, 2025
c030281
remove TestSystemMetricsInput
mauri870 May 19, 2025
b6734bf
remove unused vars
mauri870 May 19, 2025
5e545ef
ignore host.network.ingress fields
mauri870 May 20, 2025
b7612b0
Merge branch 'main' into otel-system-metrics
mauri870 May 20, 2025
8f57a30
fix typo in expectation message
mauri870 May 20, 2025
8dc35ad
don't compare system metrics directly, only compare the keys are present
mauri870 May 21, 2025
1ee22f1
Merge branch 'main' into otel-system-metrics
mauri870 May 21, 2025
ea8a07c
ignore host.network.* fields
mauri870 May 21, 2025
2bea714
remove dot from prefix
mauri870 May 22, 2025
5de5d41
ignore network activity fields
mauri870 May 26, 2025
8a973a8
Merge branch 'main' into otel-system-metrics
mauri870 May 26, 2025
827321a
ignore flaky fields from network
mauri870 May 26, 2025
5e0fa05
stop ignoring elastic_agent.{snapshot,version}
mauri870 May 26, 2025
aa6b4c6
reestructure test to use a single config
mauri870 May 26, 2025
c40c8e9
Revert "reestructure test to use a single config"
mauri870 May 27, 2025
49ba5f9
remove agent.version from ignore list
mauri870 May 27, 2025
0d1473a
reorder stripNondeterminism call
mauri870 May 27, 2025
86f6c95
print documents in case of failure
mauri870 May 27, 2025
0f46c29
compare host-wide and normal network events separately
mauri870 May 29, 2025
7a482a0
simplify code
mauri870 May 29, 2025
2431c42
Add Sudo to define.Require
mauri870 May 30, 2025
946e589
match for host metrics
mauri870 May 30, 2025
cc6b4a4
Merge branch 'main' into otel-system-metrics
mauri870 May 30, 2025
9a3012f
fix conflicts
mauri870 May 30, 2025
c1f50c5
filter network interface events
mauri870 May 30, 2025
1d4243b
use tt.metricset, add note about network events
mauri870 Jun 2, 2025
c857eac
add metricsets to template
mauri870 Jun 2, 2025
a092a6c
Update testing/integration/beat_receivers_test.go
mauri870 Jun 3, 2025
2b45118
use AssertMapstrEqual
mauri870 Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/pkg/otel/configtranslate/otelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type exporterConfigTranslationFunc func(*config.C) (map[string]any, error)

var (
OtelSupportedOutputTypes = []string{"elasticsearch"}
OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics"}
OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics"}
configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{
otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter,
}
Expand Down
131 changes: 131 additions & 0 deletions internal/pkg/otel/configtranslate/otelconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ func TestGetOtelConfig(t *testing.T) {
},
},
}
systemMetricsConfig := map[string]any{
"id": "test",
"use_output": "default",
"type": "system/metrics",
"streams": []any{
map[string]any{
"id": "test-1",
"data_stream": map[string]any{
"dataset": "generic-1",
},
"metricsets": map[string]any{
"cpu": map[string]any{
"data_stream.dataset": "system.cpu",
},
"memory": map[string]any{
"data_stream.dataset": "system.memory",
},
"network": map[string]any{
"data_stream.dataset": "system.network",
},
"filesystem": map[string]any{
"data_stream.dataset": "system.filesystem",
},
},
},
},
}
esOutputConfig := map[string]any{
"type": "elasticsearch",
"hosts": []any{"localhost:9200"},
Expand Down Expand Up @@ -505,6 +532,110 @@ func TestGetOtelConfig(t *testing.T) {
},
}),
},
{
name: "system/metrics",
model: &component.Model{
Components: []component.Component{
{
ID: "system-metrics",
InputType: "system/metrics",
OutputType: "elasticsearch",
InputSpec: &component.InputRuntimeSpec{
BinaryName: "agentbeat",
Spec: component.InputSpec{
Command: &component.CommandSpec{
Args: []string{"metricbeat"},
},
},
},
Units: []component.Unit{
{
ID: "system/metrics",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(systemMetricsConfig),
},
{
ID: "system/metrics-default",
Type: client.UnitTypeOutput,
Config: component.MustExpectedConfig(esOutputConfig),
},
},
},
},
},
expectedConfig: confmap.NewFromStringMap(map[string]any{
"exporters": expectedESConfig,
"receivers": map[string]any{
"metricbeatreceiver/_agent-component/system-metrics": map[string]any{
"metricbeat": map[string]any{
"modules": []map[string]any{
{
"module": "system",
"data_stream": map[string]any{"dataset": "generic-1"},
"id": "test-1",
"index": "metrics-generic-1-default",
"metricsets": map[string]any{
"cpu": map[string]any{
"data_stream.dataset": "system.cpu",
},
"memory": map[string]any{
"data_stream.dataset": "system.memory",
},
"network": map[string]any{
"data_stream.dataset": "system.network",
},
"filesystem": map[string]any{
"data_stream.dataset": "system.filesystem",
},
},
"processors": defaultProcessors("test-1", "generic-1", "metrics"),
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"path": map[string]any{
"data": filepath.Join(paths.Run(), "system-metrics"),
},
"queue": map[string]any{
"mem": map[string]any{
"events": uint64(3200),
"flush": map[string]any{
"min_events": uint64(1600),
"timeout": "10s",
},
},
},
"logging": map[string]any{
"with_fields": map[string]any{
"component": map[string]any{
"binary": "metricbeat",
"dataset": "elastic_agent.metricbeat",
"type": "system/metrics",
"id": "system-metrics",
},
"log": map[string]any{
"source": "system-metrics",
},
},
},
"http": map[string]any{
"enabled": true,
"host": "localhost",
},
},
},
"service": map[string]any{
"pipelines": map[string]any{
"logs/_agent-component/system-metrics": map[string][]string{
"exporters": []string{"elasticsearch/_agent-component/default"},
"receivers": []string{"metricbeatreceiver/_agent-component/system-metrics"},
},
},
},
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
138 changes: 138 additions & 0 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,144 @@ agent.monitoring:
cmd.Wait()
}

func TestOTelSystemMetricsInput(t *testing.T) {
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
Sudo: true,
OS: []define.OS{
{Type: define.Windows},
{Type: define.Linux},
{Type: define.Darwin},
},
Stack: &define.Stack{},
})

fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

// Create the otel configuration file
type otelConfigOptions struct {
InputPath string
ESEndpoint string
ESApiKey string
Namespace string
}
esEndpoint, err := getESHost()
require.NoError(t, err, "error getting elasticsearch endpoint")
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
decodedApiKey, err := getDecodedApiKey(esApiKey)
require.NoError(t, err)
configTemplate := `agent.logging.level: info
agent.logging.to_stderr: true
inputs:
# Collecting system metrics
- type: system/metrics
id: unique-system-metrics-input
data_stream.namespace: {{.Namespace}}
use_output: default
_runtime_experimental: otel
streams:
- metricsets:
- cpu
data_stream.dataset: system.cpu
- metricsets:
- memory
data_stream.dataset: system.memory
- metricsets:
- network
data_stream.dataset: system.network
- metricsets:
- filesystem
data_stream.dataset: system.filesystem
outputs:
default:
type: elasticsearch
hosts: [{{.ESEndpoint}}]
api_key: "{{.ESApiKey}}"
preset: "balanced"
`
var configBuffer bytes.Buffer

template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
otelConfigOptions{
ESEndpoint: esEndpoint,
ESApiKey: decodedApiKey,
Namespace: info.Namespace,
})

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
defer cancel()
err = fixture.Prepare(ctx)
require.NoError(t, err)

err = fixture.Configure(ctx, configBuffer.Bytes())

cmd, err := fixture.PrepareAgentCommand(ctx, nil)
require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err)

output := strings.Builder{}
cmd.Stderr = &output
cmd.Stdout = &output

startedAt := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
err = cmd.Start()
require.NoError(t, err)

t.Cleanup(func() {
if t.Failed() {
t.Log("Elastic-Agent output:")
t.Log(output.String())
}
})

require.Eventually(t, func() bool {
err = fixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 30*time.Second, 1*time.Second)

mustClauses := []map[string]any{
{"range": map[string]any{
"@timestamp": map[string]string{
"gte": startedAt,
},
}},
}

rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": mustClauses,
},
},
}

metricsets := []string{"cpu", "memory", "network", "filesystem"}
for _, mset := range metricsets {
index := fmt.Sprintf(".ds-metrics-system.%s-%s*", mset, info.Namespace)
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, index, info.ESClient)
require.NoError(t, err)
return docs.Hits.Total.Value > 0
},
30*time.Second, 1*time.Second,
"Expected to find at least one document for metricset %s in index %s, got 0", mset, index)
}

cancel()
cmd.Wait()
}

func TestOtelMBReceiverE2E(t *testing.T) {
info := define.Require(t, define.Requirements{
Group: Default,
Expand Down