diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go
index 3adf3344cb2..78f11c35542 100644
--- a/internal/pkg/agent/application/monitoring/v1_monitor.go
+++ b/internal/pkg/agent/application/monitoring/v1_monitor.go
@@ -414,19 +414,19 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo
 			componentInfo{
 				ID:             fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
 				BinaryName:     metricBeatName,
-				RuntimeManager: component.DefaultRuntimeManager,
+				RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
 			},
 			componentInfo{
 				ID:             fmt.Sprintf("http/%s", monitoringMetricsUnitID),
 				BinaryName:     metricBeatName,
-				RuntimeManager: component.DefaultRuntimeManager,
+				RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
 			})
 	}
 	if b.config.C.MonitorLogs {
 		componentInfos = append(componentInfos, componentInfo{
 			ID:             monitoringFilesUnitsID,
 			BinaryName:     fileBeatName,
-			RuntimeManager: component.DefaultRuntimeManager,
+			RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
 		})
 	}
 	// sort the components to ensure a consistent order of inputs in the configuration
@@ -444,15 +444,19 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfo
 
 	streams = append(streams, b.getServiceComponentFilestreamStreams(componentInfos)...)
 
-	inputs := []interface{}{
-		map[string]interface{}{
-			idKey:        fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
-			"name":       fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
-			"type":       "filestream",
-			useOutputKey: monitoringOutput,
-			"streams":    streams,
-		},
+	input := map[string]interface{}{
+		idKey:        fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
+		"name":       fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
+		"type":       "filestream",
+		useOutputKey: monitoringOutput,
+		"streams":    streams,
+	}
+	// Only set the runtime manager when it differs from the default, so the configuration stays unchanged unless the otel manager is enabled
+	if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
+		input["_runtime_experimental"] = b.config.C.RuntimeManager
 	}
+
+	inputs := []any{input}
 	inputsNode, found := cfg[inputsKey]
 	if !found {
 		return fmt.Errorf("no inputs in config")
@@ -518,6 +522,14 @@ func (b *BeatsMonitor) injectMetricsInput(
 		},
 	}
 
+	// Only set the runtime manager when it differs from the default, so the configuration stays unchanged unless the otel manager is enabled
+	if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
+		for _, input := range inputs {
+			inputMap := input.(map[string]interface{})
+			inputMap["_runtime_experimental"] = b.config.C.RuntimeManager
+		}
+	}
+
 	// add system/process metrics for services that can't be monitored via json/beats metrics
 	inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
 		componentInfos, metricsCollectionIntervalString)...)
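A note on the v1_monitor.go change above: the `_runtime_experimental` key is injected into the generated monitoring inputs only when the configured runtime manager differs from the default, so existing process-runtime configurations are unaffected. A minimal, self-contained sketch of that guard follows; the literal `"id"` and `"use_output"` keys are assumptions standing in for the `idKey` and `useOutputKey` constants, which this diff does not show.

```go
package main

import "fmt"

func main() {
	const defaultRuntimeManager = "process"
	runtimeManager := "otel" // stands in for b.config.C.RuntimeManager

	input := map[string]interface{}{
		"id":         "filestream-monitoring-agent",
		"type":       "filestream",
		"use_output": "monitoring",
	}
	// The key is only injected for a non-default runtime manager, so
	// process-runtime configurations are left byte-for-byte intact.
	if runtimeManager != defaultRuntimeManager {
		input["_runtime_experimental"] = runtimeManager
	}
	fmt.Println(input)
}
```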
diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go
index 22a57bf2b90..c8d4e717338 100644
--- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go
+++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go
@@ -39,6 +39,7 @@ func TestMonitoringFull(t *testing.T) {
 				HTTP: &monitoringcfg.MonitoringHTTPConfig{
 					Enabled: true,
 				},
+				RuntimeManager: monitoringcfg.DefaultRuntimeManager,
 			},
 		},
 		agentInfo: agentInfo,
@@ -863,6 +864,7 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
 			HTTP: &monitoringcfg.MonitoringHTTPConfig{
 				Enabled: false,
 			},
+			RuntimeManager: monitoringcfg.DefaultRuntimeManager,
 		},
 	}
 
@@ -914,7 +916,9 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
 	// Verify that if we're using filebeat receiver, there's no filebeat input
 	var monitoringCfg struct {
 		Inputs []struct {
-			Streams []struct {
+			ID             string
+			RuntimeManager string `mapstructure:"_runtime_experimental"`
+			Streams        []struct {
 				Path string `mapstructure:"path"`
 			}
 		}
@@ -931,7 +935,72 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) {
 			}
 		}
 	}
-	assert.Len(t, streamsForInputMetrics, 2) // we have two filebeats running: filestream-process and monitoring
+	assert.Len(t, streamsForInputMetrics, 2)
+}
+
+func TestMonitoringWithOtelRuntime(t *testing.T) {
+	agentInfo, err := info.NewAgentInfo(context.Background(), false)
+	require.NoError(t, err, "Error creating agent info")
+
+	cfg := &monitoringConfig{
+		C: &monitoringcfg.MonitoringConfig{
+			Enabled:        true,
+			MonitorLogs:    true,
+			MonitorMetrics: true,
+			Namespace:      "test",
+			HTTP: &monitoringcfg.MonitoringHTTPConfig{
+				Enabled: false,
+			},
+			RuntimeManager: monitoringcfg.OtelRuntimeManager,
+		},
+	}
+
+	policy := map[string]any{
+		"agent": map[string]any{
+			"monitoring": map[string]any{
+				"metrics": true,
+				"logs":    false,
+			},
+		},
+		"outputs": map[string]any{
+			"default": map[string]any{},
+		},
+	}
+
+	b := &BeatsMonitor{
+		enabled:   true,
+		config:    cfg,
+		agentInfo: agentInfo,
+	}
+
+	components := []component.Component{
+		{
+			ID: "filestream-receiver",
+			InputSpec: &component.InputRuntimeSpec{
+				Spec: component.InputSpec{
+					Command: &component.CommandSpec{
+						Name: "filebeat",
+					},
+				},
+			},
+			RuntimeManager: component.OtelRuntimeManager,
+		},
+	}
+	monitoringCfgMap, err := b.MonitoringConfig(policy, components, map[string]uint64{})
+	require.NoError(t, err)
+
+	// Verify that every monitoring input is assigned the otel runtime manager
+	var monitoringCfg struct {
+		Inputs []struct {
+			ID             string
+			RuntimeManager string `mapstructure:"_runtime_experimental"`
+		}
+	}
+	err = mapstructure.Decode(monitoringCfgMap, &monitoringCfg)
+	require.NoError(t, err)
+	for _, input := range monitoringCfg.Inputs {
+		assert.Equal(t, monitoringcfg.OtelRuntimeManager, input.RuntimeManager)
+	}
 }
 
 func TestEnrichArgs(t *testing.T) {
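The unit test above checks the injected key by decoding the generated config with `mapstructure`. The same decoding step in isolation might look like the sketch below; the `github.com/mitchellh/mapstructure` import path is an assumption, as the repository may pin a different fork of the library.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

func main() {
	// A generated monitoring config, reduced to the fields the test inspects.
	cfg := map[string]any{
		"inputs": []any{
			map[string]any{
				"id":                    "filestream-monitoring-agent",
				"_runtime_experimental": "otel",
			},
		},
	}
	var decoded struct {
		Inputs []struct {
			ID             string
			RuntimeManager string `mapstructure:"_runtime_experimental"`
		}
	}
	if err := mapstructure.Decode(cfg, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.Inputs[0].RuntimeManager) // prints "otel"
}
```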
diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go
index 263caddc08f..cb0e19987d1 100644
--- a/internal/pkg/core/monitoring/config/config.go
+++ b/internal/pkg/core/monitoring/config/config.go
@@ -16,7 +16,10 @@ const (
 	defaultNamespace = "default"
 
 	// DefaultHost is used when host is not defined or empty
-	DefaultHost = "localhost"
+	DefaultHost           = "localhost"
+	ProcessRuntimeManager = "process"
+	OtelRuntimeManager    = "otel"
+	DefaultRuntimeManager = ProcessRuntimeManager
 )
 
 // MonitoringConfig describes a configuration of a monitoring
@@ -33,6 +36,7 @@ type MonitoringConfig struct {
 	MonitorTraces  bool        `yaml:"traces" config:"traces"`
 	APM            APMConfig   `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
 	Diagnostics    Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
+	RuntimeManager string      `yaml:"_runtime_experimental,omitempty" config:"_runtime_experimental,omitempty"`
 }
 
 // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
@@ -118,9 +122,10 @@ func DefaultConfig() *MonitoringConfig {
 			Host: DefaultHost,
 			Port: defaultPort,
 		},
-		Namespace:   defaultNamespace,
-		APM:         defaultAPMConfig(),
-		Diagnostics: defaultDiagnostics(),
+		Namespace:      defaultNamespace,
+		APM:            defaultAPMConfig(),
+		Diagnostics:    defaultDiagnostics(),
+		RuntimeManager: DefaultRuntimeManager,
 	}
 }
 
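Since the new `RuntimeManager` field carries `yaml` and `config` struct tags, the setting round-trips through ordinary YAML unmarshalling. A small sketch against a trimmed-down copy of the struct, using `gopkg.in/yaml.v2` as the integration test below does:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Trimmed-down copy of MonitoringConfig; only the fields needed here are kept.
type MonitoringConfig struct {
	Enabled        bool   `yaml:"enabled"`
	RuntimeManager string `yaml:"_runtime_experimental,omitempty"`
}

func main() {
	raw := []byte("enabled: true\n_runtime_experimental: otel\n")
	var cfg MonitoringConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Enabled:true RuntimeManager:otel}
}
```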
diff --git a/testing/integration/beat_receivers_test.go b/testing/integration/beat_receivers_test.go
index fc8582c12b7..b69a5b82f3a 100644
--- a/testing/integration/beat_receivers_test.go
+++ b/testing/integration/beat_receivers_test.go
@@ -7,28 +7,28 @@ package integration
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"io"
 	"net/http"
 	"runtime"
 	"testing"
-	"text/template"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+	"gopkg.in/yaml.v2"
+	"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
+
+	"github.com/gofrs/uuid/v5"
+	"github.com/stretchr/testify/require"
+
 	"github.com/elastic/elastic-agent-libs/kibana"
 	"github.com/elastic/elastic-agent-libs/testing/estools"
-	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
 	atesting "github.com/elastic/elastic-agent/pkg/testing"
 	"github.com/elastic/elastic-agent/pkg/testing/define"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
-	"github.com/elastic/elastic-agent/pkg/utils"
-
-	"github.com/gofrs/uuid/v5"
-	"github.com/stretchr/testify/require"
 )
 
 var (
@@ -79,6 +79,67 @@ func TestAgentMonitoring(t *testing.T) {
 		Force: true,
 	}
 
+	// prepare the policy and marshalled configuration
+	policyCtx, policyCancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
+	t.Cleanup(policyCancel)
+
+	// 1. Create and install policy with just monitoring
+	createPolicyReq := kibana.AgentPolicy{
+		Name:        fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()),
+		Namespace:   info.Namespace,
+		Description: fmt.Sprintf("%s policy", t.Name()),
+		MonitoringEnabled: []kibana.MonitoringEnabledOption{
+			kibana.MonitoringEnabledLogs,
+			kibana.MonitoringEnabledMetrics,
+		},
+	}
+	policyResponse, err := info.KibanaClient.CreatePolicy(policyCtx, createPolicyReq)
+	require.NoError(t, err, "error creating policy")
+
+	// 2. Download the policy, add the API key
+	downloadURL := fmt.Sprintf("/api/fleet/agent_policies/%s/download", policyResponse.ID)
+	resp, err := info.KibanaClient.Connection.SendWithContext(policyCtx, http.MethodGet, downloadURL, nil, nil, nil)
+	require.NoError(t, err, "error downloading policy")
+	policyBytes, err := io.ReadAll(resp.Body)
+	require.NoError(t, err, "error reading policy response")
+	defer resp.Body.Close()
+
+	apiKeyResponse, err := createESApiKey(info.ESClient)
+	require.NoError(t, err, "failed to get api key")
+	require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse)
+	apiKey, err := getDecodedApiKey(apiKeyResponse)
+	require.NoError(t, err, "error decoding api key")
+
+	type PolicyOutputs struct {
+		Type   string   `yaml:"type"`
+		Hosts  []string `yaml:"hosts"`
+		Preset string   `yaml:"preset"`
+		ApiKey string   `yaml:"api_key"`
+	}
+	type PolicyStruct struct {
+		ID                string                   `yaml:"id"`
+		Revision          int                      `yaml:"revision"`
+		Outputs           map[string]PolicyOutputs `yaml:"outputs"`
+		Fleet             map[string]any           `yaml:"fleet"`
+		OutputPermissions map[string]any           `yaml:"output_permissions"`
+		Agent             struct {
+			Monitoring map[string]any `yaml:"monitoring"`
+			Rest       map[string]any `yaml:",inline"`
+		} `yaml:"agent"`
+		Inputs           []map[string]any `yaml:"inputs"`
+		Signed           map[string]any   `yaml:"signed"`
+		SecretReferences []map[string]any `yaml:"secret_references"`
+		Namespaces       []map[string]any `yaml:"namespaces"`
+	}
+
+	policy := PolicyStruct{}
+	err = yaml.Unmarshal(policyBytes, &policy)
+	require.NoError(t, err, "error unmarshalling policy")
+	d, prs := policy.Outputs["default"]
+	require.True(t, prs, "default must be in outputs")
+	d.ApiKey = string(apiKey)
+	policy.Outputs["default"] = d
+
 	// Flow
 	// 1. Create and install policy with just monitoring
 	// 2. Download the policy, add the API key
@@ -86,66 +147,18 @@ func TestAgentMonitoring(t *testing.T) {
 	// 4. Make sure logs and metrics for agent monitoring are being received
 	t.Run("verify elastic-agent monitoring functionality", func(t *testing.T) {
 		ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
-		defer cancel()
-
-		// 1. Create and install policy with just monitoring
-		createPolicyReq := kibana.AgentPolicy{
-			Name:        fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()),
-			Namespace:   info.Namespace,
-			Description: fmt.Sprintf("%s policy", t.Name()),
-			MonitoringEnabled: []kibana.MonitoringEnabledOption{
-				kibana.MonitoringEnabledLogs,
-				kibana.MonitoringEnabledMetrics,
-			},
-		}
-		policyResponse, err := info.KibanaClient.CreatePolicy(ctx, createPolicyReq)
-		require.NoError(t, err, "error creating policy")
-
-		// 2. Download the policy, add the API key
-		downloadURL := fmt.Sprintf("/api/fleet/agent_policies/%s/download", policyResponse.ID)
-		resp, err := info.KibanaClient.Connection.SendWithContext(ctx, http.MethodGet, downloadURL, nil, nil, nil)
-		require.NoError(t, err, "error downloading policy")
-		policy, err := io.ReadAll(resp.Body)
-		require.NoError(t, err, "error reading policy response")
-		defer resp.Body.Close()
-
-		apiKeyResponse, err := createESApiKey(info.ESClient)
-		require.NoError(t, err, "failed to get api key")
-		require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse)
-		apiKey, err := getDecodedApiKey(apiKeyResponse)
-		require.NoError(t, err, "error decoding api key")
-
-		type PolicyOutputs struct {
-			Type   string   `yaml:"type"`
-			Hosts  []string `yaml:"hosts"`
-			Preset string   `yaml:"preset"`
-			ApiKey string   `yaml:"api_key"`
-		}
-		type PolicyStruct struct {
-			ID                string                   `yaml:"id"`
-			Revision          int                      `yaml:"revision"`
-			Outputs           map[string]PolicyOutputs `yaml:"outputs"`
-			Fleet             map[string]any           `yaml:"fleet"`
-			OutputPermissions map[string]any           `yaml:"output_permissions"`
-			Agent             map[string]any           `yaml:"agent"`
-			Inputs            []map[string]any         `yaml:"inputs"`
-			Signed            map[string]any           `yaml:"signed"`
-			SecretReferences  []map[string]any         `yaml:"secret_references"`
-			Namespaces        []map[string]any         `yaml:"namespaces"`
-		}
+		t.Cleanup(cancel)
 
-		y := PolicyStruct{}
-		err = yaml.Unmarshal(policy, &y)
-		require.NoError(t, err, "error unmarshalling policy")
-		d, prs := y.Outputs["default"]
-		require.True(t, prs, "default must be in outputs")
-		d.ApiKey = string(apiKey)
-		y.Outputs["default"] = d
-		policyBytes, err := yaml.Marshal(y)
-		require.NoErrorf(t, err, "error marshalling policy, struct was %v", y)
+		// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
+		// right data
+		actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "process")
+		policy.Agent.Monitoring["namespace"] = actualNamespace
+
+		updatedPolicyBytes, err := yaml.Marshal(policy)
+		require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
 		t.Cleanup(func() {
 			if t.Failed() {
-				t.Logf("policy was %s", string(policyBytes))
+				t.Logf("policy was %s", string(updatedPolicyBytes))
 			}
 		})
@@ -156,7 +169,7 @@ func TestAgentMonitoring(t *testing.T) {
 		err = fixture.Prepare(ctx)
 		require.NoError(t, err, "error preparing fixture")
 
-		err = fixture.Configure(ctx, policyBytes)
+		err = fixture.Configure(ctx, updatedPolicyBytes)
 		require.NoError(t, err, "error configuring fixture")
 
 		output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
@@ -179,7 +192,7 @@ func TestAgentMonitoring(t *testing.T) {
 			mustClauses := []map[string]any{
 				{"match": map[string]any{"data_stream.type": tc.dsType}},
 				{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
-				{"match": map[string]any{"data_stream.namespace": tc.dsNamespace}},
+				{"match": map[string]any{"data_stream.namespace": actualNamespace}},
 			}
 
 			// Only add the "message" match if tc.message is not empty
@@ -225,198 +238,66 @@ func TestAgentMonitoring(t *testing.T) {
 			},
 			"monitoring logs from elastic-agent should exist before proceeding",
 		)
-		type configOptions struct {
-			InputPath      string
-			ESEndpoint     string
-			ESApiKey       string
-			SocketEndpoint string
-			Namespace      string
-		}
-		esEndpoint, err := getESHost()
-		require.NoError(t, err, "error getting elasticsearch endpoint")
-		esApiKey, err := createESApiKey(info.ESClient)
-		require.NoError(t, err, "error creating API key")
-		require.NotEmptyf(t, esApiKey.Encoded, "api key is invalid %q", esApiKey)
-
-		// Start monitoring in otel mode
-		fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
-		require.NoError(t, err)
 		ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
-		defer cancel()
+		t.Cleanup(cancel)
 
-		err = fixture.Prepare(ctx)
-		require.NoError(t, err)
-
-		// installs elastic-agent with empty elastic-agent.yml to get its working dir first
-		err = fixture.Configure(ctx, []byte{})
-		require.NoError(t, err)
+		// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
+		// right data
+		actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "otel")
+		policy.Agent.Monitoring["namespace"] = actualNamespace
 
-		output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
-		require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))
+		// switch monitoring to the otel runtime
+		policy.Agent.Monitoring["_runtime_experimental"] = "otel"
 
-		// Ensure elastic-agent is healthy, otherwise we cannot perform retstart operation
-		require.Eventually(t, func() bool {
-			err = fixture.IsHealthy(ctx)
-			if err != nil {
-				t.Logf("waiting for agent healthy: %s", err.Error())
-				return false
-			}
-			return true
-		}, 30*time.Second, 1*time.Second)
-
-		configTemplateOTel := `
-receivers:
-  filebeatreceiver/filestream-monitoring:
-    filebeat:
-      inputs:
-        - type: filestream
-          enabled: true
-          id: filestream-monitoring-agent
-          paths:
-            - {{.InputPath}}/data/elastic-agent-*/logs/elastic-agent-*.ndjson
-            - {{.InputPath}}/data/elastic-agent-*/logs/elastic-agent-watcher-*.ndjson
-          close:
-            on_state_change:
-              inactive: 5m
-          parsers:
-            - ndjson:
-                add_error_key: true
-                message_key: message
-                overwrite_keys: true
-                target: ""
-          processors:
-            - add_fields:
-                fields:
-                  dataset: elastic_agent
-                  namespace: {{.Namespace}}
-                  type: logs
-                target: data_stream
-            - add_fields:
-                fields:
-                  dataset: elastic_agent
-                target: event
-            - add_fields:
-                fields:
-                  id: 0ddca301-e7c0-4eac-8432-7dd05bc9cb06
-                  snapshot: false
-                  version: 8.19.0
-                target: elastic_agent
-            - add_fields:
-                fields:
-                  id: 0879f47d-df41-464d-8462-bc2b8fef45bf
-                target: agent
-            - drop_event:
-                when:
-                  regexp:
-                    component.id: .*-monitoring$
-            - drop_event:
-                when:
-                  regexp:
-                    message: ^Non-zero metrics in the last
-            - copy_fields:
-                fields:
-                  - from: data_stream.dataset
-                    to: data_stream.dataset_original
-            - drop_fields:
-                fields:
-                  - data_stream.dataset
-            - copy_fields:
-                fail_on_error: false
-                fields:
-                  - from: component.dataset
-                    to: data_stream.dataset
-                ignore_missing: true
-            - copy_fields:
-                fail_on_error: false
-                fields:
-                  - from: data_stream.dataset_original
-                    to: data_stream.dataset
-            - drop_fields:
-                fields:
-                  - data_stream.dataset_original
-                  - event.dataset
-            - copy_fields:
-                fields:
-                  - from: data_stream.dataset
-                    to: event.dataset
-            - drop_fields:
-                fields:
-                  - ecs.version
-                ignore_missing: true
-    output:
-      otelconsumer:
-    queue:
-      mem:
-        flush:
-          timeout: 0s
-    logging:
-      level: info
-      selectors:
-        - '*'
-    http.enabled: true
-    http.host: {{ .SocketEndpoint }}
-exporters:
-  debug:
-    use_internal_logger: false
-    verbosity: detailed
-  elasticsearch/log:
-    endpoints:
-      - {{.ESEndpoint}}
-    compression: none
-    api_key: {{.ESApiKey}}
-    logs_dynamic_index:
-      enabled: true
-    batcher:
-      enabled: true
-      flush_timeout: 0.5s
-    mapping:
-      mode: bodymap
-service:
-  pipelines:
-    logs:
-      receivers:
-        - filebeatreceiver/filestream-monitoring
-      exporters:
-        - elasticsearch/log
-`
-		socketEndpoint := utils.SocketURLWithFallback(uuid.Must(uuid.NewV4()).String(), paths.TempDir())
-
-		// configure elastic-agent.yml with new config
-		var configBuffer bytes.Buffer
-		template.Must(template.New("config").Parse(configTemplateOTel)).Execute(&configBuffer,
-			configOptions{
-				InputPath:      fixture.WorkDir(),
-				ESEndpoint:     esEndpoint,
-				ESApiKey:       esApiKey.Encoded,
-				SocketEndpoint: socketEndpoint,
-				Namespace:      info.Namespace,
-			})
-		configOTelContents := configBuffer.Bytes()
+		updatedPolicyBytes, err := yaml.Marshal(policy)
+		require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
 		t.Cleanup(func() {
 			if t.Failed() {
-				t.Logf("Contents of agent config file:\n%s\n", string(configOTelContents))
+				t.Logf("policy was %s", string(updatedPolicyBytes))
 			}
 		})
-		err = fixture.Configure(ctx, configOTelContents)
+
+		// 3. Install without enrolling in fleet
+		fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
 		require.NoError(t, err)
 
-		// Get the timestamp before restarting. Required to separate logs from agent and otel
-		timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
+		err = fixture.Prepare(ctx)
+		require.NoError(t, err, "error preparing fixture")
 
-		fmt.Println(time.Now())
-		// Restart elastic-agent
-		output, err = fixture.Exec(ctx, []string{"restart"})
-		require.NoErrorf(t, err, "error restarting agent: %s\ncombinedoutput:\n%s", err, string(output))
+		err = fixture.Configure(ctx, updatedPolicyBytes)
+		require.NoError(t, err, "error configuring fixture")
 
-		require.Eventually(t, func() bool {
-			err = fixture.IsHealthy(ctx)
-			if err != nil {
-				t.Logf("waiting for agent healthy: %s", err.Error())
-				return false
-			}
-			return true
-		}, 30*time.Second, 1*time.Second)
+		output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
+		require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(output))
+
+		require.EventuallyWithT(t, func(collect *assert.CollectT) {
+			status, statusErr := fixture.ExecStatus(ctx)
+			assert.NoError(collect, statusErr)
+			// agent should be healthy
+			assert.Equal(collect, int(cproto.State_HEALTHY), status.State)
+			// we should have no normal components running
+			assert.Zero(collect, len(status.Components))
+
+			// we should have filebeatreceiver and metricbeatreceiver running
+			otelCollectorStatus := status.Collector
+			assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status)
+			pipelineStatusMap := otelCollectorStatus.ComponentStatusMap
+
+			// we should have 3 pipelines running: filestream for logs, http metrics and beats metrics
+			assert.Equal(collect, 3, len(pipelineStatusMap))
+
+			fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring"
+			httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring"
+			beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring"
+			assert.Contains(collect, pipelineStatusMap, fileStreamPipeline)
+			assert.Contains(collect, pipelineStatusMap, httpMetricsPipeline)
+			assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline)
+
+			// and all the components should be healthy
+			assertCollectorComponentsHealthy(collect, otelCollectorStatus)
+		}, 1*time.Minute, 1*time.Second)
 
 		// run this only for logs for now
 		tc := tests[0]
@@ -426,11 +307,9 @@ service:
 			defer findCancel()
 			mustClauses := []map[string]any{
 				{"match": map[string]any{"message": tc.message}},
-				{"range": map[string]interface{}{
-					"@timestamp": map[string]string{
-						"gte": timestamp, // Use captured timestamp
-					},
-				}},
+				{"match": map[string]any{"data_stream.type": tc.dsType}},
+				{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
+				{"match": map[string]any{"data_stream.namespace": actualNamespace}},
 			}
 
 			rawQuery := map[string]any{
@@ -457,25 +336,30 @@ service:
 		// Expected to change between agentDocs and OtelDocs
 		"@timestamp",
 		"agent.ephemeral_id",
+		// agent.id is different because it's the id of the underlying beat
 		"agent.id",
+		// agent.version is different because we force version 9.0.0 in CI
 		"agent.version",
+		// elastic_agent.id is different because we currently start a new agent in the second subtest
+		// this should be fixed in the future
+		"elastic_agent.id",
 		"data_stream.namespace",
 		"log.file.inode",
 		"log.file.fingerprint",
 		"log.file.path",
 		"log.offset",
-
-		// needs investigation
-		"event.agent_id_status",
 		"event.ingested",
-
-		// elastic_agent * fields are hardcoded in processor list for now which is why they differ
-		"elastic_agent.id",
-		"elastic_agent.snapshot",
-		"elastic_agent.version",
 	}
 
 	AssertMapsEqual(t, agent, otel, ignoredFields, "expected documents to be equal")
 	})
 }
+
+func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) {
+	assert.Equal(t, int(cproto.CollectorComponentStatus_StatusOK), status.Status, "component status should be ok")
+	assert.Equal(t, "", status.Error, "component status should not have an error")
+	for _, componentStatus := range status.ComponentStatusMap {
+		assertCollectorComponentsHealthy(t, componentStatus)
+	}
+}
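For reference, the per-subtest namespace suffixes (`-process` and `-otel`) are what keep the two subtests' Elasticsearch queries disjoint. A sketch of the bool query the test effectively issues; the namespace value shown is illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Each subtest appends its own suffix to the base namespace, so the
	// documents it matches cannot overlap with the other subtest's.
	namespace := "default-otel" // illustrative value
	mustClauses := []map[string]any{
		{"match": map[string]any{"data_stream.type": "logs"}},
		{"match": map[string]any{"data_stream.dataset": "elastic_agent"}},
		{"match": map[string]any{"data_stream.namespace": namespace}},
	}
	rawQuery := map[string]any{
		"query": map[string]any{
			"bool": map[string]any{"must": mustClauses},
		},
	}
	out, err := json.MarshalIndent(rawQuery, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```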