Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions x-pack/filebeat/fbreceiver/receiver_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package fbreceiver

import (
"errors"
"path/filepath"
"runtime"
"testing"
"time"

Expand All @@ -25,13 +23,7 @@ import (
)

func TestLeak(t *testing.T) {
monitorSocket := genSocketPath()
var monitorHost string
if runtime.GOOS == "windows" {
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
} else {
monitorHost = "unix://" + monitorSocket
}
monitorHost := hostFromSocket(genSocketPath(t))
config := Config{
Beatconfig: map[string]any{
"filebeat": map[string]any{
Expand Down Expand Up @@ -81,7 +73,6 @@ func TestLeak(t *testing.T) {
consumeLogs := oteltest.DummyConsumer{ConsumeError: errors.New("cannot publish data")}
startAndStopReceiver(t, factory, &consumeLogs, &config)
})

}

// StartAndStopReceiver creates a receiver using the provided parameters, starts it, verifies that the expected logs
Expand Down
297 changes: 146 additions & 151 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ package fbreceiver
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"math/rand/v2"
"net"
"net/http"
"net/url"
Expand All @@ -23,9 +21,7 @@ import (
"sync/atomic"
"testing"

"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand All @@ -36,16 +32,14 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestNewReceiver(t *testing.T) {
monitorSocket := genSocketPath()
var monitorHost string
if runtime.GOOS == "windows" {
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
} else {
monitorHost = "unix://" + monitorSocket
}
monitorSocket := genSocketPath(t)
monitorHost := hostFromSocket(monitorSocket)
config := Config{
Beatconfig: map[string]any{
"filebeat": map[string]any{
Expand Down Expand Up @@ -164,147 +158,134 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
}
}

func TestMultipleReceivers(t *testing.T) {
// This test verifies that multiple receivers can be instantiated
// in isolation, started, and can ingest logs without interfering
// with each other.

// Receivers need distinct home directories so wrap the config in a function.
config := func(monitorSocket string, homePath string, ingestPath string) *Config {
var monitorHost string
if runtime.GOOS == "windows" {
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
} else {
monitorHost = "unix://" + monitorSocket
}
return &Config{
Beatconfig: map[string]any{
"filebeat": map[string]any{
"inputs": []map[string]any{
{
"type": "benchmark",
"enabled": true,
"message": "test",
"count": 1,
},
{
"type": "filestream",
"enabled": true,
"id": "must-be-unique",
"paths": []string{ingestPath},
"file_identity.native": nil,
},
// multiReceiverConfig creates a Config for testing multiple receivers.
// Each receiver gets a unique home path.
func multiReceiverConfig(helper multiReceiverHelper) *Config {
return &Config{
Beatconfig: map[string]any{
"filebeat": map[string]any{
"inputs": []map[string]any{
{
"type": "benchmark",
"enabled": true,
"message": "test",
"count": 1,
},
},
"logging": map[string]any{
"level": "info",
"selectors": []string{
"*",
{
"type": "filestream",
"enabled": true,
"id": "must-be-unique",
"paths": []string{helper.ingest},
"file_identity.native": nil,
},
},
"path.home": homePath,
"http.enabled": true,
"http.host": monitorHost,
},
}
"logging": map[string]any{
"level": "info",
"selectors": []string{
"*",
},
},
"path.home": helper.home,
"http.enabled": true,
"http.host": hostFromSocket(helper.monitorSocket),
},
}
}

type multiReceiverHelper struct {
name string
home string
ingest string
monitorSocket string
}

func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
return multiReceiverHelper{
name: fmt.Sprintf("r%d", number),
home: t.TempDir(),
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
monitorSocket: genSocketPath(t),
}
}

// TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
// without interfering with each other.
func TestMultipleReceivers(t *testing.T) {
const nReceivers = 2

factory := NewFactory()
monitorSocket1 := genSocketPath()
monitorSocket2 := genSocketPath()
dir1 := t.TempDir()
dir2 := t.TempDir()
ingest1 := filepath.Join(t.TempDir(), "test1.log")
ingest2 := filepath.Join(t.TempDir(), "test2.log")

helpers := make([]multiReceiverHelper, nReceivers)
configs := make([]oteltest.ReceiverConfig, nReceivers)
for i := range helpers {
helper := newMultiReceiverHelper(t, i)
helpers[i] = helper
configs[i] = oteltest.ReceiverConfig{
Name: helper.name,
Beat: "filebeat",
Config: multiReceiverConfig(helper),
Factory: factory,
}
}

oteltest.CheckReceivers(oteltest.CheckReceiversParams{
T: t,
NumRestarts: 5,
Receivers: []oteltest.ReceiverConfig{
{
Name: "r1",
Beat: "filebeat",
Config: config(monitorSocket1, dir1, ingest1),
Factory: factory,
},
{
Name: "r2",
Beat: "filebeat",
Config: config(monitorSocket2, dir2, ingest2),
Factory: factory,
},
},
Receivers: configs,
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
// Add data to be ingested with filestream
f1, err := os.OpenFile(ingest1, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
require.NoError(c, err)
_, err = f1.WriteString("A log line\n")
require.NoError(c, err)
f1.Close()
f2, err := os.OpenFile(ingest2, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
require.NoError(c, err)
_, err = f2.WriteString("A log line\n")
require.NoError(c, err)
f2.Close()

require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")

assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'")
assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
// overwritten when multiple receivers started in the same process.
r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r1"))
assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log")
r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r2"))
assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log")

meta1Path := filepath.Join(dir1, "/data/meta.json")
assert.FileExists(c, meta1Path, "dir1/data/meta.json should exist")
meta1Data, err := os.ReadFile(meta1Path)
assert.NoError(c, err)

meta2Path := filepath.Join(dir2, "/data/meta.json")
assert.FileExists(c, meta2Path, "dir2/data/meta.json should exist")
meta2Data, err := os.ReadFile(meta2Path)
assert.NoError(c, err)

assert.NotEqual(c, meta1Data, meta2Data, "meta data files should be different")
allMetaData := make([]string, 0, nReceivers)
allRegData := make([]string, 0, nReceivers)
for _, helper := range helpers {
writeFile(c, helper.ingest, "A log line")

require.Greaterf(c, len(logs[helper.name]), 0, "receiver %v does not have any logs", helper)

assert.Equalf(c, "test", logs[helper.name][0].Flatten()["message"], "expected %v message field to be 'test'", helper)

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
// overwritten when multiple receivers started in the same process.
startLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/"+helper.name))
assert.Equalf(c, 1, startLogs.Len(), "%v should have a single start log", helper)

metaPath := filepath.Join(helper.home, "/data/meta.json")
assert.FileExistsf(c, metaPath, "%s of %v should exist", metaPath, helper)
metaData, err := os.ReadFile(metaPath)
assert.NoError(c, err)
allMetaData = append(allMetaData, string(metaData))

var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, helper.monitorSocket, "stats")
}, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s", helper, &lastError)
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, helper.monitorSocket, "inputs")
}, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s", helper, &lastError)

ingestJson, err := json.Marshal(helper.ingest)
assert.NoError(c, err)

regPath := filepath.Join(helper.home, "/data/registry/filebeat/log.json")
assert.FileExistsf(c, regPath, "receiver %v filebeat registry should exist", helper)
regData, err := os.ReadFile(regPath)
allRegData = append(allRegData, string(regData))
assert.NoError(c, err)
assert.Containsf(c, string(regData), string(ingestJson), "receiver %v registry should contain '%s', but was: %s", helper, string(ingestJson), string(regData))
}

var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket1, "stats")
}, "failed to connect to monitoring socket1, stats endpoint, last error was: %s", &lastError)
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket1, "inputs")
}, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s", &lastError)
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket2, "stats")
}, "failed to connect to monitoring socket2, stats endpoint, last error was: %s", &lastError)
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket2, "inputs")
}, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s", &lastError)

ingest1Json, err := json.Marshal(ingest1)
require.NoError(c, err)
ingest2Json, err := json.Marshal(ingest2)
require.NoError(c, err)

reg1Path := filepath.Join(dir1, "/data/registry/filebeat/log.json")
require.FileExists(c, reg1Path, "receiver 1 filebeat registry should exist")
reg1Data, err := os.ReadFile(reg1Path)
require.NoError(c, err)
require.Containsf(c, string(reg1Data), string(ingest1Json), "receiver 1 registry should contain '%s', but was: %s", string(ingest1Json), string(reg1Data))
require.NotContainsf(c, string(reg1Data), string(ingest2Json), "receiver 1 registry should not contain '%s', but was: %s", string(ingest2Json), string(reg1Data))

reg2Path := filepath.Join(dir2, "/data/registry/filebeat/log.json")
require.FileExists(c, reg2Path, "receiver 2 filebeat registry should exist")
reg2Data, err := os.ReadFile(reg2Path)
require.NoError(c, err)
require.Containsf(c, string(reg2Data), string(ingest2Json), "receiver 2 registry should contain '%s', but was: %s", string(ingest2Json), string(reg2Data))
require.NotContainsf(c, string(reg2Data), string(ingest1Json), "receiver 2 registry should not contain '%s', but was: %s", string(ingest1Json), string(reg2Data))
for i := range nReceivers {
for j := range nReceivers {
if i == j {
continue
}
h1 := helpers[i]
h2 := helpers[j]
assert.NotEqualf(c, allMetaData[i], allMetaData[j], "meta data files between %v and %v should be different", h1, h2)
assert.NotContainsf(c, allRegData[i], allRegData[j], "receiver %v registry should not contain data from %v registry", h1, h2)
}
}
},
})
}
Expand Down Expand Up @@ -380,14 +361,14 @@ func TestReceiverDegraded(t *testing.T) {
}
}

