Skip to content

Commit 7c98587

Browse files
committed
Add filebeat receiver to otel mode (#5833)
(cherry picked from commit 89dca1d) # Conflicts: # NOTICE.txt # go.mod # go.sum
1 parent 4dc8aa1 commit 7c98587

File tree

7 files changed

+38545
-18348
lines changed

7 files changed

+38545
-18348
lines changed

NOTICE.txt

Lines changed: 37856 additions & 18279 deletions
Large diffs are not rendered by default.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add filebeat otel receiver
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; a word indicating the component this changeset affects.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/elastic-agent/pull/5833
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

dev-tools/notice/overrides.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
{"name": "github.com/pascaldekloe/goe", "licenceFile": "LICENSE", "licenceType": "CC0-1.0"}
1919
{"name": "github.com/dnaeon/go-vcr", "licenceFile": "LICENSE", "licenceType": "BSD-2-Clause"}
2020
{"name": "github.com/grpc-ecosystem/go-grpc-middleware/v2", "licenceFile": "LICENSE", "licenceType": "Apache-2.0"}
21+
{"name": "github.com/JohnCGriffin/overflow", "licenceFile": "README.md", "licenceType": "MIT"}

go.mod

Lines changed: 153 additions & 14 deletions
Large diffs are not rendered by default.

go.sum

Lines changed: 382 additions & 45 deletions
Large diffs are not rendered by default.

internal/pkg/otel/components.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
zipkinreceiver "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
2727
otlpreceiver "go.opentelemetry.io/collector/receiver/otlpreceiver"
2828

29+
fbreceiver "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver"
30+
2931
// Processors:
3032
attributesprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" // for modifying signal attributes
3133
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
@@ -76,6 +78,7 @@ func components() (otelcol.Factories, error) {
7678
prometheusreceiver.NewFactory(),
7779
jaegerreceiver.NewFactory(),
7880
zipkinreceiver.NewFactory(),
81+
fbreceiver.NewFactory(),
7982
)
8083
if err != nil {
8184
return otelcol.Factories{}, err

testing/integration/otel_test.go

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"strings"
1717
"sync"
1818
"testing"
19+
"text/template"
1920
"time"
2021

2122
"github.com/stretchr/testify/require"
@@ -126,7 +127,7 @@ func TestOtelFileProcessing(t *testing.T) {
126127
// otel mode should be detected automatically
127128
tempDir := t.TempDir()
128129
cfgFilePath := filepath.Join(tempDir, "otel.yml")
129-
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
130+
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))
130131

131132
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
132133
require.NoError(t, err)
@@ -183,7 +184,7 @@ func TestOtelFileProcessing(t *testing.T) {
183184

184185
func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesting.Fixture, tempDir string) {
185186
cfgFilePath := filepath.Join(tempDir, "otel-valid.yml")
186-
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0600))
187+
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileProcessingConfig), 0o600))
187188

188189
// check `elastic-agent otel validate` command works for otel config
189190
cmd, err := fixture.PrepareAgentCommand(ctx, []string{"otel", "validate", "--config", cfgFilePath})
@@ -199,7 +200,7 @@ func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesti
199200

200201
// check `elastic-agent otel validate` command works for invalid otel config
201202
cfgFilePath = filepath.Join(tempDir, "otel-invalid.yml")
202-
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0600))
203+
require.NoError(t, os.WriteFile(cfgFilePath, []byte(fileInvalidOtelConfig), 0o600))
203204

204205
out, err = fixture.Exec(ctx, []string{"otel", "validate", "--config", cfgFilePath})
205206
require.Error(t, err)
@@ -276,7 +277,7 @@ func TestOtelLogsIngestion(t *testing.T) {
276277
logsIngestionConfig = strings.ReplaceAll(logsIngestionConfig, "{{.TestId}}", testId)
277278

278279
cfgFilePath := filepath.Join(tempDir, "otel.yml")
279-
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0600))
280+
require.NoError(t, os.WriteFile(cfgFilePath, []byte(logsIngestionConfig), 0o600))
280281

281282
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
282283
require.NoError(t, err)
@@ -300,7 +301,7 @@ func TestOtelLogsIngestion(t *testing.T) {
300301

301302
// Write logs to input file.
302303
logsCount := 10_000
303-
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0600)
304+
inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0o600)
304305
require.NoError(t, err)
305306
for i := 0; i < logsCount; i++ {
306307
_, err = fmt.Fprintf(inputFile, "This is a test log message %d\n", i+1)
@@ -357,8 +358,8 @@ func TestOtelAPMIngestion(t *testing.T) {
357358
cfgFilePath := filepath.Join(tempDir, "otel.yml")
358359
fileName := "content.log"
359360
apmConfig := fmt.Sprintf(apmOtelConfig, filepath.Join(tempDir, fileName), testId)
360-
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0600))
361-
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0600))
361+
require.NoError(t, os.WriteFile(cfgFilePath, []byte(apmConfig), 0o600))
362+
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte{}, 0o600))
362363

