Skip to content

[otel]: Add e2e test for monitoring metrics in otel mode #8009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
May 30, 2025
Merged
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f07b76f
[Draft]: Add e2e test for monitoring metrics in otel mode
khushijain21 Apr 28, 2025
cee8091
add test for http/metrics
khushijain21 Apr 29, 2025
e3309ee
add ignored fields for http/metrics
khushijain21 Apr 29, 2025
24368eb
Merge branch 'main' into e2emetrics
khushijain21 Apr 29, 2025
1d47945
Merge remote-tracking branch 'refs/remotes/upstream/main' into e2emet…
khushijain21 May 5, 2025
b78e280
after support for default processor in mbreceiver
khushijain21 May 5, 2025
3ba9084
add support for beat/metrics
khushijain21 May 5, 2025
c917993
remove host processor
khushijain21 May 5, 2025
13b4664
Merge remote-tracking branch 'refs/remotes/upstream/main' into e2emet…
khushijain21 May 5, 2025
fb556b5
refactor
khushijain21 May 5, 2025
5020631
update ignored fields
khushijain21 May 5, 2025
86065ad
add missing newline
khushijain21 May 6, 2025
adb48da
refactor impelmentaion
khushijain21 May 7, 2025
d329105
Use template instead of policy
khushijain21 May 7, 2025
84a9061
Merge remote-tracking branch 'refs/remotes/upstream/main' into e2emet…
khushijain21 May 23, 2025
766d71e
refactor monitoring code
khushijain21 May 23, 2025
71ac038
refactor monitoring code
khushijain21 May 23, 2025
baf61f5
use runtime_experimental
khushijain21 May 23, 2025
9c7064f
drop process related fields
khushijain21 May 23, 2025
ea25080
Merge branch 'main' into e2emetrics
khushijain21 May 23, 2025
05be492
fix ut for monitoring
khushijain21 May 23, 2025
1e2b700
fix if condition for http streams
khushijain21 May 23, 2025
cff7efe
Merge branch 'main' into e2emetrics
khushijain21 May 26, 2025
a52437f
address comments
khushijain21 May 26, 2025
6eee04d
address review comments2
khushijain21 May 26, 2025
e20cbe5
Merge branch 'main' into e2emetrics
khushijain21 May 26, 2025
86d59e3
cleanup
khushijain21 May 27, 2025
3f4c583
add event.ingested
khushijain21 May 27, 2025
b3b1898
check ci
khushijain21 May 27, 2025
e11dee5
Merge branch 'main' into e2emetrics
khushijain21 May 27, 2025
a3eada8
pick metrics that have libbeat.output.events fields
khushijain21 May 27, 2025
6edea59
Merge branch 'main' into e2emetrics
khushijain21 May 27, 2025
fb56579
use policy instead of agent.monitoring section because that is what m…
khushijain21 May 28, 2025
ac0b0e8
Merge branch 'main' into e2emetrics
khushijain21 May 28, 2025
b8df1c4
Merge branch 'main' into e2emetrics
khushijain21 May 30, 2025
1fbed5e
sort ascending
khushijain21 May 30, 2025
2cec25c
fix check ci
khushijain21 May 30, 2025
7eb543c
fix tests
khushijain21 May 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
432 changes: 226 additions & 206 deletions testing/integration/beat_receivers_test.go
Original file line number Diff line number Diff line change
@@ -11,74 +11,130 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/pkg/control/v2/cproto"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/elastic-agent-libs/testing/estools"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
)

