Skip to content

Commit fedb121

Browse files
swiatekmycombinator
authored andcommitted
Rewrite the beats receiver log test using common tooling (#10153)
* Rewrite the beats receiver log test using common tooling * Expect the outputs to be degraded * Relax checks * Fix assertion * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Switch assert to required * Fix constants --------- Co-authored-by: Shaunak Kashyap <[email protected]> (cherry picked from commit 71ece0d)
1 parent 87302f6 commit fedb121

File tree

1 file changed

+65
-47
lines changed

1 file changed

+65
-47
lines changed

testing/integration/ess/beat_receivers_test.go

Lines changed: 65 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -696,8 +696,6 @@ func TestBeatsReceiverLogs(t *testing.T) {
696696
Stack: nil,
697697
})
698698

699-
t.Skip("Skip this test as it's flaky. See https://github.com/elastic/elastic-agent/issues/9890")
700-
701699
type configOptions struct {
702700
RuntimeExperimental string
703701
}
@@ -724,66 +722,87 @@ agent.monitoring.enabled: false
724722
require.NoError(t,
725723
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
726724
configOptions{
727-
RuntimeExperimental: "process",
725+
RuntimeExperimental: string(component.ProcessRuntimeManager),
728726
}))
729727
processConfig := configBuffer.Bytes()
730728
require.NoError(t,
731729
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
732730
configOptions{
733-
RuntimeExperimental: "otel",
731+
RuntimeExperimental: string(component.OtelRuntimeManager),
734732
}))
735733
receiverConfig := configBuffer.Bytes()
736734
// this is the context for the whole test, with a global timeout defined
737735
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
738736
defer cancel()
739737

740-
// use a subcontext for the agent
741-
agentProcessCtx, agentProcessCancel := context.WithCancel(ctx)
742-
fixture, cmd, output := prepareAgentCmd(t, agentProcessCtx, processConfig)
738+
// since we set the output to a nonexistent ES endpoint, we expect it to be degraded, but the input to be healthy
739+
assertBeatsReady := func(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager) {
740+
t.Helper()
743741

744-
require.NoError(t, cmd.Start())
742+
var componentVersionInfoName string
743+
switch runtime {
744+
case component.OtelRuntimeManager:
745+
componentVersionInfoName = "beats-receiver"
746+
default:
747+
componentVersionInfoName = "beat-v2-client"
748+
}
745749

746-
require.EventuallyWithT(t, func(collect *assert.CollectT) {
747-
var statusErr error
748-
status, statusErr := fixture.ExecStatus(agentProcessCtx)
749-
assert.NoError(collect, statusErr)
750-
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
751-
return
752-
}, 1*time.Minute, 1*time.Second)
750+
// we don't actually care about anything here other than the receiver itself
751+
assert.Equal(t, 1, len(status.Components))
752+
753+
// all the components should be degraded, their output units should be degraded, the input units should be healthy,
754+
// and should identify themselves appropriately via their version info
755+
for _, comp := range status.Components {
756+
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
757+
for _, unit := range comp.Units {
758+
if unit.UnitType == int(cproto.UnitType_INPUT) {
759+
assert.Equal(t, int(cproto.State_HEALTHY), unit.State,
760+
"expected state of unit %s to be %s, got %s",
761+
unit.UnitID, cproto.State_HEALTHY.String(), cproto.State(unit.State).String())
762+
}
763+
}
764+
}
765+
}
753766

754-
agentProcessCancel()
755-
require.Error(t, cmd.Wait())
756-
processLogsString := output.String()
757-
output.Reset()
767+
// set up a standalone agent
768+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
769+
require.NoError(t, err)
758770

759-
// use a subcontext for the agent
760-
agentReceiverCtx, agentReceiverCancel := context.WithCancel(ctx)
761-
fixture, cmd, output = prepareAgentCmd(t, agentReceiverCtx, receiverConfig)
771+
err = fixture.Prepare(ctx)
772+
require.NoError(t, err)
773+
err = fixture.Configure(ctx, processConfig)
774+
require.NoError(t, err)
762775

763-
require.NoError(t, cmd.Start())
776+
output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
777+
require.NoError(t, err, "failed to install agent: %s", output)
764778

765-
t.Cleanup(func() {
766-
if t.Failed() {
767-
t.Log("Elastic-Agent output:")
768-
t.Log(output.String())
769-
}
770-
})
779+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
780+
var statusErr error
781+
status, statusErr := fixture.ExecStatus(ctx)
782+
require.NoError(collect, statusErr)
783+
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
784+
return
785+
}, 2*time.Minute, 5*time.Second)
786+
787+
// change configuration and wait until the beats receiver is healthy
788+
err = fixture.Configure(ctx, receiverConfig)
789+
require.NoError(t, err)
771790

772791
require.EventuallyWithT(t, func(collect *assert.CollectT) {
773792
var statusErr error
774-
status, statusErr := fixture.ExecStatus(agentReceiverCtx)
775-
assert.NoError(collect, statusErr)
776-
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 1)
793+
status, statusErr := fixture.ExecStatus(ctx)
794+
require.NoError(collect, statusErr)
795+
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
777796
return
778-
}, 1*time.Minute, 1*time.Second)
779-
agentReceiverCancel()
780-
require.Error(t, cmd.Wait())
781-
receiverLogsString := output.String()
797+
}, 2*time.Minute, 5*time.Second)
798+
799+
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
800+
require.NoError(t, err, "failed to read logs: %v", err)
782801

783-
processLog := getBeatStartLogRecord(processLogsString)
784-
assert.NotEmpty(t, processLog)
785-
receiverLog := getBeatStartLogRecord(receiverLogsString)
786-
assert.NotEmpty(t, receiverLog)
802+
beatStartLogs := getBeatStartLogRecords(string(logsBytes))
803+
804+
require.Len(t, beatStartLogs, 2, "expected to find one log line for each configuration")
805+
processLog, receiverLog := beatStartLogs[0], beatStartLogs[1]
787806

788807
// Check that the process log is a subset of the receiver log
789808
for key, value := range processLog {
@@ -920,9 +939,10 @@ func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput,
920939
}
921940
}
922941

923-
// getBeatStartLogRecord returns the log record for the a particular log line emitted when the beat starts
942+
// getBeatStartLogRecords returns the log records for a particular log line emitted when the beat starts
924943
// This log line is identical between beats processes and receivers, so it's a good point of comparison
925-
func getBeatStartLogRecord(logs string) map[string]any {
944+
func getBeatStartLogRecords(logs string) []map[string]any {
945+
var logRecords []map[string]any
926946
for _, line := range strings.Split(logs, "\n") {
927947
line = strings.TrimSpace(line)
928948
if line == "" {
@@ -933,13 +953,11 @@ func getBeatStartLogRecord(logs string) map[string]any {
933953
continue
934954
}
935955

936-
if message, ok := logRecord["message"].(string); !ok || !strings.HasPrefix(message, "Beat name:") {
937-
continue
956+
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "Beat name:") {
957+
logRecords = append(logRecords, logRecord)
938958
}
939-
940-
return logRecord
941959
}
942-
return nil
960+
return logRecords
943961
}
944962

945963
func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) {

0 commit comments

Comments
 (0)