Skip to content

Commit c9d4314

Browse files
committed
Modify e2e test
1 parent 2a0e5e8 commit c9d4314

File tree

1 file changed

+87
-236
lines changed

1 file changed

+87
-236
lines changed

testing/integration/beat_receivers_test.go

Lines changed: 87 additions & 236 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,21 @@
77
package integration
88

99
import (
10-
"bytes"
1110
"context"
1211
"fmt"
1312
"io"
1413
"net/http"
1514
"runtime"
1615
"testing"
17-
"text/template"
1816
"time"
1917

2018
"gopkg.in/yaml.v2"
2119

2220
"github.com/elastic/elastic-agent-libs/kibana"
2321
"github.com/elastic/elastic-agent-libs/testing/estools"
24-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2522
atesting "github.com/elastic/elastic-agent/pkg/testing"
2623
"github.com/elastic/elastic-agent/pkg/testing/define"
2724
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
28-
"github.com/elastic/elastic-agent/pkg/utils"
29-
3025
"github.com/gofrs/uuid/v5"
3126
"github.com/stretchr/testify/require"
3227
)
@@ -79,73 +74,77 @@ func TestAgentMonitoring(t *testing.T) {
7974
Force: true,
8075
}
8176

77+
// prepare the policy and marshalled configuration
78+
policyCtx, policyCancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
79+
t.Cleanup(policyCancel)
80+
81+
// 1. Create and install policy with just monitoring
82+
createPolicyReq := kibana.AgentPolicy{
83+
Name: fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()),
84+
Namespace: info.Namespace,
85+
Description: fmt.Sprintf("%s policy", t.Name()),
86+
MonitoringEnabled: []kibana.MonitoringEnabledOption{
87+
kibana.MonitoringEnabledLogs,
88+
kibana.MonitoringEnabledMetrics,
89+
},
90+
}
91+
policyResponse, err := info.KibanaClient.CreatePolicy(policyCtx, createPolicyReq)
92+
require.NoError(t, err, "error creating policy")
93+
94+
// 2. Download the policy, add the API key
95+
downloadURL := fmt.Sprintf("/api/fleet/agent_policies/%s/download", policyResponse.ID)
96+
resp, err := info.KibanaClient.Connection.SendWithContext(policyCtx, http.MethodGet, downloadURL, nil, nil, nil)
97+
require.NoError(t, err, "error downloading policy")
98+
policyBytes, err := io.ReadAll(resp.Body)
99+
require.NoError(t, err, "error reading policy response")
100+
defer resp.Body.Close()
101+
102+
apiKeyResponse, err := createESApiKey(info.ESClient)
103+
require.NoError(t, err, "failed to get api key")
104+
require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse)
105+
apiKey, err := getDecodedApiKey(apiKeyResponse)
106+
require.NoError(t, err, "error decoding api key")
107+
108+
type PolicyOutputs struct {
109+
Type string `yaml:"type"`
110+
Hosts []string `yaml:"hosts"`
111+
Preset string `yaml:"preset"`
112+
ApiKey string `yaml:"api_key"`
113+
}
114+
type PolicyStruct struct {
115+
ID string `yaml:"id"`
116+
Revision int `yaml:"revision"`
117+
Outputs map[string]PolicyOutputs `yaml:"outputs"`
118+
Fleet map[string]any `yaml:"fleet"`
119+
OutputPermissions map[string]any `yaml:"output_permissions"`
120+
Agent map[string]any `yaml:"agent"`
121+
Inputs []map[string]any `yaml:"inputs"`
122+
Signed map[string]any `yaml:"signed"`
123+
SecretReferences []map[string]any `yaml:"secret_references"`
124+
Namespaces []map[string]any `yaml:"namespaces"`
125+
}
126+
127+
policy := PolicyStruct{}
128+
err = yaml.Unmarshal(policyBytes, &policy)
129+
require.NoError(t, err, "error unmarshalling policy")
130+
d, prs := policy.Outputs["default"]
131+
require.True(t, prs, "default must be in outputs")
132+
d.ApiKey = string(apiKey)
133+
policy.Outputs["default"] = d
134+
82135
// Flow
83136
// 1. Create and install policy with just monitoring
84137
// 2. Download the policy, add the API key
85138
// 3. Install without enrolling in fleet
86139
// 4. Make sure logs and metrics for agent monitoring are being received
87140
t.Run("verify elastic-agent monitoring functionality", func(t *testing.T) {
88141
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
89-
defer cancel()
90-
91-
// 1. Create and install policy with just monitoring
92-
createPolicyReq := kibana.AgentPolicy{
93-
Name: fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String()),
94-
Namespace: info.Namespace,
95-
Description: fmt.Sprintf("%s policy", t.Name()),
96-
MonitoringEnabled: []kibana.MonitoringEnabledOption{
97-
kibana.MonitoringEnabledLogs,
98-
kibana.MonitoringEnabledMetrics,
99-
},
100-
}
101-
policyResponse, err := info.KibanaClient.CreatePolicy(ctx, createPolicyReq)
102-
require.NoError(t, err, "error creating policy")
103-
104-
// 2. Download the policy, add the API key
105-
downloadURL := fmt.Sprintf("/api/fleet/agent_policies/%s/download", policyResponse.ID)
106-
resp, err := info.KibanaClient.Connection.SendWithContext(ctx, http.MethodGet, downloadURL, nil, nil, nil)
107-
require.NoError(t, err, "error downloading policy")
108-
policy, err := io.ReadAll(resp.Body)
109-
require.NoError(t, err, "error reading policy response")
110-
defer resp.Body.Close()
111-
112-
apiKeyResponse, err := createESApiKey(info.ESClient)
113-
require.NoError(t, err, "failed to get api key")
114-
require.True(t, len(apiKeyResponse.Encoded) > 1, "api key is invalid %q", apiKeyResponse)
115-
apiKey, err := getDecodedApiKey(apiKeyResponse)
116-
require.NoError(t, err, "error decoding api key")
117-
118-
type PolicyOutputs struct {
119-
Type string `yaml:"type"`
120-
Hosts []string `yaml:"hosts"`
121-
Preset string `yaml:"preset"`
122-
ApiKey string `yaml:"api_key"`
123-
}
124-
type PolicyStruct struct {
125-
ID string `yaml:"id"`
126-
Revision int `yaml:"revision"`
127-
Outputs map[string]PolicyOutputs `yaml:"outputs"`
128-
Fleet map[string]any `yaml:"fleet"`
129-
OutputPermissions map[string]any `yaml:"output_permissions"`
130-
Agent map[string]any `yaml:"agent"`
131-
Inputs []map[string]any `yaml:"inputs"`
132-
Signed map[string]any `yaml:"signed"`
133-
SecretReferences []map[string]any `yaml:"secret_references"`
134-
Namespaces []map[string]any `yaml:"namespaces"`
135-
}
136-
137-
y := PolicyStruct{}
138-
err = yaml.Unmarshal(policy, &y)
139-
require.NoError(t, err, "error unmarshalling policy")
140-
d, prs := y.Outputs["default"]
141-
require.True(t, prs, "default must be in outputs")
142-
d.ApiKey = string(apiKey)
143-
y.Outputs["default"] = d
144-
policyBytes, err := yaml.Marshal(y)
145-
require.NoErrorf(t, err, "error marshalling policy, struct was %v", y)
142+
t.Cleanup(cancel)
143+
updatedPolicyBytes, err := yaml.Marshal(policy)
144+
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
146145
t.Cleanup(func() {
147146
if t.Failed() {
148-
t.Logf("policy was %s", string(policyBytes))
147+
t.Logf("policy was %s", string(updatedPolicyBytes))
149148
}
150149
})
151150

@@ -156,7 +155,7 @@ func TestAgentMonitoring(t *testing.T) {
156155
err = fixture.Prepare(ctx)
157156
require.NoError(t, err, "error preparing fixture")
158157

159-
err = fixture.Configure(ctx, policyBytes)
158+
err = fixture.Configure(ctx, updatedPolicyBytes)
160159
require.NoError(t, err, "error configuring fixture")
161160

162161
output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
@@ -225,198 +224,50 @@ func TestAgentMonitoring(t *testing.T) {
225224
}, "monitoring logs from elastic-agent should exist before proceeding",
226225
)
227226

228-
type configOptions struct {
229-
InputPath string
230-
ESEndpoint string
231-
ESApiKey string
232-
SocketEndpoint string
233-
Namespace string
234-
}
235-
esEndpoint, err := getESHost()
236-
require.NoError(t, err, "error getting elasticsearch endpoint")
237-
esApiKey, err := createESApiKey(info.ESClient)
238-
require.NoError(t, err, "error creating API key")
239-
require.NotEmptyf(t, esApiKey.Encoded, "api key is invalid %q", esApiKey)
240-
241-
// Start monitoring in otel mode
242-
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
243-
require.NoError(t, err)
244-
245227
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
246-
defer cancel()
228+
t.Cleanup(cancel)
247229

248-
err = fixture.Prepare(ctx)
249-
require.NoError(t, err)
230+
// switch monitoring to the otel runtime
231+
monitoring := policy.Agent["monitoring"]
232+
monitoringMap := monitoring.(map[any]any)
233+
monitoringMap["_runtime_experimental"] = "otel"
234+
policy.Agent["monitoring"] = monitoringMap
250235

251-
// installs elastic-agent with empty elastic-agent.yml to get its working dir first
252-
err = fixture.Configure(ctx, []byte{})
253-
require.NoError(t, err)
254-
255-
output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
256-
require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))
257-
258-
// Ensure elastic-agent is healthy, otherwise we cannot perform retstart operation
259-
require.Eventually(t, func() bool {
260-
err = fixture.IsHealthy(ctx)
261-
if err != nil {
262-
t.Logf("waiting for agent healthy: %s", err.Error())
263-
return false
264-
}
265-
return true
266-
}, 30*time.Second, 1*time.Second)
267-
268-
configTemplateOTel := `
269-
receivers:
270-
filebeatreceiver/filestream-monitoring:
271-
filebeat:
272-
inputs:
273-
- type: filestream
274-
enabled: true
275-
id: filestream-monitoring-agent
276-
paths:
277-
- {{.InputPath}}/data/elastic-agent-*/logs/elastic-agent-*.ndjson
278-
- {{.InputPath}}/data/elastic-agent-*/logs/elastic-agent-watcher-*.ndjson
279-
close:
280-
on_state_change:
281-
inactive: 5m
282-
parsers:
283-
- ndjson:
284-
add_error_key: true
285-
message_key: message
286-
overwrite_keys: true
287-
target: ""
288-
processors:
289-
- add_fields:
290-
fields:
291-
dataset: elastic_agent
292-
namespace: {{.Namespace}}
293-
type: logs
294-
target: data_stream
295-
- add_fields:
296-
fields:
297-
dataset: elastic_agent
298-
target: event
299-
- add_fields:
300-
fields:
301-
id: 0ddca301-e7c0-4eac-8432-7dd05bc9cb06
302-
snapshot: false
303-
version: 8.19.0
304-
target: elastic_agent
305-
- add_fields:
306-
fields:
307-
id: 0879f47d-df41-464d-8462-bc2b8fef45bf
308-
target: agent
309-
- drop_event:
310-
when:
311-
regexp:
312-
component.id: .*-monitoring$
313-
- drop_event:
314-
when:
315-
regexp:
316-
message: ^Non-zero metrics in the last
317-
- copy_fields:
318-
fields:
319-
- from: data_stream.dataset
320-
to: data_stream.dataset_original
321-
- drop_fields:
322-
fields:
323-
- data_stream.dataset
324-
- copy_fields:
325-
fail_on_error: false
326-
fields:
327-
- from: component.dataset
328-
to: data_stream.dataset
329-
ignore_missing: true
330-
- copy_fields:
331-
fail_on_error: false
332-
fields:
333-
- from: data_stream.dataset_original
334-
to: data_stream.dataset
335-
- drop_fields:
336-
fields:
337-
- data_stream.dataset_original
338-
- event.dataset
339-
- copy_fields:
340-
fields:
341-
- from: data_stream.dataset
342-
to: event.dataset
343-
- drop_fields:
344-
fields:
345-
- ecs.version
346-
ignore_missing: true
347-
output:
348-
otelconsumer:
349-
queue:
350-
mem:
351-
flush:
352-
timeout: 0s
353-
logging:
354-
level: info
355-
selectors:
356-
- '*'
357-
http.enabled: true
358-
http.host: {{ .SocketEndpoint }}
359-
exporters:
360-
debug:
361-
use_internal_logger: false
362-
verbosity: detailed
363-
elasticsearch/log:
364-
endpoints:
365-
- {{.ESEndpoint}}
366-
compression: none
367-
api_key: {{.ESApiKey}}
368-
logs_dynamic_index:
369-
enabled: true
370-
batcher:
371-
enabled: true
372-
flush_timeout: 0.5s
373-
mapping:
374-
mode: bodymap
375-
service:
376-
pipelines:
377-
logs:
378-
receivers:
379-
- filebeatreceiver/filestream-monitoring
380-
exporters:
381-
- elasticsearch/log
382-
`
383-
socketEndpoint := utils.SocketURLWithFallback(uuid.Must(uuid.NewV4()).String(), paths.TempDir())
384-
385-
// configure elastic-agent.yml with new config
386-
var configBuffer bytes.Buffer
387-
template.Must(template.New("config").Parse(configTemplateOTel)).Execute(&configBuffer,
388-
configOptions{
389-
InputPath: fixture.WorkDir(),
390-
ESEndpoint: esEndpoint,
391-
ESApiKey: esApiKey.Encoded,
392-
SocketEndpoint: socketEndpoint,
393-
Namespace: info.Namespace,
394-
})
395-
configOTelContents := configBuffer.Bytes()
236+
updatedPolicyBytes, err := yaml.Marshal(policy)
237+
require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy)
396238
t.Cleanup(func() {
397239
if t.Failed() {
398-
t.Logf("Contents of agent config file:\n%s\n", string(configOTelContents))
240+
t.Logf("policy was %s", string(updatedPolicyBytes))
399241
}
400242
})
401-
err = fixture.Configure(ctx, configOTelContents)
243+
244+
// 3. Install without enrolling in fleet
245+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
402246
require.NoError(t, err)
403247

