Skip to content

Commit 135d04f

Browse files
authored
fbreceiver: refactor receiver tests for maintainability (#47969)
## Proposed commit message Changes: - Refactor TestMultipleReceivers to easily scale to n>2 receivers - Clean up *.sock files - Add multiReceiverHelper struct to encapsulate receiver test setup - Add helper functions: hostFromSocket, writeFile - Use UUID for socket path generation instead of random bytes - Use DialContext instead of Dial to fix noctx lint warning Pulled out from #47870 for easier review.
1 parent e08d9b3 commit 135d04f

File tree

2 files changed

+147
-161
lines changed

2 files changed

+147
-161
lines changed

x-pack/filebeat/fbreceiver/receiver_leak_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ package fbreceiver
66

77
import (
88
"errors"
9-
"path/filepath"
10-
"runtime"
119
"testing"
1210
"time"
1311

@@ -25,13 +23,7 @@ import (
2523
)
2624

2725
func TestLeak(t *testing.T) {
28-
monitorSocket := genSocketPath()
29-
var monitorHost string
30-
if runtime.GOOS == "windows" {
31-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
32-
} else {
33-
monitorHost = "unix://" + monitorSocket
34-
}
26+
monitorHost := hostFromSocket(genSocketPath(t))
3527
config := Config{
3628
Beatconfig: map[string]any{
3729
"filebeat": map[string]any{
@@ -81,7 +73,6 @@ func TestLeak(t *testing.T) {
8173
consumeLogs := oteltest.DummyConsumer{ConsumeError: errors.New("cannot publish data")}
8274
startAndStopReceiver(t, factory, &consumeLogs, &config)
8375
})
84-
8576
}
8677

8778
// StartAndStopReceiver creates a receiver using the provided parameters, starts it, verifies that the expected logs

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 146 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ package fbreceiver
77
import (
88
"bytes"
99
"context"
10-
"encoding/base64"
1110
"encoding/json"
1211
"fmt"
1312
"io"
14-
"math/rand/v2"
1513
"net"
1614
"net/http"
1715
"net/url"
@@ -23,9 +21,7 @@ import (
2321
"sync/atomic"
2422
"testing"
2523

26-
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
27-
"github.com/elastic/elastic-agent-libs/mapstr"
28-
24+
"github.com/gofrs/uuid/v5"
2925
"github.com/stretchr/testify/assert"
3026
"github.com/stretchr/testify/require"
3127
"go.opentelemetry.io/collector/component"
@@ -36,16 +32,14 @@ import (
3632
"go.uber.org/zap"
3733
"go.uber.org/zap/zapcore"
3834
"go.uber.org/zap/zaptest/observer"
35+
36+
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
37+
"github.com/elastic/elastic-agent-libs/mapstr"
3938
)
4039

4140
func TestNewReceiver(t *testing.T) {
42-
monitorSocket := genSocketPath()
43-
var monitorHost string
44-
if runtime.GOOS == "windows" {
45-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
46-
} else {
47-
monitorHost = "unix://" + monitorSocket
48-
}
41+
monitorSocket := genSocketPath(t)
42+
monitorHost := hostFromSocket(monitorSocket)
4943
config := Config{
5044
Beatconfig: map[string]any{
5145
"filebeat": map[string]any{
@@ -164,147 +158,134 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
164158
}
165159
}
166160

167-
func TestMultipleReceivers(t *testing.T) {
168-
// This test verifies that multiple receivers can be instantiated
169-
// in isolation, started, and can ingest logs without interfering
170-
// with each other.
171-
172-
// Receivers need distinct home directories so wrap the config in a function.
173-
config := func(monitorSocket string, homePath string, ingestPath string) *Config {
174-
var monitorHost string
175-
if runtime.GOOS == "windows" {
176-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
177-
} else {
178-
monitorHost = "unix://" + monitorSocket
179-
}
180-
return &Config{
181-
Beatconfig: map[string]any{
182-
"filebeat": map[string]any{
183-
"inputs": []map[string]any{
184-
{
185-
"type": "benchmark",
186-
"enabled": true,
187-
"message": "test",
188-
"count": 1,
189-
},
190-
{
191-
"type": "filestream",
192-
"enabled": true,
193-
"id": "must-be-unique",
194-
"paths": []string{ingestPath},
195-
"file_identity.native": nil,
196-
},
161+
// multiReceiverConfig creates a Config for testing multiple receivers.
162+
// Each receiver gets a unique home path.
163+
func multiReceiverConfig(helper multiReceiverHelper) *Config {
164+
return &Config{
165+
Beatconfig: map[string]any{
166+
"filebeat": map[string]any{
167+
"inputs": []map[string]any{
168+
{
169+
"type": "benchmark",
170+
"enabled": true,
171+
"message": "test",
172+
"count": 1,
197173
},
198-
},
199-
"logging": map[string]any{
200-
"level": "info",
201-
"selectors": []string{
202-
"*",
174+
{
175+
"type": "filestream",
176+
"enabled": true,
177+
"id": "must-be-unique",
178+
"paths": []string{helper.ingest},
179+
"file_identity.native": nil,
203180
},
204181
},
205-
"path.home": homePath,
206-
"http.enabled": true,
207-
"http.host": monitorHost,
208182
},
209-
}
183+
"logging": map[string]any{
184+
"level": "info",
185+
"selectors": []string{
186+
"*",
187+
},
188+
},
189+
"path.home": helper.home,
190+
"http.enabled": true,
191+
"http.host": hostFromSocket(helper.monitorSocket),
192+
},
210193
}
194+
}
195+
196+
type multiReceiverHelper struct {
197+
name string
198+
home string
199+
ingest string
200+
monitorSocket string
201+
}
202+
203+
func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
204+
return multiReceiverHelper{
205+
name: fmt.Sprintf("r%d", number),
206+
home: t.TempDir(),
207+
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
208+
monitorSocket: genSocketPath(t),
209+
}
210+
}
211+
212+
// TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
213+
// without interfering with each other.
214+
func TestMultipleReceivers(t *testing.T) {
215+
const nReceivers = 2
211216

212217
factory := NewFactory()
213-
monitorSocket1 := genSocketPath()
214-
monitorSocket2 := genSocketPath()
215-
dir1 := t.TempDir()
216-
dir2 := t.TempDir()
217-
ingest1 := filepath.Join(t.TempDir(), "test1.log")
218-
ingest2 := filepath.Join(t.TempDir(), "test2.log")
218+
219+
helpers := make([]multiReceiverHelper, nReceivers)
220+
configs := make([]oteltest.ReceiverConfig, nReceivers)
221+
for i := range helpers {
222+
helper := newMultiReceiverHelper(t, i)
223+
helpers[i] = helper
224+
configs[i] = oteltest.ReceiverConfig{
225+
Name: helper.name,
226+
Beat: "filebeat",
227+
Config: multiReceiverConfig(helper),
228+
Factory: factory,
229+
}
230+
}
231+
219232
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
220233
T: t,
221234
NumRestarts: 5,
222-
Receivers: []oteltest.ReceiverConfig{
223-
{
224-
Name: "r1",
225-
Beat: "filebeat",
226-
Config: config(monitorSocket1, dir1, ingest1),
227-
Factory: factory,
228-
},
229-
{
230-
Name: "r2",
231-
Beat: "filebeat",
232-
Config: config(monitorSocket2, dir2, ingest2),
233-
Factory: factory,
234-
},
235-
},
235+
Receivers: configs,
236236
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
237-
// Add data to be ingested with filestream
238-
f1, err := os.OpenFile(ingest1, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
239-
require.NoError(c, err)
240-
_, err = f1.WriteString("A log line\n")
241-
require.NoError(c, err)
242-
f1.Close()
243-
f2, err := os.OpenFile(ingest2, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
244-
require.NoError(c, err)
245-
_, err = f2.WriteString("A log line\n")
246-
require.NoError(c, err)
247-
f2.Close()
248-
249-
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
250-
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")
251-
252-
assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'")
253-
assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'")
254-
255-
// Make sure that each receiver has a separate logger
256-
// instance and does not interfere with others. Previously, the
257-
// logger in Beats was global, causing logger fields to be
258-
// overwritten when multiple receivers started in the same process.
259-
r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r1"))
260-
assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log")
261-
r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r2"))
262-
assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log")
263-
264-
meta1Path := filepath.Join(dir1, "/data/meta.json")
265-
assert.FileExists(c, meta1Path, "dir1/data/meta.json should exist")
266-
meta1Data, err := os.ReadFile(meta1Path)
267-
assert.NoError(c, err)
268-
269-
meta2Path := filepath.Join(dir2, "/data/meta.json")
270-
assert.FileExists(c, meta2Path, "dir2/data/meta.json should exist")
271-
meta2Data, err := os.ReadFile(meta2Path)
272-
assert.NoError(c, err)
273-
274-
assert.NotEqual(c, meta1Data, meta2Data, "meta data files should be different")
237+
allMetaData := make([]string, 0, nReceivers)
238+
allRegData := make([]string, 0, nReceivers)
239+
for _, helper := range helpers {
240+
writeFile(c, helper.ingest, "A log line")
241+
242+
require.Greaterf(c, len(logs[helper.name]), 0, "receiver %v does not have any logs", helper)
243+
244+
assert.Equalf(c, "test", logs[helper.name][0].Flatten()["message"], "expected %v message field to be 'test'", helper)
245+
246+
// Make sure that each receiver has a separate logger
247+
// instance and does not interfere with others. Previously, the
248+
// logger in Beats was global, causing logger fields to be
249+
// overwritten when multiple receivers started in the same process.
250+
startLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/"+helper.name))
251+
assert.Equalf(c, 1, startLogs.Len(), "%v should have a single start log", helper)
252+
253+
metaPath := filepath.Join(helper.home, "/data/meta.json")
254+
assert.FileExistsf(c, metaPath, "%s of %v should exist", metaPath, helper)
255+
metaData, err := os.ReadFile(metaPath)
256+
assert.NoError(c, err)
257+
allMetaData = append(allMetaData, string(metaData))
258+
259+
var lastError strings.Builder
260+
assert.Conditionf(c, func() bool {
261+
return getFromSocket(t, &lastError, helper.monitorSocket, "stats")
262+
}, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s", helper, &lastError)
263+
assert.Conditionf(c, func() bool {
264+
return getFromSocket(t, &lastError, helper.monitorSocket, "inputs")
265+
}, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s", helper, &lastError)
266+
267+
ingestJson, err := json.Marshal(helper.ingest)
268+
assert.NoError(c, err)
269+
270+
regPath := filepath.Join(helper.home, "/data/registry/filebeat/log.json")
271+
assert.FileExistsf(c, regPath, "receiver %v filebeat registry should exist", helper)
272+
regData, err := os.ReadFile(regPath)
273+
allRegData = append(allRegData, string(regData))
274+
assert.NoError(c, err)
275+
assert.Containsf(c, string(regData), string(ingestJson), "receiver %v registry should contain '%s', but was: %s", helper, string(ingestJson), string(regData))
276+
}
275277

276-
var lastError strings.Builder
277-
assert.Conditionf(c, func() bool {
278-
return getFromSocket(t, &lastError, monitorSocket1, "stats")
279-
}, "failed to connect to monitoring socket1, stats endpoint, last error was: %s", &lastError)
280-
assert.Conditionf(c, func() bool {
281-
return getFromSocket(t, &lastError, monitorSocket1, "inputs")
282-
}, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s", &lastError)
283-
assert.Conditionf(c, func() bool {
284-
return getFromSocket(t, &lastError, monitorSocket2, "stats")
285-
}, "failed to connect to monitoring socket2, stats endpoint, last error was: %s", &lastError)
286-
assert.Conditionf(c, func() bool {
287-
return getFromSocket(t, &lastError, monitorSocket2, "inputs")
288-
}, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s", &lastError)
289-
290-
ingest1Json, err := json.Marshal(ingest1)
291-
require.NoError(c, err)
292-
ingest2Json, err := json.Marshal(ingest2)
293-
require.NoError(c, err)
294-
295-
reg1Path := filepath.Join(dir1, "/data/registry/filebeat/log.json")
296-
require.FileExists(c, reg1Path, "receiver 1 filebeat registry should exist")
297-
reg1Data, err := os.ReadFile(reg1Path)
298-
require.NoError(c, err)
299-
require.Containsf(c, string(reg1Data), string(ingest1Json), "receiver 1 registry should contain '%s', but was: %s", string(ingest1Json), string(reg1Data))
300-
require.NotContainsf(c, string(reg1Data), string(ingest2Json), "receiver 1 registry should not contain '%s', but was: %s", string(ingest2Json), string(reg1Data))
301-
302-
reg2Path := filepath.Join(dir2, "/data/registry/filebeat/log.json")
303-
require.FileExists(c, reg2Path, "receiver 2 filebeat registry should exist")
304-
reg2Data, err := os.ReadFile(reg2Path)
305-
require.NoError(c, err)
306-
require.Containsf(c, string(reg2Data), string(ingest2Json), "receiver 2 registry should contain '%s', but was: %s", string(ingest2Json), string(reg2Data))
307-
require.NotContainsf(c, string(reg2Data), string(ingest1Json), "receiver 2 registry should not contain '%s', but was: %s", string(ingest1Json), string(reg2Data))
278+
for i := range nReceivers {
279+
for j := range nReceivers {
280+
if i == j {
281+
continue
282+
}
283+
h1 := helpers[i]
284+
h2 := helpers[j]
285+
assert.NotEqualf(c, allMetaData[i], allMetaData[j], "meta data files between %v and %v should be different", h1, h2)
286+
assert.NotContainsf(c, allRegData[i], allRegData[j], "receiver %v registry should not contain data from %v registry", h1, h2)
287+
}
288+
}
308289
},
309290
})
310291
}
@@ -380,14 +361,14 @@ func TestReceiverDegraded(t *testing.T) {
380361
}
381362
}
382363

383-
func genSocketPath() string {
384-
randData := make([]byte, 16)
385-
for i := range len(randData) {
386-
randData[i] = uint8(rand.UintN(255)) //nolint:gosec // 0-255 fits in a uint8
387-
}
388-
socketName := base64.URLEncoding.EncodeToString(randData) + ".sock"
389-
socketDir := os.TempDir()
390-
return filepath.Join(socketDir, socketName)
364+
func genSocketPath(t *testing.T) string {
365+
t.Helper()
366+
socketName, err := uuid.NewV4()
367+
require.NoError(t, err)
368+
// Use os.TempDir() for short Unix socket paths
369+
sockPath := filepath.Join(os.TempDir(), socketName.String()+".sock")
370+
t.Cleanup(func() { _ = os.Remove(sockPath) })
371+
return sockPath
391372
}
392373

393374
func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoint string) bool {
@@ -397,8 +378,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
397378
}
398379
client := http.Client{
399380
Transport: &http.Transport{
400-
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
401-
return net.Dial("unix", socketPath)
381+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
382+
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
402383
},
403384
},
404385
}
@@ -612,3 +593,17 @@ func TestReceiverHook(t *testing.T) {
612593
// one for beat metrics, one for input metrics and one for getting the registry.
613594
oteltest.TestReceiverHook(t, &cfg, NewFactory(), receiverSettings, 3)
614595
}
596+
597+
func hostFromSocket(socket string) string {
598+
if runtime.GOOS == "windows" {
599+
return "npipe:///" + filepath.Base(socket)
600+
}
601+
return "unix://" + socket
602+
}
603+
604+
func writeFile(t require.TestingT, path string, data string) {
605+
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
606+
require.NoErrorf(t, err, "Could not open file %s", path)
607+
_, err = f.WriteString(data + "\n")
608+
require.NoErrorf(t, err, "Could not write %s to file %s", data, path)
609+
}

0 commit comments

Comments
 (0)