diff --git a/testing/integration/beat_receivers_test.go b/testing/integration/beat_receivers_test.go index c0b03fd137..f92ba9884d 100644 --- a/testing/integration/beat_receivers_test.go +++ b/testing/integration/beat_receivers_test.go @@ -11,74 +11,129 @@ import ( "fmt" "io" "net/http" - "runtime" "testing" "time" - "github.com/stretchr/testify/assert" - - "gopkg.in/yaml.v2" - - "github.com/elastic/elastic-agent/pkg/control/v2/cproto" - - "github.com/gofrs/uuid/v5" - "github.com/stretchr/testify/require" - "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools/testcontext" -) + "github.com/gofrs/uuid/v5" + "gopkg.in/yaml.v2" -var ( - agentDocs map[string]estools.Documents - otelDocs map[string]estools.Documents - commonLogMessage = "Determined allowed capabilities" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -// TestAgentMonitoring is a test to provide a baseline for what -// elastic-agent monitoring looks like with classic monitoring. It -// will be expanded in the future to compare with beats receivers for -// elastic-agent monitoring. 
-func TestAgentMonitoring(t *testing.T) { +// TestClassicAndReceiverAgentMonitoring is a test to compare documents ingested by +// elastic-agent monitoring classic mode vs otel mode +func TestClassicAndReceiverAgentMonitoring(t *testing.T) { info := define.Require(t, define.Requirements{ Group: Default, Local: true, OS: []define.OS{ {Type: define.Linux}, {Type: define.Darwin}, - {Type: define.Windows}, }, Stack: &define.Stack{}, Sudo: true, }) - agentDocs = make(map[string]estools.Documents) - otelDocs = make(map[string]estools.Documents) + agentDocs := make(map[string]estools.Documents) + otelDocs := make(map[string]estools.Documents) // Tests logs and metrics are present type test struct { - dsType string - dsDataset string - dsNamespace string - message string + dsType string + dsDataset string + dsNamespace string + query map[string]any + onlyCompareKeys bool + ignoreFields []string } tests := []test{ - {dsType: "logs", dsDataset: "elastic_agent", dsNamespace: info.Namespace, message: commonLogMessage}, - {dsType: "metrics", dsDataset: "elastic_agent.elastic_agent", dsNamespace: info.Namespace}, - {dsType: "metrics", dsDataset: "elastic_agent.filebeat", dsNamespace: info.Namespace}, - {dsType: "metrics", dsDataset: "elastic_agent.filebeat_input", dsNamespace: info.Namespace}, - {dsType: "metrics", dsDataset: "elastic_agent.metricbeat", dsNamespace: info.Namespace}, + { + dsType: "logs", + dsDataset: "elastic_agent", + dsNamespace: info.Namespace, + query: map[string]any{"match_phrase": map[string]any{"message": "Determined allowed capabilities"}}, + onlyCompareKeys: false, + }, + + { + dsType: "metrics", + dsDataset: "elastic_agent.filebeat", + dsNamespace: info.Namespace, + query: map[string]any{"exists": map[string]any{"field": "beat.stats.system.cpu.cores"}}, + onlyCompareKeys: true, + ignoreFields: []string{ + // all process related metrics are dropped for beatreceivers + "beat.stats.cgroup", + "beat.stats.cpu", + "beat.stats.handles", + 
"beat.stats.memstats", + "beat.stats.runtime", + "beat.elasticsearch.cluster.id", + "beat.stats.libbeat.config", + }, + }, + { + dsType: "metrics", + dsDataset: "elastic_agent.metricbeat", + dsNamespace: info.Namespace, + query: map[string]any{"exists": map[string]any{"field": "beat.stats.system.cpu.cores"}}, + onlyCompareKeys: true, + ignoreFields: []string{ + // all process related metrics are dropped for beatreceivers + "beat.stats.cgroup", + "beat.stats.cpu", + "beat.stats.handles", + "beat.stats.memstats", + "beat.stats.runtime", + "beat.elasticsearch.cluster.id", + "beat.stats.libbeat.config", + }, + }, + { + dsType: "metrics", + dsDataset: "elastic_agent.elastic_agent", + dsNamespace: info.Namespace, + onlyCompareKeys: true, + query: map[string]any{"exists": map[string]any{"field": "system.process.memory.size"}}, + }, + // TODO: fbreceiver must support /inputs/ endpoint for this to work + // { + // dsType: "metrics", + // dsDataset: "elastic_agent.filebeat_input", + // dsNamespace: info.Namespace, + // query: map[string]any{"exists": map[string]any{"field": "filebeat_input.bytes_processed_total"}}, + // }, } installOpts := atesting.InstallOpts{ NonInteractive: true, Privileged: true, Force: true, + Develop: true, } + // Flow + // 1. Start elastic agent monitoring in classic mode (configure, install and wait for elastic-agent healthy) + // 2. Assert monitoring logs and metrics are available on ES + // 3. Uninstall + + // 4. Start elastic agent monitoring in otel mode + // 5. Assert monitoring logs and metrics are available on ES (for otel mode) + // 6. Uninstall + + // 7. 
Compare both documents are equivalent + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + t.Cleanup(cancel) + // prepare the policy and marshalled configuration policyCtx, policyCancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) t.Cleanup(policyCancel) @@ -140,199 +195,161 @@ func TestAgentMonitoring(t *testing.T) { d.ApiKey = string(apiKey) policy.Outputs["default"] = d - // Flow - // 1. Create and install policy with just monitoring - // 2. Download the policy, add the API key - // 3. Install without enrolling in fleet - // 4. Make sure logs and metrics for agent monitoring are being received - t.Run("verify elastic-agent monitoring functionality", func(t *testing.T) { - ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) - t.Cleanup(cancel) - - // beats processes and beats receivers should use a different namespace to ensure each test looks only at the - // right data - actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "process") - policy.Agent.Monitoring["namespace"] = actualNamespace - - updatedPolicyBytes, err := yaml.Marshal(policy) - require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) - t.Cleanup(func() { - if t.Failed() { - t.Logf("policy was %s", string(updatedPolicyBytes)) - } - }) - - // 3. 
Install without enrolling in fleet - fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) - require.NoError(t, err) - - err = fixture.Prepare(ctx) - require.NoError(t, err, "error preparing fixture") - - err = fixture.Configure(ctx, updatedPolicyBytes) - require.NoError(t, err, "error configuring fixture") - - output, err := fixture.InstallWithoutEnroll(ctx, &installOpts) - require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output)) - - require.Eventually(t, func() bool { - err = fixture.IsHealthy(ctx) - if err != nil { - t.Logf("waiting for agent healthy: %s", err.Error()) - return false - } - return true - }, 1*time.Minute, 1*time.Second) - - for _, tc := range tests { - require.Eventuallyf(t, - func() bool { - findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) - defer findCancel() - mustClauses := []map[string]any{ - {"match": map[string]any{"data_stream.type": tc.dsType}}, - {"match": map[string]any{"data_stream.dataset": tc.dsDataset}}, - {"match": map[string]any{"data_stream.namespace": actualNamespace}}, - } - - // Only add the "message" match if tc.message is not empty - // This conditional check will not be required when test for metrics is included - if tc.message != "" { - mustClauses = append(mustClauses, map[string]any{ - "match": map[string]any{"message": tc.message}, - }) - } - - rawQuery := map[string]any{ - "query": map[string]any{ - "bool": map[string]any{ - "must": mustClauses, - }, - }, - } - - docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, tc.dsType+"-*", info.ESClient) - require.NoError(t, err) - if docs.Hits.Total.Value != 0 { - key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace - agentDocs[key] = docs - } - return docs.Hits.Total.Value > 0 - }, - 2*time.Minute, 5*time.Second, - "No documents found for type: %s, dataset: %s, namespace: %s", tc.dsType, tc.dsDataset, tc.dsNamespace) + updatedPolicyBytes, err := yaml.Marshal(policy) + 
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) + t.Cleanup(func() { + if t.Failed() { + t.Logf("policy was %s", string(updatedPolicyBytes)) + } }) - t.Run("compare logs ingested by agent monitoring vs otel monitoring", func(t *testing.T) { - // skipping this because the log-path should be handled differently in windows - if runtime.GOOS == "windows" { - t.Skip("skipping this test on windows for now") + classicFixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + err = classicFixture.Prepare(ctx) + require.NoError(t, err, "error preparing fixture") + + err = classicFixture.Configure(ctx, updatedPolicyBytes) + require.NoError(t, err, "error configuring fixture") + + output, err := classicFixture.InstallWithoutEnroll(ctx, &installOpts) + require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(output)) + timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + require.Eventually(t, func() bool { + err = classicFixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false } + return true + }, 1*time.Minute, 1*time.Second) - // Not proceed with this test if monitoring logs from elastic-agent does not exist - monitoringLogIndex := "logs-elastic_agent-" + info.Namespace - require.NotPanics( - t, func() { - _ = agentDocs[monitoringLogIndex].Hits.Hits[0].Source - }, "monitoring logs from elastic-agent should exist before proceeding", - ) - - ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) - t.Cleanup(cancel) - - // beats processes and beats receivers should use a different namespace to ensure each test looks only at the - // right data - actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "otel") - policy.Agent.Monitoring["namespace"] = actualNamespace - - // switch monitoring to the otel runtime - policy.Agent.Monitoring["_runtime_experimental"] = "otel"
- - updatedPolicyBytes, err := yaml.Marshal(policy) - require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) - t.Cleanup(func() { - if t.Failed() { - t.Logf("policy was %s", string(updatedPolicyBytes)) - } - }) - - // 3. Install without enrolling in fleet - fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) - require.NoError(t, err) - - err = fixture.Prepare(ctx) - require.NoError(t, err, "error preparing fixture") - - err = fixture.Configure(ctx, updatedPolicyBytes) - require.NoError(t, err, "error configuring fixture") - - output, err := fixture.InstallWithoutEnroll(ctx, &installOpts) - require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(output)) - - require.EventuallyWithT(t, func(collect *assert.CollectT) { - status, statusErr := fixture.ExecStatus(ctx) - assert.NoError(collect, statusErr) - // agent should be healthy - assert.Equal(collect, int(cproto.State_HEALTHY), status.State) - // we should have no normal components running - assert.Zero(collect, len(status.Components)) - - // we should have filebeatreceiver and metricbeatreceiver running - otelCollectorStatus := status.Collector - require.NotNil(collect, otelCollectorStatus) - assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status) - pipelineStatusMap := otelCollectorStatus.ComponentStatusMap - - // we should have 3 pipelines running: filestream for logs, http metrics and beats metrics - assert.Equal(collect, 3, len(pipelineStatusMap)) - - fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring" - httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring" - beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring" - assert.Contains(collect, pipelineStatusMap, fileStreamPipeline) - assert.Contains(collect, pipelineStatusMap, httpMetricsPipeline) - assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline) - - // and 
all the components should be healthy - assertCollectorComponentsHealthy(collect, otelCollectorStatus) - - return - }, 1*time.Minute, 1*time.Second) - - // run this only for logs for now - tc := tests[0] + // 2. Assert monitoring logs and metrics are available on ES + for _, tc := range tests { require.Eventuallyf(t, func() bool { findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) defer findCancel() - mustClauses := []map[string]any{ - {"match": map[string]any{"message": tc.message}}, - {"match": map[string]any{"data_stream.type": tc.dsType}}, - {"match": map[string]any{"data_stream.dataset": tc.dsDataset}}, - {"match": map[string]any{"data_stream.namespace": actualNamespace}}, - } rawQuery := map[string]any{ "query": map[string]any{ "bool": map[string]any{ - "must": mustClauses, + "must": tc.query, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, }, }, } - docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+monitoringLogIndex+"*", info.ESClient) + index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient) require.NoError(t, err) if docs.Hits.Total.Value != 0 { - otelDocs[monitoringLogIndex] = docs + key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + agentDocs[key] = docs } return docs.Hits.Total.Value > 0 }, 2*time.Minute, 5*time.Second, - "No documents found in otel mode for type : %s, dataset: %s, namespace: %s", tc.dsType, tc.dsDataset, tc.dsNamespace) + "agent monitoring classic no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestamp, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query) + } - agent := agentDocs[monitoringLogIndex].Hits.Hits[0].Source - otel := otelDocs[monitoringLogIndex].Hits.Hits[0].Source + // 3. 
Uninstall + combinedOutput, err := classicFixture.Uninstall(ctx, &atesting.UninstallOpts{Force: true}) + require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) + + // 4. switch monitoring to the otel runtime + policy.Agent.Monitoring["_runtime_experimental"] = "otel" + updatedPolicyBytes, err = yaml.Marshal(policy) + require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) + t.Cleanup(func() { + if t.Failed() { + t.Logf("policy was %s", string(updatedPolicyBytes)) + } + }) + + beatReceiverFixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + err = beatReceiverFixture.Prepare(ctx) + require.NoError(t, err) + err = beatReceiverFixture.Configure(ctx, updatedPolicyBytes) + require.NoError(t, err) + combinedOutput, err = beatReceiverFixture.InstallWithoutEnroll(ctx, &installOpts) + require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput)) + // store timestamp to filter otel docs with timestamp greater than this value + timestampBeatReceiver := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + status, statusErr := beatReceiverFixture.ExecStatus(ctx) + assert.NoError(collect, statusErr) + // agent should be healthy + assert.Equal(collect, int(cproto.State_HEALTHY), status.State) + // we should have no normal components running + assert.Zero(collect, len(status.Components)) + + // we should have filebeatreceiver and metricbeatreceiver running + otelCollectorStatus := status.Collector + require.NotNil(collect, otelCollectorStatus) + assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status) + pipelineStatusMap := otelCollectorStatus.ComponentStatusMap + + // we should have 3 pipelines running: filestream for logs, http metrics and beats metrics + assert.Equal(collect, 3, 
len(pipelineStatusMap)) + + fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring" + httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring" + beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring" + assert.Contains(collect, pipelineStatusMap, fileStreamPipeline) + assert.Contains(collect, pipelineStatusMap, httpMetricsPipeline) + assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline) + + // and all the components should be healthy + assertCollectorComponentsHealthy(collect, otelCollectorStatus) + + return + }, 1*time.Minute, 1*time.Second) + + // 5. Assert monitoring logs and metrics are available on ES (for otel mode) + for _, tc := range tests { + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) + defer findCancel() + + rawQuery := map[string]any{ + "query": map[string]any{ + "bool": map[string]any{ + "must": tc.query, + "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestampBeatReceiver}}}, + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient) + require.NoError(t, err) + if docs.Hits.Total.Value != 0 { + key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + otelDocs[key] = docs + } + return docs.Hits.Total.Value > 0 + }, + 4*time.Minute, 5*time.Second, + "agent monitoring beats receivers no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestampBeatReceiver, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query) + } + + // 6. 
Uninstall + combinedOutput, err = beatReceiverFixture.Uninstall(ctx, &atesting.UninstallOpts{Force: true}) + require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput)) + + // 7. Compare both documents are equivalent + for _, tc := range tests[:3] { + key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + agent := agentDocs[key].Hits.Hits[0].Source + otel := otelDocs[key].Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -341,20 +358,20 @@ func TestAgentMonitoring(t *testing.T) { "agent.id", // agent.version is different because we force version 9.0.0 in CI "agent.version", - // elastic_agent.id is different because we currently start a new agent in the second subtest - // this should be fixed in the future "elastic_agent.id", - "data_stream.namespace", "log.file.inode", "log.file.fingerprint", "log.file.path", "log.offset", "event.ingested", } - - AssertMapsEqual(t, agent, otel, ignoredFields, "expected documents to be equal") - }) - + switch tc.onlyCompareKeys { + case true: + AssertMapstrKeysEqual(t, agent, otel, append(ignoredFields, tc.ignoreFields...), fmt.Sprintf("expected document keys to be equal for dataset: %s", key)) + case false: + AssertMapsEqual(t, agent, otel, ignoredFields, fmt.Sprintf("expected document to be equal for dataset: %s", key)) + } + } } func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) { diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 17379cfaf3..93ee6bfe01 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1661,6 +1661,28 @@ func AssertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") } +func AssertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields 
[]string, msg string) { + + t.Helper() + // Delete all ignored fields. + for _, f := range ignoredFields { + _ = m1.Delete(f) + _ = m2.Delete(f) + } + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + + for k := range flatM1 { + flatM1[k] = "" + } + for k := range flatM2 { + flatM2[k] = "" + } + + require.Zero(t, cmp.Diff(flatM1, flatM2), "expected keys of maps to be equal") +} + func TestFBOtelRestartE2E(t *testing.T) { // This test ensures that filebeatreceiver is able to deliver logs even // in advent of a collector restart.