@@ -7,11 +7,9 @@ package fbreceiver
77import (
88 "bytes"
99 "context"
10- "encoding/base64"
1110 "encoding/json"
1211 "fmt"
1312 "io"
14- "math/rand/v2"
1513 "net"
1614 "net/http"
1715 "net/url"
@@ -21,9 +19,7 @@ import (
2119 "strings"
2220 "testing"
2321
24- "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
25- "github.com/elastic/elastic-agent-libs/mapstr"
26-
22+ "github.com/gofrs/uuid/v5"
2723 "github.com/stretchr/testify/assert"
2824 "github.com/stretchr/testify/require"
2925 "go.opentelemetry.io/collector/component"
@@ -32,16 +28,14 @@ import (
3228 "go.uber.org/zap"
3329 "go.uber.org/zap/zapcore"
3430 "go.uber.org/zap/zaptest/observer"
31+
32+ "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
33+ "github.com/elastic/elastic-agent-libs/mapstr"
3534)
3635
3736func TestNewReceiver (t * testing.T ) {
38- monitorSocket := genSocketPath ()
39- var monitorHost string
40- if runtime .GOOS == "windows" {
41- monitorHost = "npipe:///" + filepath .Base (monitorSocket )
42- } else {
43- monitorHost = "unix://" + monitorSocket
44- }
37+ monitorSocket := genSocketPath (t )
38+ monitorHost := hostFromSocket (monitorSocket )
4539 config := Config {
4640 Beatconfig : map [string ]any {
4741 "filebeat" : map [string ]any {
@@ -161,149 +155,141 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
161155 }
162156}
163157
164- func TestMultipleReceivers (t * testing.T ) {
165- // This test verifies that multiple receivers can be instantiated
166- // in isolation, started, and can ingest logs without interfering
167- // with each other.
168-
169- // Receivers need distinct home directories so wrap the config in a function.
170- config := func (monitorSocket string , homePath string , ingestPath string ) * Config {
171- var monitorHost string
172- if runtime .GOOS == "windows" {
173- monitorHost = "npipe:///" + filepath .Base (monitorSocket )
174- } else {
175- monitorHost = "unix://" + monitorSocket
176- }
177- return & Config {
178- Beatconfig : map [string ]any {
179- "filebeat" : map [string ]any {
180- "inputs" : []map [string ]any {
181- {
182- "type" : "benchmark" ,
183- "enabled" : true ,
184- "message" : "test" ,
185- "count" : 1 ,
186- },
187- {
188- "type" : "filestream" ,
189- "enabled" : true ,
190- "id" : "must-be-unique" ,
191- "paths" : []string {ingestPath },
192- "file_identity.native" : nil ,
193- },
158+ // multiReceiverConfig creates a Config for testing multiple receivers.
159+ // Each receiver gets a unique home path.
160+ func multiReceiverConfig (helper multiReceiverHelper ) * Config {
161+ return & Config {
162+ Beatconfig : map [string ]any {
163+ "filebeat" : map [string ]any {
164+ "inputs" : []map [string ]any {
165+ {
166+ "type" : "benchmark" ,
167+ "enabled" : true ,
168+ "message" : "test" ,
169+ "count" : 1 ,
194170 },
195- },
196- "logging" : map [string ]any {
197- "level" : "info" ,
198- "selectors" : []string {
199- "*" ,
171+ {
172+ "type" : "filestream" ,
173+ "enabled" : true ,
174+ "id" : "must-be-unique" ,
175+ "paths" : []string {helper .ingest },
176+ "file_identity.native" : nil ,
200177 },
201178 },
202- "path.home" : homePath ,
203- "http.enabled" : true ,
204- "http.host" : monitorHost ,
205179 },
206- }
180+ "logging" : map [string ]any {
181+ "level" : "info" ,
182+ "selectors" : []string {
183+ "*" ,
184+ },
185+ },
186+ "path.home" : helper .home ,
187+ "http.enabled" : true ,
188+ "http.host" : hostFromSocket (helper .monitorSocket ),
189+ },
190+ }
191+ }
192+
193+ type multiReceiverHelper struct {
194+ name string
195+ home string
196+ ingest string
197+ monitorSocket string
198+ }
199+
200+ func newMultiReceiverHelper (t * testing.T , number int ) multiReceiverHelper {
201+ return multiReceiverHelper {
202+ name : fmt .Sprintf ("r%d" , number ),
203+ home : t .TempDir (),
204+ ingest : filepath .Join (t .TempDir (), fmt .Sprintf ("test%d.log" , number )),
205+ monitorSocket : genSocketPath (t ),
207206 }
207+ }
208+
209+ // TestMultipleReceivers verifies that multiple receivers can be instantiated in isolation, started, and can ingest logs
210+ // without interfering with each other.
211+ func TestMultipleReceivers (t * testing.T ) {
212+ const nReceivers = 2
208213
209214 factory := NewFactory ()
210- monitorSocket1 := genSocketPath ()
211- monitorSocket2 := genSocketPath ()
212- dir1 := t .TempDir ()
213- dir2 := t .TempDir ()
214- ingest1 := filepath .Join (t .TempDir (), "test1.log" )
215- ingest2 := filepath .Join (t .TempDir (), "test2.log" )
215+
216+ helpers := make ([]multiReceiverHelper , nReceivers )
217+ configs := make ([]oteltest.ReceiverConfig , nReceivers )
218+ for i := range helpers {
219+ helper := newMultiReceiverHelper (t , i )
220+ helpers [i ] = helper
221+ configs [i ] = oteltest.ReceiverConfig {
222+ Name : helper .name ,
223+ Beat : "filebeat" ,
224+ Config : multiReceiverConfig (helper ),
225+ Factory : factory ,
226+ }
227+ }
228+
216229 oteltest .CheckReceivers (oteltest.CheckReceiversParams {
217230 T : t ,
218231 NumRestarts : 5 ,
219- Receivers : []oteltest.ReceiverConfig {
220- {
221- Name : "r1" ,
222- Beat : "filebeat" ,
223- Config : config (monitorSocket1 , dir1 , ingest1 ),
224- Factory : factory ,
225- },
226- {
227- Name : "r2" ,
228- Beat : "filebeat" ,
229- Config : config (monitorSocket2 , dir2 , ingest2 ),
230- Factory : factory ,
231- },
232- },
232+ Receivers : configs ,
233233 AssertFunc : func (c * assert.CollectT , logs map [string ][]mapstr.M , zapLogs * observer.ObservedLogs ) {
234- // Add data to be ingested with filestream
235- f1 , err := os .OpenFile (ingest1 , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
236- require .NoError (c , err )
237- _ , err = f1 .WriteString ("A log line\n " )
238- require .NoError (c , err )
239- f1 .Close ()
240- f2 , err := os .OpenFile (ingest2 , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
241- require .NoError (c , err )
242- _ , err = f2 .WriteString ("A log line\n " )
243- require .NoError (c , err )
244- f2 .Close ()
245-
246- require .Greater (c , len (logs ["r1" ]), 0 , "receiver r1 does not have any logs" )
247- require .Greater (c , len (logs ["r2" ]), 0 , "receiver r2 does not have any logs" )
234+ allMetaData := make ([]string , 0 , nReceivers )
235+ allRegData := make ([]string , 0 , nReceivers )
236+ for _ , helper := range helpers {
237+ writeFile (c , helper .ingest , "A log line" )
248238
239+ require .Greaterf (c , len (logs [helper .name ]), 0 , "receiver %v does not have any logs" , helper )
240+
241+ << << << < HEAD
249242 assert .Equal (c , "filebeatreceiver/r1" , logs ["r1" ][0 ].Flatten ()["agent.otelcol.component.id" ], "expected agent.otelcol.component.id field in r1 log record" )
250243 assert .Equal (c , "receiver" , logs ["r1" ][0 ].Flatten ()["agent.otelcol.component.kind" ], "expected agent.otelcol.component.kind field in r1 log record" )
251244 assert .Equal (c , "filebeatreceiver/r2" , logs ["r2" ][0 ].Flatten ()["agent.otelcol.component.id" ], "expected agent.otelcol.component.id field in r2 log record" )
252245 assert .Equal (c , "receiver" , logs ["r2" ][0 ].Flatten ()["agent.otelcol.component.kind" ], "expected agent.otelcol.component.kind field in r2 log record" )
246+ == == == =
247+ assert .Equalf (c , "test" , logs [helper .name ][0 ].Flatten ()["message" ], "expected %v message field to be 'test'" , helper )
248+ >> >> >> > 135 d04f52 (fbreceiver : refactor receiver tests for maintainability (#47969 ))
249+
250+ // Make sure that each receiver has a separate logger
251+ // instance and does not interfere with others. Previously, the
252+ // logger in Beats was global, causing logger fields to be
253+ // overwritten when multiple receivers started in the same process.
254+ startLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/" + helper .name ))
255+ assert .Equalf (c , 1 , startLogs .Len (), "%v should have a single start log" , helper )
256+
257+ metaPath := filepath .Join (helper .home , "/data/meta.json" )
258+ assert .FileExistsf (c , metaPath , "%s of %v should exist" , metaPath , helper )
259+ metaData , err := os .ReadFile (metaPath )
260+ assert .NoError (c , err )
261+ allMetaData = append (allMetaData , string (metaData ))
262+
263+ var lastError strings.Builder
264+ assert .Conditionf (c , func () bool {
265+ return getFromSocket (t , & lastError , helper .monitorSocket , "stats" )
266+ }, "failed to connect to monitoring socket of %v, stats endpoint, last error was: %s" , helper , & lastError )
267+ assert .Conditionf (c , func () bool {
268+ return getFromSocket (t , & lastError , helper .monitorSocket , "inputs" )
269+ }, "failed to connect to monitoring socket of %v, inputs endpoint, last error was: %s" , helper , & lastError )
270+
271+ ingestJson , err := json.Marshal (helper .ingest )
272+ assert .NoError (c , err )
273+
274+ regPath := filepath .Join (helper .home , "/data/registry/filebeat/log.json" )
275+ assert .FileExistsf (c , regPath , "receiver %v filebeat registry should exist" , helper )
276+ regData , err := os.ReadFile (regPath )
277+ allRegData = append (allRegData , string (regData ))
278+ assert .NoError (c , err )
279+ assert .Containsf (c , string (regData ), string (ingestJson ), "receiver %v registry should contain '%s', but was: %s" , helper , string (ingestJson ), string (regData ))
280+ }
253281
254- // Make sure that each receiver has a separate logger
255- // instance and does not interfere with others. Previously, the
256- // logger in Beats was global, causing logger fields to be
257- // overwritten when multiple receivers started in the same process.
258- r1StartLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/r1" ))
259- assert .Equal (c , 1 , r1StartLogs .Len (), "r1 should have a single start log" )
260- r2StartLogs := zapLogs .FilterMessageSnippet ("Beat ID" ).FilterField (zap .String ("otelcol.component.id" , "filebeatreceiver/r2" ))
261- assert .Equal (c , 1 , r2StartLogs .Len (), "r2 should have a single start log" )
262-
263- meta1Path := filepath .Join (dir1 , "/data/meta.json" )
264- assert .FileExists (c , meta1Path , "dir1/data/meta.json should exist" )
265- meta1Data , err := os .ReadFile (meta1Path )
266- assert .NoError (c , err )
267-
268- meta2Path := filepath .Join (dir2 , "/data/meta.json" )
269- assert .FileExists (c , meta2Path , "dir2/data/meta.json should exist" )
270- meta2Data , err := os .ReadFile (meta2Path )
271- assert .NoError (c , err )
272-
273- assert .NotEqual (c , meta1Data , meta2Data , "meta data files should be different" )
274-
275- var lastError strings.Builder
276- assert .Conditionf (c , func () bool {
277- return getFromSocket (t , & lastError , monitorSocket1 , "stats" )
278- }, "failed to connect to monitoring socket1, stats endpoint, last error was: %s" , & lastError )
279- assert .Conditionf (c , func () bool {
280- return getFromSocket (t , & lastError , monitorSocket1 , "inputs" )
281- }, "failed to connect to monitoring socket1, inputs endpoint, last error was: %s" , & lastError )
282- assert .Conditionf (c , func () bool {
283- return getFromSocket (t , & lastError , monitorSocket2 , "stats" )
284- }, "failed to connect to monitoring socket2, stats endpoint, last error was: %s" , & lastError )
285- assert .Conditionf (c , func () bool {
286- return getFromSocket (t , & lastError , monitorSocket2 , "inputs" )
287- }, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s" , & lastError )
288-
289- ingest1Json , err := json .Marshal (ingest1 )
290- require .NoError (c , err )
291- ingest2Json , err := json .Marshal (ingest2 )
292- require .NoError (c , err )
293-
294- reg1Path := filepath .Join (dir1 , "/data/registry/filebeat/log.json" )
295- require .FileExists (c , reg1Path , "receiver 1 filebeat registry should exist" )
296- reg1Data , err := os .ReadFile (reg1Path )
297- require .NoError (c , err )
298- require .Containsf (c , string (reg1Data ), string (ingest1Json ), "receiver 1 registry should contain '%s', but was: %s" , string (ingest1Json ), string (reg1Data ))
299- require .NotContainsf (c , string (reg1Data ), string (ingest2Json ), "receiver 1 registry should not contain '%s', but was: %s" , string (ingest2Json ), string (reg1Data ))
300-
301- reg2Path := filepath .Join (dir2 , "/data/registry/filebeat/log.json" )
302- require .FileExists (c , reg2Path , "receiver 2 filebeat registry should exist" )
303- reg2Data , err := os .ReadFile (reg2Path )
304- require .NoError (c , err )
305- require .Containsf (c , string (reg2Data ), string (ingest2Json ), "receiver 2 registry should contain '%s', but was: %s" , string (ingest2Json ), string (reg2Data ))
306- require .NotContainsf (c , string (reg2Data ), string (ingest1Json ), "receiver 2 registry should not contain '%s', but was: %s" , string (ingest1Json ), string (reg2Data ))
282+ for i := range nReceivers {
283+ for j := range nReceivers {
284+ if i == j {
285+ continue
286+ }
287+ h1 := helpers [i ]
288+ h2 := helpers [j ]
289+ assert .NotEqualf (c , allMetaData [i ], allMetaData [j ], "meta data files between %v and %v should be different" , h1 , h2 )
290+ assert .NotContainsf (c , allRegData [i ], allRegData [j ], "receiver %v registry should not contain data from %v registry" , h1 , h2 )
291+ }
292+ }
307293 },
308294 })
309295}
@@ -379,14 +365,14 @@ func TestReceiverDegraded(t *testing.T) {
379365 }
380366}
381367
382- func genSocketPath () string {
383- randData := make ([] byte , 16 )
384- for i := range len ( randData ) {
385- randData [ i ] = uint8 ( rand . UintN ( 255 )) //nolint:gosec // 0-255 fits in a uint8
386- }
387- socketName := base64 . URLEncoding . EncodeToString ( randData ) + ".sock"
388- socketDir : = os .TempDir ( )
389- return filepath . Join ( socketDir , socketName )
368+ func genSocketPath (t * testing. T ) string {
369+ t . Helper ( )
370+ socketName , err := uuid . NewV4 ()
371+ require . NoError ( t , err )
372+ // Use os.TempDir() for short Unix socket paths
373+ sockPath := filepath . Join ( os . TempDir (), socketName . String () + ".sock" )
374+ t . Cleanup ( func () { _ = os .Remove ( sockPath ) } )
375+ return sockPath
390376}
391377
392378func getFromSocket (t * testing.T , sb * strings.Builder , socketPath string , endpoint string ) bool {
@@ -396,8 +382,8 @@ func getFromSocket(t *testing.T, sb *strings.Builder, socketPath string, endpoin
396382 }
397383 client := http.Client {
398384 Transport : & http.Transport {
399- DialContext : func (_ context.Context , _ , _ string ) (net.Conn , error ) {
400- return net .Dial ( "unix" , socketPath )
385+ DialContext : func (ctx context.Context , _ , _ string ) (net.Conn , error ) {
386+ return ( & net.Dialer {}). DialContext ( ctx , "unix" , socketPath )
401387 },
402388 },
403389 }
@@ -493,3 +479,17 @@ func TestReceiverHook(t *testing.T) {
493479 // one for beat metrics, one for input metrics and one for getting the registry.
494480 oteltest .TestReceiverHook (t , & cfg , NewFactory (), receiverSettings , 3 )
495481}
482+
483+ func hostFromSocket (socket string ) string {
484+ if runtime .GOOS == "windows" {
485+ return "npipe:///" + filepath .Base (socket )
486+ }
487+ return "unix://" + socket
488+ }
489+
490+ func writeFile (t require.TestingT , path string , data string ) {
491+ f , err := os .OpenFile (path , os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0o644 )
492+ require .NoErrorf (t , err , "Could not open file %s" , path )
493+ _ , err = f .WriteString (data + "\n " )
494+ require .NoErrorf (t , err , "Could not write %s to file %s" , data , path )
495+ }
0 commit comments