Skip to content

Commit b84e37a

Browse files
orestisflmergify[bot]
authored andcommitted
fbreceiver: refactor receiver tests for maintainability (#47969)
## Proposed commit message Changes: - Refactor TestMultipleReceivers to easily scale to n>2 receivers - Clean up *.sock files - Add multiReceiverHelper struct to encapsulate receiver test setup - Add helper functions: hostFromSocket, writeFile - Use UUID for socket path generation instead of random bytes - Use DialContext instead of Dial to fix noctx lint warning Pulled out from #47870 for easier review. (cherry picked from commit 135d04f) # Conflicts: # x-pack/filebeat/fbreceiver/receiver_test.go
1 parent c32d542 commit b84e37a

File tree

2 files changed

+151
-153
lines changed

2 files changed

+151
-153
lines changed

x-pack/filebeat/fbreceiver/receiver_leak_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ package fbreceiver
66

77
import (
88
"errors"
9-
"path/filepath"
10-
"runtime"
119
"testing"
1210
"time"
1311

@@ -25,13 +23,7 @@ import (
2523
)
2624

2725
func TestLeak(t *testing.T) {
28-
monitorSocket := genSocketPath()
29-
var monitorHost string
30-
if runtime.GOOS == "windows" {
31-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
32-
} else {
33-
monitorHost = "unix://" + monitorSocket
34-
}
26+
monitorHost := hostFromSocket(genSocketPath(t))
3527
config := Config{
3628
Beatconfig: map[string]any{
3729
"filebeat": map[string]any{
@@ -84,7 +76,6 @@ func TestLeak(t *testing.T) {
8476
consumeLogs := oteltest.DummyConsumer{ConsumeError: errors.New("cannot publish data")}
8577
startAndStopReceiver(t, factory, &consumeLogs, &config)
8678
})
87-
8879
}
8980

9081
// StartAndStopReceiver creates a receiver using the provided parameters, starts it, verifies that the expected logs

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 150 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ package fbreceiver
77
import (
88
"bytes"
99
"context"
10-
"encoding/base64"
1110
"encoding/json"
1211
"fmt"
1312
"io"
14-
"math/rand/v2"
1513
"net"
1614
"net/http"
1715
"net/url"
@@ -23,9 +21,7 @@ import (
2321
"sync/atomic"
2422
"testing"
2523

26-
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
27-
"github.com/elastic/elastic-agent-libs/mapstr"
28-
24+
"github.com/gofrs/uuid/v5"
2925
"github.com/stretchr/testify/assert"
3026
"github.com/stretchr/testify/require"
3127
"go.opentelemetry.io/collector/component"
@@ -36,16 +32,14 @@ import (
3632
"go.uber.org/zap"
3733
"go.uber.org/zap/zapcore"
3834
"go.uber.org/zap/zaptest/observer"
35+
36+
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
37+
"github.com/elastic/elastic-agent-libs/mapstr"
3938
)
4039

4140
func TestNewReceiver(t *testing.T) {
42-
monitorSocket := genSocketPath()
43-
var monitorHost string
44-
if runtime.GOOS == "windows" {
45-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
46-
} else {
47-
monitorHost = "unix://" + monitorSocket
48-
}
41+
monitorSocket := genSocketPath(t)
42+
monitorHost := hostFromSocket(monitorSocket)
4943
config := Config{
5044
Beatconfig: map[string]any{
5145
"filebeat": map[string]any{
@@ -170,38 +164,28 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
170164
}
171165
}
172166

173-
func TestMultipleReceivers(t *testing.T) {
174-
// This test verifies that multiple receivers can be instantiated
175-
// in isolation, started, and can ingest logs without interfering
176-
// with each other.
177-
178-
// Receivers need distinct home directories so wrap the config in a function.
179-
config := func(monitorSocket string, homePath string, ingestPath string) *Config {
180-
var monitorHost string
181-
if runtime.GOOS == "windows" {
182-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
183-
} else {
184-
monitorHost = "unix://" + monitorSocket
185-
}
186-
return &Config{
187-
Beatconfig: map[string]any{
188-
"filebeat": map[string]any{
189-
"inputs": []map[string]any{
190-
{
191-
"type": "benchmark",
192-
"enabled": true,
193-
"message": "test",
194-
"count": 1,
195-
},
196-
{
197-
"type": "filestream",
198-
"enabled": true,
199-
"id": "must-be-unique",
200-
"paths": []string{ingestPath},
201-
"file_identity.native": nil,
202-
},
167+
// multiReceiverConfig creates a Config for testing multiple receivers.
168+
// Each receiver gets a unique home path.
169+
func multiReceiverConfig(helper multiReceiverHelper) *Config {
170+
return &Config{
171+
Beatconfig: map[string]any{
172+
"filebeat": map[string]any{
173+
"inputs": []map[string]any{
174+
{
175+
"type": "benchmark",
176+
"enabled": true,
177+
"message": "test",
178+
"count": 1,
179+
},
180+
{
181+
"type": "filestream",
182+
"enabled": true,
183+
"id": "must-be-unique",
184+
"paths": []string{helper.ingest},
185+
"file_identity.native": nil,
203186
},
204187
},
188+
<<<<<<< HEAD
205189
"output": map[string]any{
206190
"otelconsumer": map[string]any{},
207191
},
@@ -214,106 +198,115 @@ func TestMultipleReceivers(t *testing.T) {
214198
"path.home": homePath,
215199
"http.enabled": true,
216200
"http.host": monitorHost,
201+
=======
202+
>>>>>>> 135d04f52 (fbreceiver: refactor receiver tests for maintainability (#47969))
217203
},
218-
}
204+
"logging": map[string]any{
205+
"level": "info",
206+
"selectors": []string{
207+
"*",
208+
},
209+
},
210+
"path.home": helper.home,
211+
"http.enabled": true,
212+
"http.host": hostFromSocket(helper.monitorSocket),
213+
},
219214
}
215+
}
216+
217+
type multiReceiverHelper struct {
218+
name string
219+
home string
220+
ingest string
221+
monitorSocket string
222+
}
223+
224+
func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
225+
return multiReceiverHelper{
226+
name: fmt.Sprintf("r%d", number),
227+
home: t.TempDir(),
228+
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
229+
monitorSocket: genSocketPath(t),
230+
}
231+
}
232+
233+
// TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
234+
// without interfering with each other.
235+
func TestMultipleReceivers(t *testing.T) {
236+
const nReceivers = 2
220237

221238
factory := NewFactory()
222-
monitorSocket1 := genSocketPath()
223-
monitorSocket2 := genSocketPath()
224-
dir1 := t.TempDir()
225-
dir2 := t.TempDir()
226-
ingest1 := filepath.Join(t.TempDir(), "test1.log")
227-
ingest2 := filepath.Join(t.TempDir(), "test2.log")
239+
240+
helpers := make([]multiReceiverHelper, nReceivers)
241+
configs := make([]oteltest.ReceiverConfig, nReceivers)
242+
for i := range helpers {
243+
helper := newMultiReceiverHelper(t, i)
244+
helpers[i] = helper
245+
configs[i] = oteltest.ReceiverConfig{
246+
Name: helper.name,
247+
Beat: "filebeat",
248+
Config: multiReceiverConfig(helper),
249+
Factory: factory,
250+
}
251+
}
252+
228253
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
229254
T: t,
230255
NumRestarts: 5,
231-
Receivers: []oteltest.ReceiverConfig{
232-
{
233-
Name: "r1",
234-
Beat: "filebeat",
235-
Config: config(monitorSocket1, dir1, ingest1),
236-
Factory: factory,
237-
},
238-
{
239-
Name: "r2",
240-
Beat: "filebeat",
241-
Config: config(monitorSocket2, dir2, ingest2),
242-
Factory: factory,
243-
},
244-
},
256+
Receivers: configs,
245257
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
246-
// Add data to be ingested with filestream
247-
f1, err := os.OpenFile(ingest1, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
248-
require.NoError(c, err)
249-
_, err = f1.WriteString("A log line\n")
250-
require.NoError(c, err)
251-
f1.Close()
252-
f2, err := os.OpenFile(ingest2, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
253-
require.NoError(c, err)
254-
_, err = f2.WriteString("A log line\n")
255-
require.NoError(c, err)
256-
f2.Close()
257-
258-
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
259-
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")
260-
261-
assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'")
262-
assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'")
263-
264-
// Make sure that each receiver has a separate logger
265-
// instance and does not interfere with others. Previously, the
266-
// logger in Beats was global, causing logger fields to be
267-
// overwritten when multiple receivers started in the same process.
268-
r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r1"))
269-
assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log")
270-
r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r2"))
271-
assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log")
272-
273-
meta1Path := filepath.Join(dir1, "/data/meta.json")
274-
assert.FileExists(c, meta1Path, "dir1/data/meta.json should exist")
275-
meta1Data, err := os.ReadFile(meta1Path)
276-
assert.NoError(c, err)
277-
278-
meta2Path := filepath.Join(dir2, "/data/meta.json")
279-
assert.FileExists(c, meta2Path, "dir2/data/meta.json should exist")
280-
meta2Data, err := os.ReadFile(meta2Path)
281-
assert.NoError(c, err)
282-
283-
assert.NotEqual(c, meta1Data, meta2Data, "meta data files should be different")
258+
allMetaData := make([]string, 0, nReceivers)
259+
allRegData := make([]string, 0, nReceivers)
260+
for _, helper := range helpers {
261+
writeFile(c, helper.ingest, "A log line")
262+
263+
require.Greaterf(c, len(logs[helper.name]), 0, "receiver %v does not have any logs", helper)
264+
265+
assert.Equalf(c, "test", logs[helper.name][0].Flatten()["message"], "expected %v message field to be 'test'", helper)
266+
267+
// Make sure that each receiver has a separate logger
268+
// instance and does not interfere with others. Previously, the
269+
// logger in Beats was global, causing logger fields to be
270+
// overwritten when multiple receivers started in the same process.
271+
startLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/"+helper.name))
272+
assert.Equalf(c, 1, startLogs.Len(), "%v should have a single start log", helper)
273+
274+
metaPath := filepath.Join(helper.home, "/data/meta.json")
275+
assert.FileExistsf(c, metaPath, "%s of %v should exist", metaPath, helper)
276+
metaData, err := os.ReadFile(metaPath)
277+
assert.NoError(c, err)
278+
allMetaData = append(allMetaData, string(metaData))
279+
280+
var lastError strings.Builder
281+
assert.Conditionf(c, func() bool {
282+
return getFromSocket(t, &lastError, helper.monitorSocket, "stats")
283+
}, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s", helper, &lastError)
284+
assert.Conditionf(c, func() bool {
285+
return getFromSocket(t, &lastError, helper.monitorSocket, "inputs")
286+
}, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s", helper, &lastError)
287+
288+
ingestJson, err := json.Marshal(helper.ingest)
289+
assert.NoError(c, err)
290+
291+
regPath := filepath.Join(helper.home, "/data/registry/filebeat/log.json")
292+
assert.FileExistsf(c, regPath, "receiver %v filebeat registry should exist", helper)
293+
regData, err := os.ReadFile(regPath)
294+
allRegData = append(allRegData, string(regData))
295+
assert.NoError(c, err)
296+
assert.Containsf(c, string(regData), string(ingestJson), "receiver %v registry should contain '%s', but was: %s", helper, string(ingestJson), string(regData))
297+
}
284298

285-
var lastError strings.Builder
286-
assert.Conditionf(c, func() bool {
287-
return getFromSocket(t, &lastError, monitorSocket1, "stats")
288-
}, "failed to connect to monitoring socket1, stats endpoint, last error was: %s", &lastError)
289-
assert.Conditionf(c, func() bool {
290-
return getFromSocket(t, &lastError, monitorSocket1, "inputs")
291-
}, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s", &lastError)
292-
assert.Conditionf(c, func() bool {
293-
return getFromSocket(t, &lastError, monitorSocket2, "stats")
294-
}, "failed to connect to monitoring socket2, stats endpoint, last error was: %s", &lastError)
295-
assert.Conditionf(c, func() bool {
296-
return getFromSocket(t, &lastError, monitorSocket2, "inputs")
297-
}, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s", &lastError)
298-
299-
ingest1Json, err := json.Marshal(ingest1)
300-
require.NoError(c, err)
301-
ingest2Json, err := json.Marshal(ingest2)
302-
require.NoError(c, err)
303-
304-
reg1Path := filepath.Join(dir1, "/data/registry/filebeat/log.json")
305-
require.FileExists(c, reg1Path, "receiver 1 filebeat registry should exist")
306-
reg1Data, err := os.ReadFile(reg1Path)
307-
require.NoError(c, err)
308-
require.Containsf(c, string(reg1Data), string(ingest1Json), "receiver 1 registry should contain '%s', but was: %s", string(ingest1Json), string(reg1Data))
309-
require.NotContainsf(c, string(reg1Data), string(ingest2Json), "receiver 1 registry should not contain '%s', but was: %s", string(ingest2Json), string(reg1Data))
310-
311-
reg2Path := filepath.Join(dir2, "/data/registry/filebeat/log.json")
312-
require.FileExists(c, reg2Path, "receiver 2 filebeat registry should exist")
313-
reg2Data, err := os.ReadFile(reg2Path)
314-
require.NoError(c, err)
315-
require.Containsf(c, string(reg2Data), string(ingest2Json), "receiver 2 registry should contain '%s', but was: %s", string(ingest2Json), string(reg2Data))
316-
require.NotContainsf(c, string(reg2Data), string(ingest1Json), "receiver 2 registry should not contain '%s', but was: %s", string(ingest1Json), string(reg2Data))
299+
for i := range nReceivers {
300+
for j := range nReceivers {
301+
if i == j {
302+
continue
303+
}
304+
h1 := helpers[i]
305+
h2 := helpers[j]
306+
assert.NotEqualf(c, allMetaData[i], allMetaData[j], "meta data files between %v and %v should be different", h1, h2)
307+
assert.NotContainsf(c, allRegData[i], allRegData[j], "receiver %v registry should not contain data from %v registry", h1, h2)
308+
}
309+
}
317310
},
318311
})
319312
}
@@ -392,14 +385,14 @@ func TestReceiverDegraded(t *testing.T) {
392385
}
393386
}
394387

395-
func genSocketPath() string {
396-
randData := make([]byte, 16)
397-
for i := range len(randData) {
398-
randData[i] = uint8(rand.UintN(255)) //nolint:gosec // 0-255 fits in a uint8
399-
}
400-
socketName := base64.URLEncoding.EncodeToString(randData) + ".sock"
401-
socketDir := os.TempDir()
402-
return filepath.Join(socketDir, socketName)
388+
func genSocketPath(t *testing.T) string {
389+
t.Helper()
390+
socketName, err := uuid.NewV4()
391+
require.NoError(t, err)
392+
// Use os.TempDir() for short Unix socket paths
393+
sockPath := filepath.Join(os.TempDir(), socketName.String()+".sock")
394+
t.Cleanup(func() { _ = os.Remove(sockPath) })
395+
return sockPath
403396
}
404397

405398
func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoint string) bool {
@@ -409,8 +402,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
409402
}
410403
client := http.Client{
411404
Transport: &http.Transport{
412-
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
413-
return net.Dial("unix", socketPath)
405+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
406+
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
414407
},
415408
},
416409
}
@@ -630,3 +623,17 @@ func TestReceiverHook(t *testing.T) {
630623
// one for beat metrics, one for input metrics and one for getting the registry.
631624
oteltest.TestReceiverHook(t, &cfg, NewFactory(), receiverSettings, 3)
632625
}
626+
627+
func hostFromSocket(socket string) string {
628+
if runtime.GOOS == "windows" {
629+
return "npipe:///" + filepath.Base(socket)
630+
}
631+
return "unix://" + socket
632+
}
633+
634+
func writeFile(t require.TestingT, path string, data string) {
635+
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
636+
require.NoErrorf(t, err, "Could not open file %s", path)
637+
_, err = f.WriteString(data + "\n")
638+
require.NoErrorf(t, err, "Could not write %s to file %s", data, path)
639+
}

0 commit comments

Comments
 (0)