Skip to content

Commit 3b2fe00

Browse files
committed
Send data from beats processes and receivers to different namespaces
1 parent 183ff06 commit 3b2fe00

File tree

1 file changed

+31
-25
lines changed

1 file changed

+31
-25
lines changed

testing/integration/beat_receivers_test.go

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ import (
1717

1818
"github.com/stretchr/testify/assert"
1919

20-
"github.com/elastic/elastic-agent/pkg/control/v
2120
"gopkg.in/yaml.v2"
2221

22+
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
23+
2324
"github.com/gofrs/uuid/v5"
2425
"github.com/stretchr/testify/require"
2526

@@ -121,11 +122,14 @@ func TestAgentMonitoring(t *testing.T) {
121122
Outputs map[string]PolicyOutputs `yaml:"outputs"`
122123
Fleet map[string]any `yaml:"fleet"`
123124
OutputPermissions map[string]any `yaml:"output_permissions"`
124-
Agent map[string]any `yaml:"agent"`
125-
Inputs []map[string]any `yaml:"inputs"`
126-
Signed map[string]any `yaml:"signed"`
127-
SecretReferences []map[string]any `yaml:"secret_references"`
128-
Namespaces []map[string]any `yaml:"namespaces"`
125+
Agent struct {
126+
Monitoring map[string]any `yaml:"monitoring"`
127+
Rest map[string]any `yaml:",inline"`
128+
} `yaml:"agent"`
129+
Inputs []map[string]any `yaml:"inputs"`
130+
Signed map[string]any `yaml:"signed"`
131+
SecretReferences []map[string]any `yaml:"secret_references"`
132+
Namespaces []map[string]any `yaml:"namespaces"`
129133
}
130134

131135
policy := PolicyStruct{}
@@ -144,6 +148,12 @@ func TestAgentMonitoring(t *testing.T) {
144148
t.Run("verify elastic-agent monitoring functionality", func(t *testing.T) {
145149
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
146150
t.Cleanup(cancel)
151+
152+
// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
153+
// right data
154+
actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "process")
155+
policy.Agent.Monitoring["namespace"] = actualNamespace
156+
147157
updatedPolicyBytes, err := yaml.Marshal(policy)
148158
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
149159
t.Cleanup(func() {
@@ -182,7 +192,7 @@ func TestAgentMonitoring(t *testing.T) {
182192
mustClauses := []map[string]any{
183193
{"match": map[string]any{"data_stream.type": tc.dsType}},
184194
{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
185-
{"match": map[string]any{"data_stream.namespace": tc.dsNamespace}},
195+
{"match": map[string]any{"data_stream.namespace": actualNamespace}},
186196
}
187197

188198
// Only add the "message" match if tc.message is not empty
@@ -231,11 +241,13 @@ func TestAgentMonitoring(t *testing.T) {
231241
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
232242
t.Cleanup(cancel)
233243

244+
// beats processes and beats receivers should use a different namespace to ensure each test looks only at the
245+
// right data
246+
actualNamespace := fmt.Sprintf("%s-%s", info.Namespace, "otel")
247+
policy.Agent.Monitoring["namespace"] = actualNamespace
248+
234249
// switch monitoring to the otel runtime
235-
monitoring := policy.Agent["monitoring"]
236-
monitoringMap := monitoring.(map[any]any)
237-
monitoringMap["_runtime_experimental"] = "otel"
238-
policy.Agent["monitoring"] = monitoringMap
250+
policy.Agent.Monitoring["_runtime_experimental"] = "otel"
239251

240252
updatedPolicyBytes, err := yaml.Marshal(policy)
241253
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
@@ -255,11 +267,6 @@ func TestAgentMonitoring(t *testing.T) {
255267
err = fixture.Configure(ctx, updatedPolicyBytes)
256268
require.NoError(t, err, "error configuring fixture")
257269

258-
// Get the timestamp before starting. Required to separate logs from agent and otel.
259-
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
260-
261-
fmt.Println(time.Now())
262-
263270
output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
264271
require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))
265272

@@ -273,11 +280,12 @@ func TestAgentMonitoring(t *testing.T) {
273280

274281
// we should have filebeatreceiver and metricbeatreceiver running
275282
otelCollectorStatus := status.Collector
276-
assert.Equal(collect, cproto.CollectorComponentStatus_StatusOK, otelCollectorStatus.Status)
283+
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status)
277284
pipelineStatusMap := otelCollectorStatus.ComponentStatusMap
278285

279286
// we should have 3 pipelines running: filestream for logs, http metrics and beats metrics
280287
assert.Equal(collect, 3, len(pipelineStatusMap))
288+
281289
fileStreamPipeline := "pipeline:logs/_agent-component/filestream-monitoring"
282290
httpMetricsPipeline := "pipeline:logs/_agent-component/http/metrics-monitoring"
283291
beatsMetricsPipeline := "pipeline:logs/_agent-component/beat/metrics-monitoring"
@@ -286,9 +294,9 @@ func TestAgentMonitoring(t *testing.T) {
286294
assert.Contains(collect, pipelineStatusMap, beatsMetricsPipeline)
287295

288296
// and they should be healthy
289-
assert.Equal(collect, cproto.CollectorComponentStatus_StatusOK, pipelineStatusMap[fileStreamPipeline].Status)
290-
assert.Equal(collect, cproto.CollectorComponentStatus_StatusOK, pipelineStatusMap[httpMetricsPipeline].Status)
291-
assert.Equal(collect, cproto.CollectorComponentStatus_StatusOK, pipelineStatusMap[beatsMetricsPipeline].Status)
297+
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), pipelineStatusMap[fileStreamPipeline].Status)
298+
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), pipelineStatusMap[httpMetricsPipeline].Status)
299+
assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), pipelineStatusMap[beatsMetricsPipeline].Status)
292300

293301
return
294302
}, 1*time.Minute, 1*time.Second)
@@ -301,11 +309,9 @@ func TestAgentMonitoring(t *testing.T) {
301309
defer findCancel()
302310
mustClauses := []map[string]any{
303311
{"match": map[string]any{"message": tc.message}},
304-
{"range": map[string]interface{}{
305-
"@timestamp": map[string]string{
306-
"gte": timestamp, // Use captured timestamp
307-
},
308-
}},
312+
{"match": map[string]any{"data_stream.type": tc.dsType}},
313+
{"match": map[string]any{"data_stream.dataset": tc.dsDataset}},
314+
{"match": map[string]any{"data_stream.namespace": actualNamespace}},
309315
}
310316

311317
rawQuery := map[string]any{

0 commit comments

Comments
 (0)