Skip to content

Commit 11a9005

Browse files
VihasMakwanahayotbisonai
authored andcommitted
[edot][diagnostics] remove otel diagnostics from manager (elastic#10415)
* chore: remove otel diagnostics from manager * test * more testing and refactor * fix tests * refactor
1 parent 2abb071 commit 11a9005

File tree

5 files changed

+66
-457
lines changed

5 files changed

+66
-457
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func TestDiagnosticsHandlerWithEDOT(t *testing.T) {
419419
err := os.MkdirAll(path.Join(tempAgentRoot, "data"), 0755)
420420
require.NoError(t, err)
421421
called := false
422-
s := NewMockServer(t, paths.DiagnosticsExtensionSocket(), &called)
422+
s := NewMockServer(t, paths.DiagnosticsExtensionSocket(), &called, nil)
423423
defer func() {
424424
require.NoError(t, s.Shutdown(context.Background()))
425425
}()

internal/pkg/agent/application/actions/handlers/mock_server_test.go renamed to internal/pkg/agent/application/actions/handlers/mock_server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ import (
1717
"github.com/elastic/elastic-agent/pkg/ipc"
1818
)
1919

20-
func NewMockServer(t *testing.T, host string, called *bool) *http.Server {
20+
func NewMockServer(t *testing.T, host string, called *bool, response *elasticdiagnostics.Response) *http.Server {
2121
mux := http.NewServeMux()
2222
mux.HandleFunc("/diagnostics", func(w http.ResponseWriter, r *http.Request) {
23-
*called = true
23+
if called != nil {
24+
*called = true
25+
}
2426
resp := elasticdiagnostics.Response{
2527
GlobalDiagnostics: []*proto.ActionDiagnosticUnitResult{
2628
{
@@ -31,6 +33,10 @@ func NewMockServer(t *testing.T, host string, called *bool) *http.Server {
3133
},
3234
},
3335
}
36+
if response != nil {
37+
// overwrite default response
38+
resp = *response
39+
}
3440
err := json.NewEncoder(w).Encode(resp)
3541
require.NoError(t, err)
3642
w.Header().Set("Content-Type", "application/json")

internal/pkg/otel/manager/diagnostics.go

Lines changed: 5 additions & 293 deletions
Original file line numberDiff line numberDiff line change
@@ -5,39 +5,19 @@
55
package manager
66

77
import (
8-
"archive/tar"
9-
"bytes"
10-
"compress/gzip"
118
"context"
12-
"encoding/json"
139
"errors"
1410
"fmt"
15-
"io"
16-
"io/fs"
17-
"os"
18-
"path/filepath"
19-
"regexp"
2011
"strings"
2112
"syscall"
2213

23-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
24-
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
2514
"github.com/elastic/elastic-agent/internal/pkg/otel"
2615

27-
"google.golang.org/protobuf/types/known/timestamppb"
28-
29-
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
30-
31-
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
32-
"github.com/elastic/elastic-agent/pkg/core/logger"
33-
3416
"github.com/elastic/elastic-agent/pkg/component"
3517
"github.com/elastic/elastic-agent/pkg/component/runtime"
3618
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
3719
)
3820

39-
var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles()
40-
4121
// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
4222
// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning
4323
// is logged.
@@ -138,285 +118,17 @@ func (m *OTelManager) PerformComponentDiagnostics(
138118
}
139119

140120
for idx, diag := range diagnostics {
121+
found := false
141122
for _, extDiag := range extDiagnostics.ComponentDiagnostics {
142123
if strings.Contains(extDiag.Name, diag.Component.ID) {
143-
diagnostics[idx].Results = append(diag.Results, extDiag)
124+
found = true
125+
diagnostics[idx].Results = append(diagnostics[idx].Results, extDiag)
144126
}
145127
}
146-
}
147-
148-
for idx, diag := range diagnostics {
149-
var results []*proto.ActionDiagnosticUnitResult
150-
var errs []error
151-
jsonMetricDiagnostic, err := GetBeatJsonMetricsDiagnostics(ctx, diag.Component.ID)
152-
errs = append(errs, err)
153-
if jsonMetricDiagnostic != nil {
154-
results = append(results, jsonMetricDiagnostic)
155-
}
156-
157-
inputMetricsDiagnostic, err := GetBeatInputMetricsDiagnostics(ctx, diag.Component.ID)
158-
errs = append(errs, err)
159-
if inputMetricsDiagnostic != nil {
160-
results = append(results, inputMetricsDiagnostic)
161-
}
162-
163-
if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" {
164-
// include filebeat registry, reimplementation of a filebeat diagnostic hook
165-
registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID)
166-
if err != nil {
167-
errs = append(errs, fmt.Errorf("failed to get filebeat registry archive: %w", err))
168-
}
169-
if registryTarGzBytes != nil {
170-
m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes))
171-
results = append(results, &proto.ActionDiagnosticUnitResult{
172-
Name: "registry",
173-
Description: "Filebeat's registry",
174-
Filename: "registry.tar.gz",
175-
ContentType: "application/octet-stream",
176-
Content: registryTarGzBytes,
177-
Generated: timestamppb.Now(),
178-
})
179-
}
180-
128+
if !found {
129+
diagnostics[idx].Err = fmt.Errorf("failed to get diagnostics for %s", diag.Component.ID)
181130
}
182-
183-
diagnostics[idx].Results = append(diagnostics[idx].Results, results...)
184-
diagnostics[idx].Err = errors.Join(errs...)
185131
}
186132

187133
return diagnostics, nil
188134
}
189-
190-
func GetBeatJsonMetricsDiagnostics(ctx context.Context, componentID string) (*proto.ActionDiagnosticUnitResult, error) {
191-
beatMetrics, err := GetBeatMetricsPayload(ctx, componentID, "/stats")
192-
if err != nil {
193-
return nil, fmt.Errorf("failed to get stats beat metrics: %w", err)
194-
}
195-
196-
beatMetrics, err = formatJSON(beatMetrics)
197-
if err != nil {
198-
return nil, fmt.Errorf("failed to format stats beat metrics: %w", err)
199-
}
200-
201-
result := &proto.ActionDiagnosticUnitResult{
202-
Name: "beat_metrics",
203-
Description: "Metrics from the default monitoring namespace and expvar.",
204-
Filename: "beat_metrics.json",
205-
ContentType: "application/json",
206-
Content: beatMetrics,
207-
Generated: timestamppb.Now(),
208-
}
209-
return result, nil
210-
}
211-
212-
func GetBeatInputMetricsDiagnostics(ctx context.Context, componentID string) (*proto.ActionDiagnosticUnitResult, error) {
213-
inputMetrics, err := GetBeatMetricsPayload(ctx, componentID, "/inputs/")
214-
if err != nil {
215-
return nil, fmt.Errorf("failed to get input beat metrics: %w", err)
216-
}
217-
218-
inputMetrics, err = formatJSON(inputMetrics)
219-
if err != nil {
220-
return nil, fmt.Errorf("failed to format input beat metrics: %w", err)
221-
}
222-
223-
result := &proto.ActionDiagnosticUnitResult{
224-
Name: "input_metrics",
225-
Description: "Metrics from active inputs.",
226-
Filename: "input_metrics.json",
227-
ContentType: "application/json",
228-
Content: inputMetrics,
229-
Generated: timestamppb.Now(),
230-
}
231-
return result, nil
232-
}
233-
234-
func GetBeatMetricsPayload(ctx context.Context, componentID string, path string) ([]byte, error) {
235-
endpoint := componentmonitoring.PrefixedEndpoint(componentmonitoring.BeatsMonitoringEndpoint(componentID))
236-
metricBytes, statusCode, err := monitoring.GetProcessMetrics(ctx, endpoint, path)
237-
if err != nil {
238-
return nil, err
239-
}
240-
if statusCode >= 300 {
241-
return nil, fmt.Errorf("unexpected status code %d", statusCode)
242-
}
243-
return metricBytes, nil
244-
}
245-
246-
func formatJSON(jsonBytes []byte) ([]byte, error) {
247-
// remarshal the metrics to produce nicely formatted json
248-
var data any
249-
if err := json.Unmarshal(jsonBytes, &data); err != nil {
250-
return nil, err
251-
}
252-
253-
formattedData, err := json.MarshalIndent(data, "", " ")
254-
if err != nil {
255-
return nil, err
256-
}
257-
return formattedData, nil
258-
}
259-
260-
func FileBeatRegistryPath(componentID string) string {
261-
dataPath := translate.BeatDataPath(componentID)
262-
return filepath.Join(dataPath, "registry")
263-
}
264-
265-
// FileBeatRegistryTarGz creates a tar.gz file containing the filebeat registry and returns its contents as bytes.
266-
func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) {
267-
registryPath := FileBeatRegistryPath(componentID)
268-
269-
tempFile, err := os.CreateTemp("", "temp-registry.tar.gz")
270-
if err != nil {
271-
return nil, err
272-
}
273-
274-
defer func() {
275-
if closeErr := tempFile.Close(); closeErr != nil {
276-
logger.Warn("error closing temporary registry archive", "error", closeErr)
277-
}
278-
if removeErr := os.Remove(tempFile.Name()); removeErr != nil {
279-
logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr)
280-
}
281-
}()
282-
283-
gzWriter := gzip.NewWriter(tempFile)
284-
defer func() {
285-
if closeErr := gzWriter.Close(); closeErr != nil {
286-
logger.Warnf("error closing gzip writer: %v", closeErr)
287-
}
288-
}()
289-
290-
err = tarFolder(logger, gzWriter, registryPath)
291-
if err != nil {
292-
return nil, err
293-
}
294-
if closeErr := gzWriter.Close(); closeErr != nil {
295-
return nil, closeErr
296-
}
297-
298-
stat, err := tempFile.Stat()
299-
if err != nil {
300-
return nil, err
301-
}
302-
303-
if stat.Size() > 20_000_000 {
304-
return nil, fmt.Errorf("registry is too large for diagnostics, %d > 20mb", stat.Size()/1_000_000)
305-
}
306-
307-
var output bytes.Buffer
308-
_, err = tempFile.Seek(0, 0)
309-
if err != nil {
310-
return nil, err
311-
}
312-
_, err = io.Copy(&output, tempFile)
313-
if err != nil {
314-
return nil, err
315-
}
316-
317-
return output.Bytes(), nil
318-
}
319-
320-
// getRegexpsForRegistryFiles returns a list of regexps to match filebeat registry files.
321-
func getRegexpsForRegistryFiles() []*regexp.Regexp {
322-
var registryFileRegExps []*regexp.Regexp
323-
preFilesList := [][]string{
324-
{"^registry$"},
325-
{"^registry", "filebeat$"},
326-
{"^registry", "filebeat", "meta\\.json$"},
327-
{"^registry", "filebeat", "log\\.json$"},
328-
{"^registry", "filebeat", "active\\.dat$"},
329-
{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
330-
}
331-
332-
for _, lst := range preFilesList {
333-
// On windows, we need to ensure we escape the path separator, because backslash has a special meaning
334-
separator := regexp.QuoteMeta(string(filepath.Separator))
335-
pathRe := strings.Join(lst, separator)
336-
re := regexp.MustCompile(pathRe)
337-
registryFileRegExps = append(registryFileRegExps, re)
338-
}
339-
340-
return registryFileRegExps
341-
}
342-
343-
// tarFolder creates a tar archive from the folder src and stores it at dst.
344-
//
345-
// dst must be the full path with extension, e.g: /tmp/foo.tar
346-
// If src is not a folder an error is returned
347-
func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) error {
348-
fullPath, err := filepath.Abs(srcPath)
349-
if err != nil {
350-
return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err)
351-
}
352-
353-
tarWriter := tar.NewWriter(dst)
354-
defer func() {
355-
if err := tarWriter.Close(); err != nil {
356-
logger.Warnf("cannot close tar writer: '%s'", err)
357-
}
358-
}()
359-
360-
info, err := os.Stat(fullPath)
361-
if err != nil {
362-
return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err)
363-
}
364-
365-
if !info.IsDir() {
366-
return fmt.Errorf("'%s' is not a directory", fullPath)
367-
}
368-
baseDir := filepath.Base(srcPath)
369-
370-
logger.Debugf("starting to walk '%s'", fullPath)
371-
372-
return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error {
373-
// Stop if there is any errors
374-
if prevErr != nil {
375-
return prevErr
376-
}
377-
378-
pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, srcPath))
379-
if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) {
380-
return nil
381-
}
382-
header, err := tar.FileInfoHeader(info, info.Name())
383-
if err != nil {
384-
return fmt.Errorf("cannot create tar info header: '%w'", err)
385-
}
386-
header.Name = pathInTar
387-
388-
if err := tarWriter.WriteHeader(header); err != nil {
389-
return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err)
390-
}
391-
392-
if info.IsDir() {
393-
return nil
394-
}
395-
396-
file, err := os.Open(path)
397-
if err != nil {
398-
return fmt.Errorf("cannot open '%s' for reading: '%w", path, err)
399-
}
400-
defer func() {
401-
if closeErr := file.Close(); closeErr != nil {
402-
logger.Warnf("cannot close file '%s': '%s'", path, closeErr)
403-
}
404-
}()
405-
406-
logger.Debugf("adding '%s' to the tar archive", file.Name())
407-
if _, err := io.Copy(tarWriter, file); err != nil {
408-
return fmt.Errorf("cannot read '%s': '%w'", path, err)
409-
}
410-
411-
return nil
412-
})
413-
}
414-
415-
func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool {
416-
for _, regExp := range registryFileRegExps {
417-
if regExp.MatchString(path) {
418-
return true
419-
}
420-
}
421-
return false
422-
}

0 commit comments

Comments
 (0)