diff --git a/internal/pkg/otel/configtranslate/otelconfig.go b/internal/pkg/otel/configtranslate/otelconfig.go index a179fe8d15b..30e9a6ef756 100644 --- a/internal/pkg/otel/configtranslate/otelconfig.go +++ b/internal/pkg/otel/configtranslate/otelconfig.go @@ -39,7 +39,7 @@ type exporterConfigTranslationFunc func(*config.C) (map[string]any, error) var ( OtelSupportedOutputTypes = []string{"elasticsearch"} - OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics"} + OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics"} configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{ otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter, } diff --git a/internal/pkg/otel/configtranslate/otelconfig_test.go b/internal/pkg/otel/configtranslate/otelconfig_test.go index f88ef6783ca..eb2489cf6b9 100644 --- a/internal/pkg/otel/configtranslate/otelconfig_test.go +++ b/internal/pkg/otel/configtranslate/otelconfig_test.go @@ -180,6 +180,33 @@ func TestGetOtelConfig(t *testing.T) { }, }, } + systemMetricsConfig := map[string]any{ + "id": "test", + "use_output": "default", + "type": "system/metrics", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + "memory": map[string]any{ + "data_stream.dataset": "system.memory", + }, + "network": map[string]any{ + "data_stream.dataset": "system.network", + }, + "filesystem": map[string]any{ + "data_stream.dataset": "system.filesystem", + }, + }, + }, + }, + } esOutputConfig := map[string]any{ "type": "elasticsearch", "hosts": []any{"localhost:9200"}, @@ -505,6 +532,110 @@ func TestGetOtelConfig(t *testing.T) { }, }), }, + { + name: "system/metrics", + model: &component.Model{ + Components: []component.Component{ + { + ID: "system-metrics", + InputType: "system/metrics", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "system/metrics", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(systemMetricsConfig), + }, + { + ID: "system/metrics-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": expectedESConfig, + "receivers": map[string]any{ + "metricbeatreceiver/_agent-component/system-metrics": map[string]any{ + "metricbeat": map[string]any{ + "modules": []map[string]any{ + { + "module": "system", + "data_stream": map[string]any{"dataset": "generic-1"}, + "id": "test-1", + "index": "metrics-generic-1-default", + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + "memory": map[string]any{ + "data_stream.dataset": "system.memory", + }, + "network": map[string]any{ + "data_stream.dataset": "system.network", + }, + "filesystem": map[string]any{ + "data_stream.dataset": "system.filesystem", + }, + }, + "processors": defaultProcessors("test-1", "generic-1", "metrics"), + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), "system-metrics"), + }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, + "logging": map[string]any{ + "with_fields": map[string]any{ + "component": map[string]any{ + "binary": "metricbeat", + "dataset": "elastic_agent.metricbeat", + "type": "system/metrics", + "id": "system-metrics", + }, + "log": map[string]any{ + "source": "system-metrics", + }, + }, + }, + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + }, + }, + "service": map[string]any{ + "pipelines": map[string]any{ + "logs/_agent-component/system-metrics": map[string][]string{ + "exporters": []string{"elasticsearch/_agent-component/default"}, + "receivers": []string{"metricbeatreceiver/_agent-component/system-metrics"}, + }, + }, + }, + }), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/testing/integration/beat_receivers_test.go b/testing/integration/beat_receivers_test.go index dd74524e13b..fedbdd32a1f 100644 --- a/testing/integration/beat_receivers_test.go +++ b/testing/integration/beat_receivers_test.go @@ -7,17 +7,22 @@ package integration import ( + "bytes" "context" + "encoding/base64" "fmt" "io" "net/http" + "strings" "testing" + "text/template" "time" "github.com/gofrs/uuid/v5" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" atesting "github.com/elastic/elastic-agent/pkg/testing" @@ -377,6 +382,291 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { } } +// TestAgentMetricsInput is a test that compares documents ingested by +// agent system/metrics input in process and otel modes and asserts that they are +// equivalent. +func TestAgentMetricsInput(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + Sudo: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + + metricsets := []string{"cpu", "memory", "network", "filesystem"} + + type configOptions struct { + HomeDir string + ESEndpoint string + BeatsESApiKey string + FBReceiverIndex string + Namespace string + RuntimeExperimental string + Metricsets []string + } + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +inputs: + # Collecting system metrics + - type: system/metrics + id: unique-system-metrics-input + data_stream.namespace: {{.Namespace}} + use_output: default + {{if ne .RuntimeExperimental "" }} + _runtime_experimental: {{.RuntimeExperimental}} + {{end}} + streams: + {{range $mset := .Metricsets}} + - metricsets: + - {{$mset}} + data_stream.dataset: system.{{$mset}} + {{end}} +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: {{.BeatsESApiKey}} +` + + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + + beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) + require.NoError(t, err, "error decoding api key") + + tableTests := []struct { + name string + runtimeExperimental string + }{ + {name: "agent"}, + {name: "otel", runtimeExperimental: "otel"}, + } + + // map of testcase -> metricset -> documents + esDocs := make(map[string]map[string]estools.Documents) + + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + startedAt := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + tmpDir := t.TempDir() + + if _, ok := esDocs[tt.name]; !ok { + esDocs[tt.name] = make(map[string]estools.Documents) + } + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + BeatsESApiKey: string(beatsApiKey), + Namespace: info.Namespace, + RuntimeExperimental: tt.runtimeExperimental, + Metricsets: metricsets, + })) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Log("Contents of agent config file:\n") + println(string(configContents)) + } + }) + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + + err = fixture.Prepare(ctx) + require.NoError(t, err) + err = fixture.Configure(ctx, configContents) + require.NoError(t, err) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err) + cmd.WaitDelay = 1 * time.Second + + var output strings.Builder + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + mustClauses := []map[string]any{ + {"range": map[string]any{ + "@timestamp": map[string]string{ + "gte": startedAt, + }, + }}, + } + + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": mustClauses, + }, + }, + } + + for _, mset := range metricsets { + index := fmt.Sprintf(".ds-metrics-system.%s-%s*", mset, info.Namespace) + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, index, info.ESClient) + require.NoError(ct, err) + + if docs.Hits.Total.Value != 0 { + esDocs[tt.name][mset] = docs + } + require.Greater(ct, docs.Hits.Total.Value, 0, "docs count") + }, + 30*time.Second, 1*time.Second, + "Expected to find at least one document for metricset %s in index %s and runtime %q, got 0", mset, index, tt.runtimeExperimental) + } + + cancel() + cmd.Wait() + }) + } + + t.Run("compare documents", func(t *testing.T) { + require.Greater(t, len(esDocs), 0, "expected to find documents ingested") + require.Greater(t, len(esDocs["agent"]), 0, "expected to find documents ingested by normal agent metrics input") + require.Greater(t, len(esDocs["otel"]), 0, "expected to find documents ingested by beat receivers") + + agentDocs := esDocs["agent"] + otelDocs := esDocs["otel"] + + // Fields that are present in both agent and otel documents, but are expected to change + ignoredFields := []string{ + "@timestamp", + "agent.id", + "agent.ephemeral_id", + "elastic_agent.id", + "data_stream.namespace", + "event.ingested", + "event.duration", + } + + stripNondeterminism := func(m mapstr.M, mset string) { + // These metrics will change from run to run + prefixes := []string{ + fmt.Sprintf("system.%s", mset), + fmt.Sprintf("host.%s", mset), + } + + for k := range m { + for _, prefix := range prefixes { + if strings.HasPrefix(k, prefix) { + m[k] = nil + } + } + } + } + + testCases := []struct { + metricset string + yieldDocsFunc func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) + }{ + { + metricset: "cpu", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + { + metricset: "memory", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + { + metricset: "network", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + // make sure we compare events from network interfaces and not host metrics + var agentDoc, otelDoc mapstr.M + for _, hit := range agent { + agentDoc = hit.Source + if ok, _ := agentDoc.Flatten().HasKey("system.network.name"); ok { + break + } + } + for _, hit := range otel { + otelDoc = hit.Source + if ok, _ := otelDoc.Flatten().HasKey("system.network.name"); ok { + break + } + } + return agentDoc, otelDoc + }, + }, + { + metricset: "filesystem", + yieldDocsFunc: func(agent []estools.ESDoc, otel []estools.ESDoc) (mapstr.M, mapstr.M) { + return agent[0].Source, otel[0].Source + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.metricset, func(t *testing.T) { + msetAgentDocs := agentDocs[tt.metricset].Hits.Hits + msetOtelDocs := otelDocs[tt.metricset].Hits.Hits + require.Greater(t, len(msetAgentDocs), 0, "expected to find agent documents for metricset %s", tt.metricset) + require.Greater(t, len(msetOtelDocs), 0, "expected to find otel documents for metricset %s", tt.metricset) + + agentDoc, otelDoc := tt.yieldDocsFunc(msetAgentDocs, msetOtelDocs) + agentDoc = agentDoc.Flatten() + otelDoc = otelDoc.Flatten() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("agent document for metricset %s:\n%s", tt.metricset, agentDoc.StringToPrint()) + t.Logf("otel document for metricset %s:\n%s", tt.metricset, otelDoc.StringToPrint()) + } + }) + + stripNondeterminism(agentDoc, tt.metricset) + stripNondeterminism(otelDoc, tt.metricset) + + AssertMapstrKeysEqual(t, agentDoc, otelDoc, nil, "expected documents keys to be equal for metricset "+tt.metricset) + AssertMapsEqual(t, agentDoc, otelDoc, ignoredFields, "expected documents to be equal for metricset "+tt.metricset) + }) + + } + }) +} + func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) { assert.Equal(t, int(cproto.CollectorComponentStatus_StatusOK), status.Status, "component status should be ok") assert.Equal(t, "", status.Error, "component status should not have an error")