Skip to content

Commit d3e62d6

Browse files
swiatekmmergify[bot]
authored andcommitted
Rewrite the beats receiver log test using common tooling (#10153)
* Rewrite the beats receiver log test using common tooling * Expect the outputs to be degraded * Relax checks * Fix assertion * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> * Switch assert to required * Fix constants --------- Co-authored-by: Shaunak Kashyap <[email protected]> (cherry picked from commit 71ece0d) # Conflicts: # testing/integration/ess/beat_receivers_test.go
1 parent 6a7913a commit d3e62d6

File tree

1 file changed

+297
-0
lines changed

1 file changed

+297
-0
lines changed

testing/integration/ess/beat_receivers_test.go

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,234 @@ outputs:
723723
})
724724
}
725725

726+
<<<<<<< HEAD
727+
=======
728+
// TestBeatsReceiverLogs is a test that compares logs emitted by beats processes to those emitted by beats receivers.
729+
func TestBeatsReceiverLogs(t *testing.T) {
730+
_ = define.Require(t, define.Requirements{
731+
Group: integration.Default,
732+
Local: true,
733+
Sudo: true,
734+
OS: []define.OS{
735+
{Type: define.Windows},
736+
{Type: define.Linux},
737+
{Type: define.Darwin},
738+
},
739+
Stack: nil,
740+
})
741+
742+
type configOptions struct {
743+
RuntimeExperimental string
744+
}
745+
configTemplate := `agent.logging.level: info
746+
agent.logging.to_stderr: true
747+
agent.logging.to_files: false
748+
inputs:
749+
# Collecting system metrics
750+
- type: system/metrics
751+
id: unique-system-metrics-input
752+
_runtime_experimental: {{.RuntimeExperimental}}
753+
streams:
754+
- metricsets:
755+
- cpu
756+
outputs:
757+
default:
758+
type: elasticsearch
759+
hosts: [http://localhost:9200]
760+
api_key: placeholder
761+
agent.monitoring.enabled: false
762+
`
763+
764+
var configBuffer bytes.Buffer
765+
require.NoError(t,
766+
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
767+
configOptions{
768+
RuntimeExperimental: string(component.ProcessRuntimeManager),
769+
}))
770+
processConfig := configBuffer.Bytes()
771+
require.NoError(t,
772+
template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer,
773+
configOptions{
774+
RuntimeExperimental: string(component.OtelRuntimeManager),
775+
}))
776+
receiverConfig := configBuffer.Bytes()
777+
// this is the context for the whole test, with a global timeout defined
778+
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
779+
defer cancel()
780+
781+
// since we set the output to a nonexistent ES endpoint, we expect it to be degraded, but the input to be healthy
782+
assertBeatsReady := func(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager) {
783+
t.Helper()
784+
785+
var componentVersionInfoName string
786+
switch runtime {
787+
case component.OtelRuntimeManager:
788+
componentVersionInfoName = "beats-receiver"
789+
default:
790+
componentVersionInfoName = "beat-v2-client"
791+
}
792+
793+
// we don't actually care about anything here other than the receiver itself
794+
assert.Equal(t, 1, len(status.Components))
795+
796+
// all the components should be degraded, their output units should be degraded, the input units should be healthy,
797+
// and should identify themselves appropriately via their version info
798+
for _, comp := range status.Components {
799+
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
800+
for _, unit := range comp.Units {
801+
if unit.UnitType == int(cproto.UnitType_INPUT) {
802+
assert.Equal(t, int(cproto.State_HEALTHY), unit.State,
803+
"expected state of unit %s to be %s, got %s",
804+
unit.UnitID, cproto.State_HEALTHY.String(), cproto.State(unit.State).String())
805+
}
806+
}
807+
}
808+
}
809+
810+
// set up a standalone agent
811+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
812+
require.NoError(t, err)
813+
814+
err = fixture.Prepare(ctx)
815+
require.NoError(t, err)
816+
err = fixture.Configure(ctx, processConfig)
817+
require.NoError(t, err)
818+
819+
output, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
820+
require.NoError(t, err, "failed to install agent: %s", output)
821+
822+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
823+
var statusErr error
824+
status, statusErr := fixture.ExecStatus(ctx)
825+
require.NoError(collect, statusErr)
826+
assertBeatsReady(collect, &status, component.ProcessRuntimeManager)
827+
return
828+
}, 2*time.Minute, 5*time.Second)
829+
830+
// change configuration and wait until the beats receiver is healthy
831+
err = fixture.Configure(ctx, receiverConfig)
832+
require.NoError(t, err)
833+
834+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
835+
var statusErr error
836+
status, statusErr := fixture.ExecStatus(ctx)
837+
require.NoError(collect, statusErr)
838+
assertBeatsReady(collect, &status, component.OtelRuntimeManager)
839+
return
840+
}, 2*time.Minute, 5*time.Second)
841+
842+
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
843+
require.NoError(t, err, "failed to read logs: %v", err)
844+
845+
beatStartLogs := getBeatStartLogRecords(string(logsBytes))
846+
847+
require.Len(t, beatStartLogs, 2, "expected to find one log line for each configuration")
848+
processLog, receiverLog := beatStartLogs[0], beatStartLogs[1]
849+
850+
// Check that the process log is a subset of the receiver log
851+
for key, value := range processLog {
852+
assert.Contains(t, receiverLog, key)
853+
if key == "@timestamp" { // the timestamp value will be different
854+
continue
855+
}
856+
assert.Equal(t, value, receiverLog[key])
857+
}
858+
}
859+
860+
// TestBeatsReceiverProcessRuntimeFallback verifies that we fall back to the process runtime if the otel runtime
861+
// does not support the requested configuration.
862+
func TestBeatsReceiverProcessRuntimeFallback(t *testing.T) {
863+
_ = define.Require(t, define.Requirements{
864+
Group: integration.Default,
865+
Local: true,
866+
Sudo: true,
867+
OS: []define.OS{
868+
{Type: define.Windows},
869+
{Type: define.Linux},
870+
{Type: define.Darwin},
871+
},
872+
Stack: nil,
873+
})
874+
875+
config := `agent.logging.to_stderr: true
876+
agent.logging.to_files: false
877+
inputs:
878+
# Collecting system metrics
879+
- type: system/metrics
880+
id: unique-system-metrics-input
881+
_runtime_experimental: otel
882+
streams:
883+
- metricsets:
884+
- cpu
885+
outputs:
886+
default:
887+
type: elasticsearch
888+
hosts: [http://localhost:9200]
889+
api_key: placeholder
890+
indices: [] # not supported by the elasticsearch exporter
891+
agent.monitoring.enabled: false
892+
`
893+
894+
// this is the context for the whole test, with a global timeout defined
895+
ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute))
896+
defer cancel()
897+
898+
// set up a standalone agent
899+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
900+
require.NoError(t, err)
901+
902+
err = fixture.Prepare(ctx)
903+
require.NoError(t, err)
904+
err = fixture.Configure(ctx, []byte(config))
905+
require.NoError(t, err)
906+
907+
installOutput, err := fixture.Install(ctx, &atesting.InstallOpts{Privileged: true, Force: true})
908+
require.NoError(t, err, "install failed, output: %s", string(installOutput))
909+
910+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
911+
var statusErr error
912+
status, statusErr := fixture.ExecStatus(ctx)
913+
assert.NoError(collect, statusErr)
914+
// we should be running beats processes even though the otel runtime was requested
915+
assertBeatsHealthy(collect, &status, component.ProcessRuntimeManager, 1)
916+
return
917+
}, 1*time.Minute, 1*time.Second)
918+
logsBytes, err := fixture.Exec(ctx, []string{"logs", "-n", "1000", "--exclude-events"})
919+
require.NoError(t, err)
920+
921+
// verify we've logged a warning about using the process runtime
922+
var unsupportedLogRecord map[string]any
923+
for _, line := range strings.Split(string(logsBytes), "\n") {
924+
line = strings.TrimSpace(line)
925+
if line == "" {
926+
continue
927+
}
928+
var logRecord map[string]any
929+
if unmarshalErr := json.Unmarshal([]byte(line), &logRecord); unmarshalErr != nil {
930+
continue
931+
}
932+
933+
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "otel runtime is not supported") {
934+
unsupportedLogRecord = logRecord
935+
break
936+
}
937+
}
938+
939+
t.Cleanup(func() {
940+
if t.Failed() {
941+
t.Log("Elastic-Agent logs seen by the test:")
942+
t.Log(string(logsBytes))
943+
}
944+
})
945+
946+
require.NotNil(t, unsupportedLogRecord, "unsupported log message should be present")
947+
message, ok := unsupportedLogRecord["message"].(string)
948+
require.True(t, ok, "log message field should be a string")
949+
expectedMessage := "otel runtime is not supported for component system/metrics-default, switching to process runtime, reason: unsupported configuration for system/metrics-default: error translating config for output: default, unit: system/metrics-default, error: indices is currently not supported: unsupported operation"
950+
assert.Equal(t, expectedMessage, message)
951+
}
952+
953+
>>>>>>> 71ece0de9 (Rewrite the beats receiver log test using common tooling (#10153))
726954
func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.AgentStatusCollectorOutput) {
727955
assert.Equal(t, int(cproto.CollectorComponentStatus_StatusOK), status.Status, "component status should be ok")
728956
assert.Equal(t, "", status.Error, "component status should not have an error")
@@ -731,6 +959,75 @@ func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.Agent
731959
}
732960
}
733961

