Skip to content

Commit 1e2032d

Browse files
mergify[bot]swiatekmycombinator
authored
Rewrite the beats receiver log test using common tooling (#10153) (#10505)
* Rewrite the beats receiver log test using common tooling * Expect the outputs to be degraded * Relax checks * Fix assertion * Apply suggestion from @ycombinator * Apply suggestion from @ycombinator * Apply suggestion from @ycombinator * Switch assert to required * Fix constants --------- (cherry picked from commit 71ece0d) Co-authored-by: Mikołaj Świątek <[email protected]> Co-authored-by: Shaunak Kashyap <[email protected]>
1 parent db3bec0 commit 1e2032d

File tree

1 file changed

+63
-47
lines changed

1 file changed

+63
-47
lines changed

testing/integration/ess/beat_receivers_test.go

Lines changed: 63 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -698,8 +698,6 @@ func TestBeatsReceiverLogs(t *testing.T) {
698698
Stack: nil,
699699
})
700700

701-
t.Skip("Skip this test as it's flaky. See https://github.com/elastic/elastic-agent/issues/9890")
702-
703701
type configOptions struct {
704702
RuntimeExperimental string
705703
}
@@ -726,66 +724,85 @@ agent.monitoring.enabled: false
726724
require.NoError(t,
727725
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
728726
configOptions{
729-
RuntimeExperimental: "process",
727+
RuntimeExperimental: string(component.ProcessRuntimeManager),
730728
}))
731729
processConfig := configBuffer.Bytes()
732730
require.NoError(t,
733731
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
734732
configOptions{
735-
RuntimeExperimental: "otel",
733+
RuntimeExperimental: string(component.OtelRuntimeManager),
736734
}))
737735
receiverConfig := configBuffer.Bytes()
738736
// this is the context for the whole test, with a global timeout defined
739737
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
740738
defer cancel()
741739

742-
// use a subcontext for the agent
743-
agentProcessCtx, agentProcessCancel := context.WithCancel(ctx)
744-
fixture, cmd, output := prepareAgentCmd(t, agentProcessCtx, processConfig)
740+
// since we set the output to a nonexistent ES endpoint, we expect it to be degraded, but the input to be healthy
741+
assertBeatsReady := func(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager) {
742+
var componentVersionInfoName string
743+
switch runtime {
744+
case component.OtelRuntimeManager:
745+
componentVersionInfoName = "beats-receiver"
746+
default:
747+
componentVersionInfoName = "beat-v2-client"
748+
}
745749

746-
require.NoError(t, cmd.Start())
750+
// we don't actually care about anything here other than the receiver itself
751+
assert.Equal(t, 1, len(status.Components))
752+
753+
// all the components should be degraded, their output units should be degraded, the input units should be healthy,
754+
// and should identify themselves appropriately via their version info
755+
for _, comp := range status.Components {
756+
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
757+
for _, unit := range comp.Units {
758+
if unit.UnitType == int(cproto.UnitType_INPUT) {
759+
assert.Equal(t, int(cproto.State_HEALTHY), unit.State,
760+
"expected state of unit %s to be %s, got %s",
761+
unit.UnitID, cproto.State_HEALTHY.String(), cproto.State(unit.State).String())
762+
}
763+
}
764+
}
765+
}
747766

748-
require.EventuallyWithT(t, func(collect *assert.CollectT) {
749-
var statusErr error
750-
status, statusErr := fixture.ExecStatus(agentProcessCtx)
751-
assert.NoError(collect, statusErr)
752-
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
753-
return
754-
}, 1*time.Minute, 1*time.Second)
767+
// set up a standalone agent
768+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
769+
require.NoError(t, err)
755770

756-
agentProcessCancel()
757-
require.Error(t, cmd.Wait())
758-
processLogsString := output.String()
759-
output.Reset()
771+
err = fixture.Prepare(ctx)
772+
require.NoError(t, err)
773+
err = fixture.Configure(ctx, processConfig)
774+
require.NoError(t, err)
760775

761-
// use a subcontext for the agent
762-
agentReceiverCtx, agentReceiverCancel := context.WithCancel(ctx)
763-
fixture, cmd, output = prepareAgentCmd(t, agentReceiverCtx, receiverConfig)
776+
output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
777+
require.NoError(t, err, "failed to install agent: %s", output)
764778

765-
require.NoError(t, cmd.Start())
779+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
780+
var statusErr error
781+
status, statusErr := fixture.ExecStatus(ctx)
782+
require.NoError(collect, statusErr)
783+
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
784+
return
785+
}, 2*time.Minute, 5*time.Second)
766786

767-
t.Cleanup(func() {
768-
if t.Failed() {
769-
t.Log("Elastic-Agent output:")
770-
t.Log(output.String())
771-
}
772-
})
787+
// change configuration and wait until the beats receiver is healthy
788+
err = fixture.Configure(ctx, receiverConfig)
789+
require.NoError(t, err)
773790

774791
require.EventuallyWithT(t, func(collect *assert.CollectT) {
775792
var statusErr error
776-
status, statusErr := fixture.ExecStatus(agentReceiverCtx)
777-
assert.NoError(collect, statusErr)
778-
assertBeatsHealthy(collect, &status, component.OtelRuntimeManager, 1)
793+
status, statusErr := fixture.ExecStatus(ctx)
794+
require.NoError(collect, statusErr)
795+
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
779796
return
780-
}, 1*time.Minute, 1*time.Second)
781-
agentReceiverCancel()
782-
require.Error(t, cmd.Wait())
783-
receiverLogsString := output.String()
797+
}, 2*time.Minute, 5*time.Second)
798+
799+
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
800+
require.NoError(t, err, "failed to read logs: %v", err)
801+
802+
beatStartLogs := getBeatStartLogRecords(string(logsBytes))
784803

785-
processLog := getBeatStartLogRecord(processLogsString)
786-
assert.NotEmpty(t, processLog)
787-
receiverLog := getBeatStartLogRecord(receiverLogsString)
788-
assert.NotEmpty(t, receiverLog)
804+
require.Len(t, beatStartLogs, 2, "expected to find one log line for each configuration")
805+
processLog, receiverLog := beatStartLogs[0], beatStartLogs[1]
789806

790807
// Check that the process log is a subset of the receiver log
791808
for key, value := range processLog {
@@ -922,9 +939,10 @@ func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput,
922939
}
923940
}
924941

925-
// getBeatStartLogRecord returns the log record for the a particular log line emitted when the beat starts
942+
// getBeatStartLogRecords returns the log records for a particular log line emitted when the beat starts
926943
// This log line is identical between beats processes and receivers, so it's a good point of comparison
927-
func getBeatStartLogRecord(logs string) map[string]any {
944+
func getBeatStartLogRecords(logs string) []map[string]any {
945+
var logRecords []map[string]any
928946
for _, line := range strings.Split(logs, "\n") {
929947
line = strings.TrimSpace(line)
930948
if line == "" {
@@ -935,13 +953,11 @@ func getBeatStartLogRecord(logs string) map[string]any {
935953
continue
936954
}
937955

938-
if message, ok := logRecord["message"].(string); !ok || !strings.HasPrefix(message, "Beat name:") {
939-
continue
956+
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "Beat name:") {
957+
logRecords = append(logRecords, logRecord)
940958
}
941-
942-
return logRecord
943959
}
944-
return nil
960+
return logRecords
945961
}
946962

947963
func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) {

0 commit comments

Comments
 (0)