var (
agentDocs map[string]estools.Documents
otelDocs map[string]estools.Documents
commonLogMessage = "Determined allowed capabilities"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestAgentMonitoring is a test to provide a baseline for what
// elastic-agent monitoring looks like with classic monitoring. It
// will be expanded in the future to compare with beats receivers for
// elastic-agent monitoring.
func TestAgentMonitoring(t *testing.T) {
// TestClassicAndReceiverAgentMonitoring is a test to compare documents ingested by
// elastic-agent monitoring classic mode vs otel mode
func TestClassicAndReceiverAgentMonitoring(t *testing.T) {
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
OS: []define.OS{
{Type: define.Linux},
{Type: define.Darwin},
{Type: define.Windows},
},
Stack: &define.Stack{},
Sudo: true,
})

agentDocs = make(map[string]estools.Documents)
otelDocs = make(map[string]estools.Documents)
agentDocs := make(map[string]estools.Documents)
otelDocs := make(map[string]estools.Documents)

// Tests logs and metrics are present
type test struct {
dsType string
dsDataset string
dsNamespace string
message string
dsType string
dsDataset string
dsNamespace string
query map[string]any
onlyCompareKeys bool
ignoreFields []string
}

tests := []test{
{dsType: "logs", dsDataset: "elastic_agent", dsNamespace: info.Namespace, message: commonLogMessage},
{dsType: "metrics", dsDataset: "elastic_agent.elastic_agent", dsNamespace: info.Namespace},
{dsType: "metrics", dsDataset: "elastic_agent.filebeat", dsNamespace: info.Namespace},
{dsType: "metrics", dsDataset: "elastic_agent.filebeat_input", dsNamespace: info.Namespace},
{dsType: "metrics", dsDataset: "elastic_agent.metricbeat", dsNamespace: info.Namespace},
{
dsType: "logs",
dsDataset: "elastic_agent",
dsNamespace: info.Namespace,
query: map[string]any{"match_phrase": map[string]any{"message": "Determined allowed capabilities"}},
onlyCompareKeys: false,
},

{
dsType: "metrics",
dsDataset: "elastic_agent.filebeat",
dsNamespace: info.Namespace,
query: map[string]any{"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}},
onlyCompareKeys: true,
ignoreFields: []string{
// all process related metrics are dropped for beatreceivers
"beat.stats.cgroup",
"beat.stats.cpu",
"beat.stats.handles",
"beat.stats.memstats",
"beat.stats.runtime",
"beat.elasticsearch.cluster.id",
"beat.stats.libbeat.config",
},
},
{
dsType: "metrics",
dsDataset: "elastic_agent.metricbeat",
dsNamespace: info.Namespace,
query: map[string]any{"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}},
onlyCompareKeys: true,
ignoreFields: []string{
// all process related metrics are dropped for beatreceivers
"beat.stats.cgroup",
"beat.stats.cpu",
"beat.stats.handles",
"beat.stats.memstats",
"beat.stats.runtime",
"beat.elasticsearch.cluster.id",
"beat.stats.libbeat.config",
},
},
{
dsType: "metrics",
dsDataset: "elastic_agent.elastic_agent",
dsNamespace: info.Namespace,
onlyCompareKeys: true,
query: map[string]any{"exists": map[string]any{"field": "system.process.memory.size"}},
},
// TODO: fbreceiver must support /inputs/ endpoint for this to work
// {
// dsType: "metrics",
// dsDataset: "elastic_agent.filebeat_input",
// dsNamespace: info.Namespace,
// query: map[string]any{"exists": map[string]any{"field": "filebeat_input.bytes_processed_total"}},
// },
}

installOpts := atesting.InstallOpts{
NonInteractive: true,
Privileged: true,
Force: true,
Develop: true,
}

// Flow
// 1. Start elastic agent monitoring in classic mode (configure, install and wait for elastic-agent healthy)
// 2. Assert monitoring logs and metrics are available on ES
// 3. Uninstall

// 4. Start elastic agent monitoring in otel mode
// 5. Assert monitoring logs and metrics are available on ES (for otel mode)
// 6. Uninstall

// 7. Compare both documents are equivalent

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
t.Cleanup(cancel)

// prepare the policy and marshalled configuration
policyCtx, policyCancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
t.Cleanup(policyCancel)
@@ -140,199 +196,163 @@ func TestAgentMonitoring(t *testing.T) {
d.ApiKey = string(apiKey)
policy.Outputs["default"] = d

// Flow
// 1. Create and install policy with just monitoring
// 2. Download the policy, add the API key
// 3. Install without enrolling in fleet
// 4. Make sure logs and metrics for agent monitoring are being received
t.Run("verify elastic-agent monitoring functionality", func(t *testing.T) {
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
t.Cleanup(cancel)

// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
// right data
actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "process")
policy.Agent.Monitoring["namespace"] = actualNamespace

updatedPolicyBytes, err := yaml.Marshal(policy)
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
t.Cleanup(func() {
if t.Failed() {
t.Logf("policy was %s", string(updatedPolicyBytes))
}
})

// 3. Install without enrolling in fleet
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = fixture.Prepare(ctx)
require.NoError(t, err, "error preparing fixture")

err = fixture.Configure(ctx, updatedPolicyBytes)
require.NoError(t, err, "error configuring fixture")

output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))

require.Eventually(t, func() bool {
err = fixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 1*time.Minute, 1*time.Second)

for _, tc := range tests {
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
defer findCancel()
mustClauses := []map[string]any{
{"match": map[string]any{"data_stream.type": tc.dsType}},
{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
{"match": map[string]any{"data_stream.namespace": actualNamespace}},
}

// Only add the "message" match if tc.message is not empty
// This conditional check will not be required when test for metrics is included
if tc.message != "" {
mustClauses = append(mustClauses, map[string]any{
"match": map[string]any{"message": tc.message},
})
}

rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": mustClauses,
},
},
}

docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, tc.dsType+"-*", info.ESClient)
require.NoError(t, err)
if docs.Hits.Total.Value != 0 {
key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace
agentDocs[key] = docs
}
return docs.Hits.Total.Value > 0
},
2*time.Minute, 5*time.Second,
"No documents found for type: %s, dataset: %s, namespace: %s", tc.dsType, tc.dsDataset, tc.dsNamespace)
updatedPolicyBytes, err := yaml.Marshal(policy)
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
t.Cleanup(func() {
if t.Failed() {
t.Logf("policy was %s", string(updatedPolicyBytes))
}
})