962+
<<<<<<< HEAD
963+
=======
964+
func assertBeatsHealthy(t *assert.CollectT, status *atesting.AgentStatusOutput, runtime component.RuntimeManager, componentCount int) {
965+
var componentVersionInfoName string
966+
switch runtime {
967+
case "otel":
968+
componentVersionInfoName = "beats-receiver"
969+
default:
970+
componentVersionInfoName = "beat-v2-client"
971+
}
972+
973+
// agent should be healthy
974+
assert.Equal(t, int(cproto.State_HEALTHY), status.State)
975+
assert.Equal(t, componentCount, len(status.Components))
976+
977+
// all the components should be healthy, their units should be healthy, and should identify themselves
978+
// as beats processes via their version info
979+
for _, comp := range status.Components {
980+
assert.Equal(t, int(cproto.State_HEALTHY), comp.State)
981+
assert.Equal(t, componentVersionInfoName, comp.VersionInfo.Name)
982+
for _, unit := range comp.Units {
983+
assert.Equal(t, int(cproto.State_HEALTHY), unit.State)
984+
}
985+
}
986+
}
987+
988+
// getBeatStartLogRecords returns the log records for a particular log line emitted when the beat starts
989+
// This log line is identical between beats processes and receivers, so it's a good point of comparison
990+
func getBeatStartLogRecords(logs string) []map[string]any {
991+
var logRecords []map[string]any
992+
for _, line := range strings.Split(logs, "\n") {
993+
line = strings.TrimSpace(line)
994+
if line == "" {
995+
continue
996+
}
997+
logRecord := make(map[string]any)
998+
if unmarshalErr := json.Unmarshal([]byte(line), &logRecord); unmarshalErr != nil {
999+
continue
1000+
}
1001+
1002+
if message, ok := logRecord["message"].(string); ok && strings.HasPrefix(message, "Beat name:") {
1003+
logRecords = append(logRecords, logRecord)
1004+
}
1005+
}
1006+
return logRecords
1007+
}
1008+
1009+
func prepareAgentCmd(t *testing.T, ctx context.Context, config []byte) (*atesting.Fixture, *exec.Cmd, *strings.Builder) {
1010+
// set up a standalone agent
1011+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
1012+
require.NoError(t, err)
1013+
1014+
err = fixture.Prepare(ctx)
1015+
require.NoError(t, err)
1016+
err = fixture.Configure(ctx, config)
1017+
require.NoError(t, err)
1018+
1019+
cmd, err := fixture.PrepareAgentCommand(ctx, nil)
1020+
require.NoError(t, err)
1021+
cmd.WaitDelay = 1 * time.Second
1022+
1023+
var output strings.Builder
1024+
cmd.Stderr = &output
1025+
cmd.Stdout = &output
1026+
1027+
return fixture, cmd, &output
1028+
}
1029+
1030+
>>>>>>> 71ece0de9 (Rewrite the beats receiver log test using common tooling (#10153))
7341031
func genIgnoredFields(goos string) []string {
7351032
switch goos {
7361033
case "windows":

0 commit comments

Comments
 (0)