Skip to content

Commit 07559a5

Browse files
swiatekm and ycombinator
authored and committed
Rewrite the beats receiver log test using common tooling (#10153)
* Rewrite the beats receiver log test using common tooling
* Expect the outputs to be degraded
* Relax checks
* Fix assertion
* Apply suggestion from @ycombinator (Co-authored-by: Shaunak Kashyap <[email protected]>)
* Apply suggestion from @ycombinator (Co-authored-by: Shaunak Kashyap <[email protected]>)
* Apply suggestion from @ycombinator (Co-authored-by: Shaunak Kashyap <[email protected]>)
* Switch assert to required
* Fix constants
--------- Co-authored-by: Shaunak Kashyap <[email protected]> (cherry picked from commit 71ece0d)
1 parent 3fb0407 commit 07559a5

File tree

1 file changed

+65
-47
lines changed

1 file changed

+65
-47
lines changed

testing/integration/ess/beat_receivers_test.go

Lines changed: 65 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -698,8 +698,6 @@ func TestBeatsReceiverLogs(t *testing.T) {
698698
Stack: nil,
699699
})
700700

701-
t.Skip("Skip this test as it's flaky. See https://github.com/elastic/elastic-agent/issues/9890")
702-
703701
type configOptions struct {
704702
RuntimeExperimental string
705703
}
@@ -726,66 +724,87 @@ agent.monitoring.enabled: false
726724
require.NoError(t,
727725
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
728726
configOptions{
729-
RuntimeExperimental: "process",
727+
RuntimeExperimental: string(component.ProcessRuntimeManager),
730728
}))
731729
processConfig := configBuffer.Bytes()
732730
require.NoError(t,
733731
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
734732
configOptions{
735-
RuntimeExperimental: "otel",
733+
RuntimeExperimental: string(component.OtelRuntimeManager),
736734
}))
737735
receiverConfig := configBuffer.Bytes()
738736
// this is the context for the whole test, with a global timeout defined
739737
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
740738
defer cancel()
741739

742-
// use a subcontext for the agent
743-
agentProcessCtx, agentProcessCancel := context.WithCancel(ctx)
744-
fixture, cmd, output := prepareAgentCmd(t, agentProcessCtx, processConfig)
740+
// since we set the output to a nonexistent ES endpoint, we expect it to be degraded, but the input to be healthy
741+
assertBeatsReady := func(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager) {
742+
t.Helper()
745743

746-
require.NoError(t, cmd.Start())
744+
var componentVersionInfoName string
745+
switch runtime {
746+
case component.OtelRuntimeManager:
747+
componentVersionInfoName = "beats-receiver"
748+
default:
749+
componentVersionInfoName = "beat-v2-client"
750+
}
747751

748-
require.EventuallyWithT(t, func(collect *assert.CollectT) {
749-
var statusErr error
750-
status, statusErr := fixture.ExecStatus(agentProcessCtx)
751-
assert.NoError(collect, statusErr)
752-
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
753-
return
754-
}, 1*time.Minute, 1*time.Second)
752+
// we don't actually care about anything here other than the receiver itself
753+
assert.Equal(t, 1, len(status.Components))
754+
755+
// all the components should be degraded, their output units should be degraded, the input units should be healthy,
756+
// and should identify themselves appropriately via their version info
757+
for _, comp := range status.Components {
758+
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
759+
for _, unit := range comp.Units {
760+
if unit.UnitType == int(cproto.UnitType_INPUT) {
761+
assert.Equal(t, int(cproto.State_HEALTHY), unit.State,
762+
"expected state of unit %s to be %s, got %s",
763+
unit.UnitID, cproto.State_HEALTHY.String(), cproto.State(unit.State).String())
764+
}
765+
}
766+
}
767+
}
755768

756-
agentProcessCancel()
757-
require.Error(t, cmd.Wait())
758-
processLogsString := output.String()
759-
output.Reset()
769+
// set up a standalone agent
770+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
771+
require.NoError(t, err)
772+
773+
err = fixture.Prepare(ctx)
774+
require.NoError(t, err)
775+
err = fixture.Configure(ctx, processConfig)
776+
require.NoError(t, err)
760777

761-
// use a subcontext for the agent
762-
agentReceiverCtx, agentReceiverCancel := context.WithCancel(ctx)
763-
fixture, cmd, output = prepareAgentCmd(t, agentReceiverCtx, receiverConfig)
778+
output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
779+
require.NoError(t, err, "failed to install agent: %s", output)
764780

765-
require.NoError(t, cmd.Start())
781+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
782+
var statusErr error
783+
status, statusErr := fixture.ExecStatus(ctx)
784+
require.NoError(collect, statusErr)
785+
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
786+
return
787+
}, 2*time.Minute, 5*time.Second)
766788

767-
t.Cleanup(func() {
768-
if t.Failed() {
769-
t.Log("Elastic-Agent output:")
770-
t.Log(output.String())
771-
}
772-
})
789+
// change configuration and wait until the beats receiver is healthy
790+
err = fixture.Configure(ctx, receiverConfig)
791+
require.NoError(t, err)
773792

774793
require.EventuallyWithT(t, func(collect *assert.CollectT) {
775794
var statusErr error
776-
status, statusErr := fixture.ExecStatus(agentReceiverCtx)
777-
assert.NoError(collect, statusErr)
778-
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 1)
795+
status, statusErr := fixture.ExecStatus(ctx)
796+
require.NoError(collect, statusErr)
797+
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
779798
return
780-
}, 1*time.Minute, 1*time.Second)
781-
agentReceiverCancel()
782-
require.Error(t, cmd.Wait())
783-
receiverLogsString := output.String()
799+
}, 2*time.Minute, 5*time.Second)
784800

785-
processLog := getBeatStartLogRecord(processLogsString)
786-
assert.NotEmpty(t, processLog)
787-
receiverLog := getBeatStartLogRecord(receiverLogsString)
788-
assert.NotEmpty(t, receiverLog)
801+
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
802+
require.NoError(t, err, "failed to read logs: %v", err)
803+
804+
beatStartLogs := getBeatStartLogRecords(string(logsBytes))
805+
806+
require.Len(t, beatStartLogs, 2, "expected to find one log line for each configuration")
807+
processLog, receiverLog := beatStartLogs[0], beatStartLogs[1]
789808

790809
// Check that the process log is a subset of the receiver log
791810
for key, value := range processLog {
@@ -922,9 +941,10 @@ func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput,
922941
}
923942
}
924943

925-
// getBeatStartLogRecord returns the log record for the a particular log line emitted when the beat starts
944+
// getBeatStartLogRecords returns the log records for a particular log line emitted when the beat starts
926945
// This log line is identical between beats processes and receivers, so it's a good point of comparison
927-
func getBeatStartLogRecord(logs string) map[string]any {
946+
func getBeatStartLogRecords(logs string) []map[string]any {
947+
var logRecords []map[string]any
928948
for _, line := range strings.Split(logs, "\n") {
929949
line = strings.TrimSpace(line)
930950
if line == "" {
@@ -935,13 +955,11 @@ func getBeatStartLogRecord(logs string) map[string]any {
935955
continue
936956
}
937957

938-
if message, ok := logRecord["message"].(string); !ok || !strings.HasPrefix(message, "Beat name:") {
939-
continue
958+
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "Beat name:") {
959+
logRecords = append(logRecords, logRecord)
940960
}
941-
942-
return logRecord
943961
}
944-
return nil
962+
return logRecords
945963
}
946964

947965
func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) {

0 commit comments

Comments
 (0)