Skip to content

Commit b1413ea

Browse files
[8.19](backport #47969) fbreceiver: refactor receiver tests for maintainability (#47994)
* fbreceiver: refactor receiver tests for maintainability (#47969) ## Proposed commit message Changes: - Refactor TestMultipleReceivers to easily scale to n>2 receivers - Clean up *.sock files - Add multiReceiverHelper struct to encapsulate receiver test setup - Add helper functions: hostFromSocket, writeFile - Use UUID for socket path generation instead of random bytes - Use DialContext instead of Dial to fix noctx lint warning Pulled out from #47870 for easier review. (cherry picked from commit 135d04f) # Conflicts: # x-pack/filebeat/fbreceiver/receiver_test.go * Fix conflicts * fbreceiver: close file in writeFile to fix Windows test (#48019) On Windows, open file handles prevent t.TempDir() cleanup. Add defer f.Close() to properly release the file handle. Fixes test failure introduced in #47969. --------- Co-authored-by: Orestis Floros <[email protected]>
1 parent b5fbbfa commit b1413ea

File tree

2 files changed

+149
-163
lines changed

2 files changed

+149
-163
lines changed

x-pack/filebeat/fbreceiver/receiver_leak_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ package fbreceiver
66

77
import (
88
"errors"
9-
"path/filepath"
10-
"runtime"
119
"testing"
1210
"time"
1311

@@ -25,13 +23,7 @@ import (
2523
)
2624

2725
func TestLeak(t *testing.T) {
28-
monitorSocket := genSocketPath()
29-
var monitorHost string
30-
if runtime.GOOS == "windows" {
31-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
32-
} else {
33-
monitorHost = "unix://" + monitorSocket
34-
}
26+
monitorHost := hostFromSocket(genSocketPath(t))
3527
config := Config{
3628
Beatconfig: map[string]any{
3729
"filebeat": map[string]any{
@@ -81,7 +73,6 @@ func TestLeak(t *testing.T) {
8173
consumeLogs := oteltest.DummyConsumer{ConsumeError: errors.New("cannot publish data")}
8274
startAndStopReceiver(t, factory, &consumeLogs, &config)
8375
})
84-
8576
}
8677

8778
// StartAndStopReceiver creates a receiver using the provided parameters, starts it, verifies that the expected logs

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 148 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ package fbreceiver
77
import (
88
"bytes"
99
"context"
10-
"encoding/base64"
1110
"encoding/json"
1211
"fmt"
1312
"io"
14-
"math/rand/v2"
1513
"net"
1614
"net/http"
1715
"net/url"
@@ -21,9 +19,7 @@ import (
2119
"strings"
2220
"testing"
2321

24-
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
25-
"github.com/elastic/elastic-agent-libs/mapstr"
26-
22+
"github.com/gofrs/uuid/v5"
2723
"github.com/stretchr/testify/assert"
2824
"github.com/stretchr/testify/require"
2925
"go.opentelemetry.io/collector/component"
@@ -32,16 +28,14 @@ import (
3228
"go.uber.org/zap"
3329
"go.uber.org/zap/zapcore"
3430
"go.uber.org/zap/zaptest/observer"
31+
32+
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
33+
"github.com/elastic/elastic-agent-libs/mapstr"
3534
)
3635

3736
func TestNewReceiver(t *testing.T) {
38-
monitorSocket := genSocketPath()
39-
var monitorHost string
40-
if runtime.GOOS == "windows" {
41-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
42-
} else {
43-
monitorHost = "unix://" + monitorSocket
44-
}
37+
monitorSocket := genSocketPath(t)
38+
monitorHost := hostFromSocket(monitorSocket)
4539
config := Config{
4640
Beatconfig: map[string]any{
4741
"filebeat": map[string]any{
@@ -161,149 +155,135 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
161155
}
162156
}
163157

164-
func TestMultipleReceivers(t *testing.T) {
165-
// This test verifies that multiple receivers can be instantiated
166-
// in isolation, started, and can ingest logs without interfering
167-
// with each other.
168-
169-
// Receivers need distinct home directories so wrap the config in a function.
170-
config := func(monitorSocket string, homePath string, ingestPath string) *Config {
171-
var monitorHost string
172-
if runtime.GOOS == "windows" {
173-
monitorHost = "npipe:///" + filepath.Base(monitorSocket)
174-
} else {
175-
monitorHost = "unix://" + monitorSocket
176-
}
177-
return &Config{
178-
Beatconfig: map[string]any{
179-
"filebeat": map[string]any{
180-
"inputs": []map[string]any{
181-
{
182-
"type": "benchmark",
183-
"enabled": true,
184-
"message": "test",
185-
"count": 1,
186-
},
187-
{
188-
"type": "filestream",
189-
"enabled": true,
190-
"id": "must-be-unique",
191-
"paths": []string{ingestPath},
192-
"file_identity.native": nil,
193-
},
158+
// multiReceiverConfig creates a Config for testing multiple receivers.
159+
// Each receiver gets a unique home path.
160+
func multiReceiverConfig(helper multiReceiverHelper) *Config {
161+
return &Config{
162+
Beatconfig: map[string]any{
163+
"filebeat": map[string]any{
164+
"inputs": []map[string]any{
165+
{
166+
"type": "benchmark",
167+
"enabled": true,
168+
"message": "test",
169+
"count": 1,
194170
},
195-
},
196-
"logging": map[string]any{
197-
"level": "info",
198-
"selectors": []string{
199-
"*",
171+
{
172+
"type": "filestream",
173+
"enabled": true,
174+
"id": "must-be-unique",
175+
"paths": []string{helper.ingest},
176+
"file_identity.native": nil,
200177
},
201178
},
202-
"path.home": homePath,
203-
"http.enabled": true,
204-
"http.host": monitorHost,
205179
},
206-
}
180+
"logging": map[string]any{
181+
"level": "info",
182+
"selectors": []string{
183+
"*",
184+
},
185+
},
186+
"path.home": helper.home,
187+
"http.enabled": true,
188+
"http.host": hostFromSocket(helper.monitorSocket),
189+
},
207190
}
191+
}
192+
193+
type multiReceiverHelper struct {
194+
name string
195+
home string
196+
ingest string
197+
monitorSocket string
198+
}
199+
200+
func newMultiReceiverHelper(t *testing.T, number int) multiReceiverHelper {
201+
return multiReceiverHelper{
202+
name: fmt.Sprintf("r%d", number),
203+
home: t.TempDir(),
204+
ingest: filepath.Join(t.TempDir(), fmt.Sprintf("test%d.log", number)),
205+
monitorSocket: genSocketPath(t),
206+
}
207+
}
208+
209+
// TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
210+
// without interfering with each other.
211+
func TestMultipleReceivers(t *testing.T) {
212+
const nReceivers = 2
208213

209214
factory := NewFactory()
210-
monitorSocket1 := genSocketPath()
211-
monitorSocket2 := genSocketPath()
212-
dir1 := t.TempDir()
213-
dir2 := t.TempDir()
214-
ingest1 := filepath.Join(t.TempDir(), "test1.log")
215-
ingest2 := filepath.Join(t.TempDir(), "test2.log")
215+
216+
helpers := make([]multiReceiverHelper, nReceivers)
217+
configs := make([]oteltest.ReceiverConfig, nReceivers)
218+
for i := range helpers {
219+
helper := newMultiReceiverHelper(t, i)
220+
helpers[i] = helper
221+
configs[i] = oteltest.ReceiverConfig{
222+
Name: helper.name,
223+
Beat: "filebeat",
224+
Config: multiReceiverConfig(helper),
225+
Factory: factory,
226+
}
227+
}
228+
216229
oteltest.CheckReceivers(oteltest.CheckReceiversParams{
217230
T: t,
218231
NumRestarts: 5,
219-
Receivers: []oteltest.ReceiverConfig{
220-
{
221-
Name: "r1",
222-
Beat: "filebeat",
223-
Config: config(monitorSocket1, dir1, ingest1),
224-
Factory: factory,
225-
},
226-
{
227-
Name: "r2",
228-
Beat: "filebeat",
229-
Config: config(monitorSocket2, dir2, ingest2),
230-
Factory: factory,
231-
},
232-
},
232+
Receivers: configs,
233233
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
234-
// Add data to be ingested with filestream
235-
f1, err := os.OpenFile(ingest1, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
236-
require.NoError(c, err)
237-
_, err = f1.WriteString("A log line\n")
238-
require.NoError(c, err)
239-
f1.Close()
240-
f2, err := os.OpenFile(ingest2, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
241-
require.NoError(c, err)
242-
_, err = f2.WriteString("A log line\n")
243-
require.NoError(c, err)
244-
f2.Close()
245-
246-
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
247-
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")
248-
249-
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
250-
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
251-
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
252-
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")
253-
254-
// Make sure that each receiver has a separate logger
255-
// instance and does not interfere with others. Previously, the
256-
// logger in Beats was global, causing logger fields to be
257-
// overwritten when multiple receivers started in the same process.
258-
r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r1"))
259-
assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log")
260-
r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r2"))
261-
assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log")
262-
263-
meta1Path := filepath.Join(dir1, "/data/meta.json")
264-
assert.FileExists(c, meta1Path, "dir1/data/meta.json should exist")
265-
meta1Data, err := os.ReadFile(meta1Path)
266-
assert.NoError(c, err)
267-
268-
meta2Path := filepath.Join(dir2, "/data/meta.json")
269-
assert.FileExists(c, meta2Path, "dir2/data/meta.json should exist")
270-
meta2Data, err := os.ReadFile(meta2Path)
271-
assert.NoError(c, err)
272-
273-
assert.NotEqual(c, meta1Data, meta2Data, "meta data files should be different")
234+
allMetaData := make([]string, 0, nReceivers)
235+
allRegData := make([]string, 0, nReceivers)
236+
for _, helper := range helpers {
237+
writeFile(c, helper.ingest, "A log line")
238+
239+
require.Greaterf(c, len(logs[helper.name]), 0, "receiver %v does not have any logs", helper)
240+
241+
assert.Equalf(c, "filebeatreceiver/"+helper.name, logs[helper.name][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in %v log record", helper)
242+
assert.Equalf(c, "receiver", logs[helper.name][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in %v log record", helper)
243+
244+
// Make sure that each receiver has a separate logger
245+
// instance and does not interfere with others. Previously, the
246+
// logger in Beats was global, causing logger fields to be
247+
// overwritten when multiple receivers started in the same process.
248+
startLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/"+helper.name))
249+
assert.Equalf(c, 1, startLogs.Len(), "%v should have a single start log", helper)
250+
251+
metaPath := filepath.Join(helper.home, "/data/meta.json")
252+
assert.FileExistsf(c, metaPath, "%s of %v should exist", metaPath, helper)
253+
metaData, err := os.ReadFile(metaPath)
254+
assert.NoError(c, err)
255+
allMetaData = append(allMetaData, string(metaData))
256+
257+
var lastError strings.Builder
258+
assert.Conditionf(c, func() bool {
259+
return getFromSocket(t, &lastError, helper.monitorSocket, "stats")
260+
}, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s", helper, &lastError)
261+
assert.Conditionf(c, func() bool {
262+
return getFromSocket(t, &lastError, helper.monitorSocket, "inputs")
263+
}, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s", helper, &lastError)
264+
265+
ingestJson, err := json.Marshal(helper.ingest)
266+
assert.NoError(c, err)
267+
268+
regPath := filepath.Join(helper.home, "/data/registry/filebeat/log.json")
269+
assert.FileExistsf(c, regPath, "receiver %v filebeat registry should exist", helper)
270+
regData, err := os.ReadFile(regPath)
271+
allRegData = append(allRegData, string(regData))
272+
assert.NoError(c, err)
273+
assert.Containsf(c, string(regData), string(ingestJson), "receiver %v registry should contain '%s', but was: %s", helper, string(ingestJson), string(regData))
274+
}
274275

275-
var lastError strings.Builder
276-
assert.Conditionf(c, func() bool {
277-
return getFromSocket(t, &lastError, monitorSocket1, "stats")
278-
}, "failed to connect to monitoring socket1, stats endpoint, last error was: %s", &lastError)
279-
assert.Conditionf(c, func() bool {
280-
return getFromSocket(t, &lastError, monitorSocket1, "inputs")
281-
}, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s", &lastError)
282-
assert.Conditionf(c, func() bool {
283-
return getFromSocket(t, &lastError, monitorSocket2, "stats")
284-
}, "failed to connect to monitoring socket2, stats endpoint, last error was: %s", &lastError)
285-
assert.Conditionf(c, func() bool {
286-
return getFromSocket(t, &lastError, monitorSocket2, "inputs")
287-
}, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s", &lastError)
288-
289-
ingest1Json, err := json.Marshal(ingest1)
290-
require.NoError(c, err)
291-
ingest2Json, err := json.Marshal(ingest2)
292-
require.NoError(c, err)
293-
294-
reg1Path := filepath.Join(dir1, "/data/registry/filebeat/log.json")
295-
require.FileExists(c, reg1Path, "receiver 1 filebeat registry should exist")
296-
reg1Data, err := os.ReadFile(reg1Path)
297-
require.NoError(c, err)
298-
require.Containsf(c, string(reg1Data), string(ingest1Json), "receiver 1 registry should contain '%s', but was: %s", string(ingest1Json), string(reg1Data))
299-
require.NotContainsf(c, string(reg1Data), string(ingest2Json), "receiver 1 registry should not contain '%s', but was: %s", string(ingest2Json), string(reg1Data))
300-
301-
reg2Path := filepath.Join(dir2, "/data/registry/filebeat/log.json")
302-
require.FileExists(c, reg2Path, "receiver 2 filebeat registry should exist")
303-
reg2Data, err := os.ReadFile(reg2Path)
304-
require.NoError(c, err)
305-
require.Containsf(c, string(reg2Data), string(ingest2Json), "receiver 2 registry should contain '%s', but was: %s", string(ingest2Json), string(reg2Data))
306-
require.NotContainsf(c, string(reg2Data), string(ingest1Json), "receiver 2 registry should not contain '%s', but was: %s", string(ingest1Json), string(reg2Data))
276+
for i := range nReceivers {
277+
for j := range nReceivers {
278+
if i == j {
279+
continue
280+
}
281+
h1 := helpers[i]
282+
h2 := helpers[j]
283+
assert.NotEqualf(c, allMetaData[i], allMetaData[j], "meta data files between %v and %v should be different", h1, h2)
284+
assert.NotContainsf(c, allRegData[i], allRegData[j], "receiver %v registry should not contain data from %v registry", h1, h2)
285+
}
286+
}
307287
},
308288
})
309289
}
@@ -379,14 +359,14 @@ func TestReceiverDegraded(t *testing.T) {
379359
}
380360
}
381361

382-
func genSocketPath() string {
383-
randData := make([]byte, 16)
384-
for i := range len(randData) {
385-
randData[i] = uint8(rand.UintN(255)) //nolint:gosec // 0-255 fits in a uint8
386-
}
387-
socketName := base64.URLEncoding.EncodeToString(randData) + ".sock"
388-
socketDir := os.TempDir()
389-
return filepath.Join(socketDir, socketName)
362+
func genSocketPath(t *testing.T) string {
363+
t.Helper()
364+
socketName, err := uuid.NewV4()
365+
require.NoError(t, err)
366+
// Use os.TempDir() for short Unix socket paths
367+
sockPath := filepath.Join(os.TempDir(), socketName.String()+".sock")
368+
t.Cleanup(func() { _ = os.Remove(sockPath) })
369+
return sockPath
390370
}
391371

392372
func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoint string) bool {
@@ -396,8 +376,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
396376
}
397377
client := http.Client{
398378
Transport: &http.Transport{
399-
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
400-
return net.Dial("unix", socketPath)
379+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
380+
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
401381
},
402382
},
403383
}
@@ -493,3 +473,18 @@ func TestReceiverHook(t *testing.T) {
493473
// one for beat metrics, one for input metrics and one for getting the registry.
494474
oteltest.TestReceiverHook(t, &cfg, NewFactory(), receiverSettings, 3)
495475
}
476+
477+
func hostFromSocket(socket string) string {
478+
if runtime.GOOS == "windows" {
479+
return "npipe:///" + filepath.Base(socket)
480+
}
481+
return "unix://" + socket
482+
}
483+
484+
func writeFile(t require.TestingT, path string, data string) {
485+
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
486+
require.NoErrorf(t, err, "Could not open file %s", path)
487+
defer f.Close()
488+
_, err = f.WriteString(data + "\n")
489+
require.NoErrorf(t, err, "Could not write %s to file %s", data, path)
490+
}

0 commit comments

Comments
 (0)