Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 65 additions & 47 deletions testing/integration/ess/beat_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,6 @@ func TestBeatsReceiverLogs(t *testing.T) {
Stack: nil,
})

t.Skip("Skip this test as it's flaky. See https://github.com/elastic/elastic-agent/issues/9890")

type configOptions struct {
RuntimeExperimental string
}
Expand All @@ -724,66 +722,87 @@ agent.monitoring.enabled: false
require.NoError(t,
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
configOptions{
RuntimeExperimental: "process",
RuntimeExperimental: string(component.ProcessRuntimeManager),
}))
processConfig := configBuffer.Bytes()
require.NoError(t,
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
configOptions{
RuntimeExperimental: "otel",
RuntimeExperimental: string(component.OtelRuntimeManager),
}))
receiverConfig := configBuffer.Bytes()
// this is the context for the whole test, with a global timeout defined
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
defer cancel()

// use a subcontext for the agent
agentProcessCtx, agentProcessCancel := context.WithCancel(ctx)
fixture, cmd, output := prepareAgentCmd(t, agentProcessCtx, processConfig)
// since we set the output to a nonexistent ES endpoint, we expect it to be degraded, but the input to be healthy
assertBeatsReady := func(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager) {
t.Helper()

require.NoError(t, cmd.Start())
var componentVersionInfoName string
switch runtime {
case component.OtelRuntimeManager:
componentVersionInfoName = "beats-receiver"
default:
componentVersionInfoName = "beat-v2-client"
}

require.EventuallyWithT(t, func(collect *assert.CollectT) {
var statusErr error
status, statusErr := fixture.ExecStatus(agentProcessCtx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
return
}, 1*time.Minute, 1*time.Second)
// we don't actually care about anything here other than the receiver itself
assert.Equal(t, 1, len(status.Components))

// all the components should be degraded, their output units should be degraded, the input units should be healthy,
// and should identify themselves appropriately via their version info
for _, comp := range status.Components {
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
for _, unit := range comp.Units {
if unit.UnitType == int(cproto.UnitType_INPUT) {
assert.Equal(t, int(cproto.State_HEALTHY), unit.State,
"expected state of unit %s to be %s, got %s",
unit.UnitID, cproto.State_HEALTHY.String(), cproto.State(unit.State).String())
}
}
}
}

agentProcessCancel()
require.Error(t, cmd.Wait())
processLogsString := output.String()
output.Reset()
// set up a standalone agent
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

// use a subcontext for the agent
agentReceiverCtx, agentReceiverCancel := context.WithCancel(ctx)
fixture, cmd, output = prepareAgentCmd(t, agentReceiverCtx, receiverConfig)
err = fixture.Prepare(ctx)
require.NoError(t, err)
err = fixture.Configure(ctx, processConfig)
require.NoError(t, err)

require.NoError(t, cmd.Start())
output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
require.NoError(t, err, "failed to install agent: %s", output)

t.Cleanup(func() {
if t.Failed() {
t.Log("Elastic-Agent output:")
t.Log(output.String())
}
})
require.EventuallyWithT(t, func(collect *assert.CollectT) {
var statusErr error
status, statusErr := fixture.ExecStatus(ctx)
require.NoError(collect, statusErr)
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
return
}, 2*time.Minute, 5*time.Second)

// change configuration and wait until the beats receiver is healthy
err = fixture.Configure(ctx, receiverConfig)
require.NoError(t, err)

require.EventuallyWithT(t, func(collect *assert.CollectT) {
var statusErr error
status, statusErr := fixture.ExecStatus(agentReceiverCtx)
assert.NoError(collect, statusErr)
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 1)
status, statusErr := fixture.ExecStatus(ctx)
require.NoError(collect, statusErr)
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
return
}, 1*time.Minute, 1*time.Second)
agentReceiverCancel()
require.Error(t, cmd.Wait())
receiverLogsString := output.String()
}, 2*time.Minute, 5*time.Second)

logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
require.NoError(t, err, "failed to read logs: %v", err)

processLog := getBeatStartLogRecord(processLogsString)
assert.NotEmpty(t, processLog)
receiverLog := getBeatStartLogRecord(receiverLogsString)
assert.NotEmpty(t, receiverLog)
beatStartLogs := getBeatStartLogRecords(string(logsBytes))

require.Len(t, beatStartLogs, 2, "expected to find one log line for each configuration")
processLog, receiverLog := beatStartLogs[0], beatStartLogs[1]

// Check that the process log is a subset of the receiver log
for key, value := range processLog {
Expand Down Expand Up @@ -920,9 +939,10 @@ func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput,
}
}

// getBeatStartLogRecord returns the log record for the a particular log line emitted when the beat starts
// getBeatStartLogRecords returns the log records for a particular log line emitted when the beat starts
// This log line is identical between beats processes and receivers, so it's a good point of comparison
func getBeatStartLogRecord(logs string) map[string]any {
func getBeatStartLogRecords(logs string) []map[string]any {
var logRecords []map[string]any
for _, line := range strings.Split(logs, "\n") {
line = strings.TrimSpace(line)
if line == "" {
Expand All @@ -933,13 +953,11 @@ func getBeatStartLogRecord(logs string) map[string]any {
continue
}

if message, ok := logRecord["message"].(string); !ok || !strings.HasPrefix(message, "Beat name:") {
continue
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "Beat name:") {
logRecords = append(logRecords, logRecord)
}

return logRecord
}
return nil
return logRecords
}

func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) {
Expand Down