t.Run("compare logs ingested by agent monitoring vs otel monitoring", func(t *testing.T) {
// skipping this because the log-path should be handled differently in windows
if runtime.GOOS == "windows" {
t.Skip("skipping this test on windows for now")
classicFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = classicFixture.Prepare(ctx)
require.NoError(t, err, "error preparing fixture")

err = classicFixture.Configure(ctx, updatedPolicyBytes)
require.NoError(t, err, "error configuring fixture")

output, err := classicFixture.InstallWithoutEnroll(ctx, &installOpts)
require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

require.Eventually(t, func() bool {
err = classicFixture.IsHealthy(ctx)
if err != nil {
t.Logf("waiting for agent healthy: %s", err.Error())
return false
}
return true
}, 1*time.Minute, 1*time.Second)

// Not proceed with this test if monitoring logs from elastic-agent does not exist
monitoringLogIndex := "logs-elastic_agent-" + info.Namespace
require.NotPanics(
t, func() {
_ = agentDocs[monitoringLogIndex].Hits.Hits[0].Source
}, "monitoring logs from elastic-agent should exist before proceeding",
)

ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
t.Cleanup(cancel)

// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
// right data
actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "otel")
policy.Agent.Monitoring["namespace"] = actualNamespace

// switch monitoring to the otel runtime
policy.Agent.Monitoring["_runtime_experimental"] = "otel"

updatedPolicyBytes, err := yaml.Marshal(policy)
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
t.Cleanup(func() {
if t.Failed() {
t.Logf("policy was %s", string(updatedPolicyBytes))
}
})

// 3. Install without enrolling in fleet
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = fixture.Prepare(ctx)
require.NoError(t, err, "error preparing fixture")

err = fixture.Configure(ctx, updatedPolicyBytes)
require.NoError(t, err, "error configuring fixture")

output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(output))

require.EventuallyWithT(t, func(collect *assert.CollectT) {
status, statusErr := fixture.ExecStatus(ctx)
assert.NoError(collect, statusErr)
// agent should be healthy
assert.Equal(collect, int(cproto.State_HEALTHY), status.State)
// we should have no normal components running
assert.Zero(collect, len(status.Components))

// we should have filebeatreceiver and metricbeatreceiver running
otelCollectorStatus := status.Collector
require.NotNil(collect, otelCollectorStatus)
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status)
pipelineStatusMap := otelCollectorStatus.ComponentStatusMap

// we should have 3 pipelines running: filestream for logs, http metrics and beats metrics
assert.Equal(collect, 3, len(pipelineStatusMap))

fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring"
httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring"
beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring"
assert.Contains(collect, pipelineStatusMap, fileStreamPipeline)
assert.Contains(collect, pipelineStatusMap, httpMetricsPipeline)
assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline)

// and all the components should be healthy
assertCollectorComponentsHealthy(collect, otelCollectorStatus)

return
}, 1*time.Minute, 1*time.Second)

// run this only for logs for now
tc := tests[0]
// 2. Assert monitoring logs and metrics are available on ES
for _, tc := range tests {
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
defer findCancel()
mustClauses := []map[string]any{
{"match": map[string]any{"message": tc.message}},
{"match": map[string]any{"data_stream.type": tc.dsType}},
{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
{"match": map[string]any{"data_stream.namespace": actualNamespace}},
}

rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": mustClauses,
"must": tc.query,
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}},
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+monitoringLogIndex+"*", info.ESClient)
index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace
docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient)
require.NoError(t, err)
if docs.Hits.Total.Value != 0 {
otelDocs[monitoringLogIndex] = docs
agentDocs[index] = docs
}
return docs.Hits.Total.Value > 0
},
2*time.Minute, 5*time.Second,
"No documents found in otel mode for type : %s, dataset: %s, namespace: %s", tc.dsType, tc.dsDataset, tc.dsNamespace)
"agent monitoring classic no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestamp, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query)
}