func genSocketPath() string {
randData := make([]byte, 16)
for i := range len(randData) {
randData[i] = uint8(rand.UintN(255)) //nolint:gosec // 0-255 fits in a uint8
}
socketName := base64.URLEncoding.EncodeToString(randData) + ".sock"
socketDir := os.TempDir()
return filepath.Join(socketDir, socketName)
func genSocketPath(t *testing.T) string {
t.Helper()
socketName, err := uuid.NewV4()
require.NoError(t, err)
// Use os.TempDir() for short Unix socket paths
sockPath := filepath.Join(os.TempDir(), socketName.String()+".sock")
t.Cleanup(func() { _ = os.Remove(sockPath) })
return sockPath
}

func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoint string) bool {
Expand All @@ -397,8 +378,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
}
client := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketPath)
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
},
},
}
Expand Down Expand Up @@ -612,3 +593,17 @@ func TestReceiverHook(t *testing.T) {
// one for beat metrics, one for input metrics and one for getting the registry.
oteltest.TestReceiverHook(t, &cfg, NewFactory(), receiverSettings, 3)
}

func hostFromSocket(socket string) string {
if runtime.GOOS == "windows" {
return "npipe:///" + filepath.Base(socket)
}
return "unix://" + socket
}

func writeFile(t require.TestingT, path string, data string) {
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
require.NoErrorf(t, err, "Could not open file %s", path)
_, err = f.WriteString(data + "\n")
require.NoErrorf(t, err, "Could not write %s to file %s", data, path)
}