404-
// Get the timestamp before restarting. Required to separate logs from agent and otel
248+
err = fixture.Prepare(ctx)
249+
require.NoError(t, err, "error preparing fixture")
250+
251+
err = fixture.Configure(ctx, updatedPolicyBytes)
252+
require.NoError(t, err, "error configuring fixture")
253+
254+
// Get the timestamp before starting. Required to separate logs from agent and otel.
405255
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
406256

407257
fmt.Println(time.Now())
408-
// Restart elastic-agent
409-
output, err = fixture.Exec(ctx, []string{"restart"})
410-
require.NoErrorf(t, err, "error restarting agent: %s\ncombinedoutput:\n%s", err, string(output))
411258

259+
output, err := fixture.InstallWithoutEnroll(ctx, &installOpts)
260+
require.NoErrorf(t, err, "error install withouth enroll: %s\ncombinedoutput:\n%s", err, string(output))
261+
262+
// Ensure elastic-agent is healthy, otherwise we cannot perform restart operation
412263
require.Eventually(t, func() bool {
413264
err = fixture.IsHealthy(ctx)
414265
if err != nil {
415266
t.Logf("waiting for agent healthy: %s", err.Error())
416267
return false
417268
}
418269
return true
419-
}, 30*time.Second, 1*time.Second)
270+
}, 1*time.Minute, 1*time.Second)
420271

421272
// run this only for logs for now
422273
tc := tests[0]

0 commit comments

Comments
 (0)