363364
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", cfgFilePath}))
364365
require.NoError(t, err)
@@ -426,7 +427,7 @@ func TestOtelAPMIngestion(t *testing.T) {
426427
)
427428
require.NoError(t, err, "APM not initialized")
428429

429-
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0600))
430+
require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0o600))
430431

431432
// check index
432433
var hits int
@@ -436,10 +437,12 @@ func TestOtelAPMIngestion(t *testing.T) {
436437

437438
// apm mismatch or proper docs in ES
438439

439-
watchLines := linesTrackMap([]string{"This is a test error message",
440+
watchLines := linesTrackMap([]string{
441+
"This is a test error message",
440442
"This is a test debug message 2",
441443
"This is a test debug message 3",
442-
"This is a test debug message 4"})
444+
"This is a test debug message 4",
445+
})
443446

444447
// failed to get APM version mismatch in time
445448
// processing should be running
@@ -535,3 +538,108 @@ func mapAtLeastOneTrue(mm map[string]bool) bool {
535538

536539
return false
537540
}
541+
542+
func TestFileBeatReceiver(t *testing.T) {
543+
define.Require(t, define.Requirements{
544+
Group: Default,
545+
Local: true,
546+
OS: []define.OS{
547+
// {Type: define.Windows}, we don't support otel on Windows yet
548+
{Type: define.Linux},
549+
{Type: define.Darwin},
550+
},
551+
})
552+
553+
type otelConfigOptions struct {
554+
Message string
555+
Output string
556+
HomeDir string
557+
}
558+
testMessage := "supercalifragilisticexpialidocious"
559+
tmpDir := t.TempDir()
560+
exporterOutputPath := filepath.Join(tmpDir, "output.json")
561+
t.Cleanup(func() {
562+
if t.Failed() {
563+
contents, err := os.ReadFile(exporterOutputPath)
564+
if err != nil {
565+
t.Logf("No exporter output file")
566+
return
567+
}
568+
t.Logf("Contents of exporter output file:\n%s\n", string(contents))
569+
}
570+
})
571+
otelConfigPath := filepath.Join(tmpDir, "otel.yml")
572+
otelConfigTemplate := `receivers:
573+
filebeatreceiver:
574+
filebeat:
575+
inputs:
576+
- type: benchmark
577+
enabled: true
578+
count: 1
579+
message: {{.Message}}
580+
output:
581+
otelconsumer:
582+
logging:
583+
level: info
584+
selectors:
585+
- '*'
586+
path.home: {{.HomeDir}}
587+
exporters:
588+
file/no_rotation:
589+
path: {{.Output}}
590+
service:
591+
pipelines:
592+
logs:
593+
receivers: [filebeatreceiver]
594+
exporters: [file/no_rotation]
595+
`
596+
597+
var otelConfigBuffer bytes.Buffer
598+
require.NoError(t,
599+
template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
600+
otelConfigOptions{
601+
Message: testMessage,
602+
Output: exporterOutputPath,
603+
HomeDir: tmpDir,
604+
}))
605+
require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
606+
t.Cleanup(func() {
607+
if t.Failed() {
608+
contents, err := os.ReadFile(otelConfigPath)
609+
if err != nil {
610+
t.Logf("no otel config file")
611+
return
612+
}
613+
t.Logf("Contents of otel config file:\n%s\n", string(contents))
614+
}
615+
})
616+
fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
617+
require.NoError(t, err)
618+
619+
ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
620+
defer cancel()
621+
err = fixture.Prepare(ctx, fakeComponent)
622+
require.NoError(t, err)
623+
624+
var fixtureWg sync.WaitGroup
625+
fixtureWg.Add(1)
626+
go func() {
627+
defer fixtureWg.Done()
628+
err = fixture.RunOtelWithClient(ctx)
629+
}()
630+
631+
require.Eventually(t,
632+
func() bool {
633+
content, err := os.ReadFile(exporterOutputPath)
634+
if err != nil || len(content) == 0 {
635+
return false
636+
}
637+
return bytes.Contains(content, []byte(testMessage))
638+
},
639+
3*time.Minute, 1*time.Second,
640+
fmt.Sprintf("there should be exported logs by now"))
641+
642+
cancel()
643+
fixtureWg.Wait()
644+
require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error())
645+
}

0 commit comments

Comments
 (0)