// 3. Uninstall
combinedOutput, err := classicFixture.Uninstall(ctx, &atesting.UninstallOpts{Force: true})
require.NoErrorf(t, err, "error uninstalling classic agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))

// 4. switch monitoring to the otel runtime
policy.Agent.Monitoring["_runtime_experimental"] = "otel"
updatedPolicyBytes, err = yaml.Marshal(policy)
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
t.Cleanup(func() {
if t.Failed() {
t.Logf("policy was %s", string(updatedPolicyBytes))
}
})

beatReceiverFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)
err = beatReceiverFixture.Prepare(ctx)
require.NoError(t, err)
err = beatReceiverFixture.Configure(ctx, updatedPolicyBytes)
require.NoError(t, err)
combinedOutput, err = beatReceiverFixture.InstallWithoutEnroll(ctx, &installOpts)
require.NoErrorf(t, err, "error install without enroll: %s\ncombinedoutput:\n%s", err, string(combinedOutput))
// store timestamp to filter otel docs with timestamp greater than this value
timestampBeatReceiver := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")

require.EventuallyWithT(t, func(collect *assert.CollectT) {
status, statusErr := beatReceiverFixture.ExecStatus(ctx)
assert.NoError(collect, statusErr)
// agent should be healthy
assert.Equal(collect, int(cproto.State_HEALTHY), status.State)
// we should have no normal components running
assert.Zero(collect, len(status.Components))

// we should have filebeatreceiver and metricbeatreceiver running
otelCollectorStatus := status.Collector
require.NotNil(collect, otelCollectorStatus)
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status)
pipelineStatusMap := otelCollectorStatus.ComponentStatusMap

// we should have 3 pipelines running: filestream for logs, http metrics and beats metrics
assert.Equal(collect, 3, len(pipelineStatusMap))

fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring"
httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring"
beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring"
assert.Contains(collect, pipelineStatusMap, fileStreamPipeline)
assert.Contains(collect, pipelineStatusMap, httpMetricsPipeline)
assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline)

// and all the components should be healthy
assertCollectorComponentsHealthy(collect, otelCollectorStatus)

return
}, 1*time.Minute, 1*time.Second)

// 5. Assert monitoring logs and metrics are available on ES (for otel mode)
for _, tc := range tests {
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second)
defer findCancel()

rawQuery := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": tc.query,
"filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestampBeatReceiver}}},
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace
docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient)
require.NoError(t, err)
if docs.Hits.Total.Value != 0 {
key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace
otelDocs[key] = docs
}
return docs.Hits.Total.Value > 0
},
4*time.Minute, 5*time.Second,
"agent monitoring beats receivers no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestampBeatReceiver, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query)
}

agent := agentDocs[monitoringLogIndex].Hits.Hits[0].Source
otel := otelDocs[monitoringLogIndex].Hits.Hits[0].Source
// 6. Uninstall
combinedOutput, err = beatReceiverFixture.Uninstall(ctx, &atesting.UninstallOpts{Force: true})
require.NoErrorf(t, err, "error uninstalling beat receiver agent monitoring, err: %s, combined output: %s", err, string(combinedOutput))

// 7. Compare both documents are equivalent
for _, tc := range tests[:3] {
key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace
agent := agentDocs[key].Hits.Hits[0].Source
otel := otelDocs[key].Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
@@ -341,20 +361,20 @@ func TestAgentMonitoring(t *testing.T) {
"agent.id",
// agent.version is different because we force version 9.0.0 in CI
"agent.version",
// elastic_agent.id is different because we currently start a new agent in the second subtest
// this should be fixed in the future
"elastic_agent.id",
"data_stream.namespace",
"log.file.inode",
"log.file.fingerprint",
"log.file.path",
"log.offset",
"event.ingested",
}

AssertMapsEqual(t, agent, otel, ignoredFields, "expected documents to be equal")
})

switch tc.onlyCompareKeys {
case true:
AssertMapstrKeysEqual(t, agent, otel, append(ignoredFields, tc.ignoreFields...), fmt.Sprintf("expected document keys to be equal for dataset: %s", key))
case false:
AssertMapsEqual(t, agent, otel, ignoredFields, fmt.Sprintf("expected document to be equal for dataset: %s", key))
}
}
}

func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) {
22 changes: 22 additions & 0 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
@@ -1661,6 +1661,28 @@ func AssertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg
require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal")
}

func AssertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {

t.Helper()
// Delete all ignored fields.
for _, f := range ignoredFields {
_ = m1.Delete(f)
_ = m2.Delete(f)
}

flatM1 := m1.Flatten()
flatM2 := m2.Flatten()

for k := range flatM1 {
flatM1[k] = ""
}
for k := range flatM2 {
flatM2[k] = ""
}

require.Zero(t, cmp.Diff(flatM1, flatM2), msg)
}

func TestFBOtelRestartE2E(t *testing.T) {
// This test ensures that filebeatreceiver is able to deliver logs even
// in advent of a collector restart.