Skip to content

Commit 7ed83e0

Browse files
committed
Add support for configuring mutliple OTel pipelines
1 parent a4918b3 commit 7ed83e0

File tree

13 files changed

+636
-417
lines changed

13 files changed

+636
-417
lines changed

internal/collector/otel_collector_plugin.go

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var (
5959
initMutex = &sync.Mutex{}
6060
)
6161

62-
// NewCollector is the constructor for the Collector plugin.
62+
// New is the constructor for the Collector plugin.
6363
func New(conf *config.Config) (*Collector, error) {
6464
initMutex.Lock()
6565

@@ -194,7 +194,7 @@ func (oc *Collector) Subscriptions() []string {
194194
}
195195

196196
// Process receivers and log warning for sub-optimal configurations
197-
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
197+
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
198198
for _, receiver := range receivers {
199199
if receiver.OtlpTLSConfig == nil {
200200
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
@@ -317,12 +317,13 @@ func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource)
317317
resourceProcessorUpdated := false
318318

319319
if oc.config.Collector.Processors.Resource == nil {
320-
oc.config.Collector.Processors.Resource = &config.Resource{
320+
oc.config.Collector.Processors.Resource = make(map[string]*config.Resource)
321+
oc.config.Collector.Processors.Resource["default"] = &config.Resource{
321322
Attributes: make([]config.ResourceAttribute, 0),
322323
}
323324
}
324325

325-
if oc.config.Collector.Processors.Resource != nil &&
326+
if oc.config.Collector.Processors.Resource["default"] != nil &&
326327
resourceUpdateContext.GetResourceId() != "" {
327328
resourceProcessorUpdated = oc.updateResourceAttributes(
328329
[]config.ResourceAttribute{
@@ -431,7 +432,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
431432
}
432433

433434
if oc.config.IsFeatureEnabled(pkgConfig.FeatureLogsNap) {
434-
tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext)
435+
tcplogReceiversFound := oc.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
435436
if tcplogReceiversFound {
436437
reloadCollector = true
437438
}
@@ -541,49 +542,51 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
541542
return nginxReceiverFound, reloadCollector
542543
}
543544

544-
func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
545+
func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
545546
newTcplogReceiverAdded := false
547+
548+
if oc.config.Collector.Receivers.TcplogReceivers == nil {
549+
oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver)
550+
}
551+
546552
if nginxConfigContext.NAPSysLogServers != nil {
547553
napLoop:
548554
for _, napSysLogServer := range nginxConfigContext.NAPSysLogServers {
549555
if oc.doesTcplogReceiverAlreadyExist(napSysLogServer) {
550556
continue napLoop
551557
}
552558

553-
oc.config.Collector.Receivers.TcplogReceivers = append(
554-
oc.config.Collector.Receivers.TcplogReceivers,
555-
config.TcplogReceiver{
556-
ListenAddress: napSysLogServer,
557-
Operators: []config.Operator{
558-
{
559-
Type: "add",
560-
Fields: map[string]string{
561-
"field": "body",
562-
"value": timestampConversionExpression,
563-
},
559+
oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{
560+
ListenAddress: napSysLogServer,
561+
Operators: []config.Operator{
562+
{
563+
Type: "add",
564+
Fields: map[string]string{
565+
"field": "body",
566+
"value": timestampConversionExpression,
564567
},
565-
{
566-
Type: "syslog_parser",
567-
Fields: map[string]string{
568-
"protocol": "rfc3164",
569-
},
568+
},
569+
{
570+
Type: "syslog_parser",
571+
Fields: map[string]string{
572+
"protocol": "rfc3164",
570573
},
571-
{
572-
Type: "remove",
573-
Fields: map[string]string{
574-
"field": "attributes.message",
575-
},
574+
},
575+
{
576+
Type: "remove",
577+
Fields: map[string]string{
578+
"field": "attributes.message",
576579
},
577-
{
578-
Type: "add",
579-
Fields: map[string]string{
580-
"field": "resource[\"instance.id\"]",
581-
"value": nginxConfigContext.InstanceID,
582-
},
580+
},
581+
{
582+
Type: "add",
583+
Fields: map[string]string{
584+
"field": "resource[\"instance.id\"]",
585+
"value": nginxConfigContext.InstanceID,
583586
},
584587
},
585588
},
586-
)
589+
}
587590

588591
newTcplogReceiverAdded = true
589592
}
@@ -597,23 +600,13 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig
597600
func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
598601
listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext)
599602
if len(listenAddressesToBeDeleted) != 0 {
600-
oc.deleteNapReceivers(listenAddressesToBeDeleted)
603+
delete(oc.config.Collector.Receivers.TcplogReceivers, "nginx_app_protect")
601604
return true
602605
}
603606

604607
return false
605608
}
606609

607-
func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) {
608-
filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0]
609-
for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers {
610-
if !listenAddressesToBeDeleted[receiver.ListenAddress] {
611-
filteredReceivers = append(filteredReceivers, receiver)
612-
}
613-
}
614-
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
615-
}
616-
617610
func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
618611
elements := make(map[string]bool)
619612

@@ -651,16 +644,16 @@ func (oc *Collector) updateResourceAttributes(
651644
) (actionUpdated bool) {
652645
actionUpdated = false
653646

654-
if oc.config.Collector.Processors.Resource.Attributes != nil {
647+
if oc.config.Collector.Processors.Resource["default"].Attributes != nil {
655648
OUTER:
656649
for _, toAdd := range attributesToAdd {
657-
for _, action := range oc.config.Collector.Processors.Resource.Attributes {
650+
for _, action := range oc.config.Collector.Processors.Resource["default"].Attributes {
658651
if action.Key == toAdd.Key {
659652
continue OUTER
660653
}
661654
}
662-
oc.config.Collector.Processors.Resource.Attributes = append(
663-
oc.config.Collector.Processors.Resource.Attributes,
655+
oc.config.Collector.Processors.Resource["default"].Attributes = append(
656+
oc.config.Collector.Processors.Resource["default"].Attributes,
664657
toAdd,
665658
)
666659
actionUpdated = true

internal/collector/otel_collector_plugin_test.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,14 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
349349
Data: protos.HostResource(),
350350
},
351351
processors: config.Processors{
352-
Resource: &config.Resource{
353-
Attributes: []config.ResourceAttribute{
354-
{
355-
Key: "resource.id",
356-
Action: "insert",
357-
Value: "1234",
352+
Resource: map[string]*config.Resource{
353+
"default": {
354+
Attributes: []config.ResourceAttribute{
355+
{
356+
Key: "resource.id",
357+
Action: "insert",
358+
Value: "1234",
359+
},
358360
},
359361
},
360362
},
@@ -711,18 +713,19 @@ func TestCollector_updateResourceAttributes(t *testing.T) {
711713
collector.service = createFakeCollector()
712714

713715
// set up Actions
714-
conf.Collector.Processors.Resource = &config.Resource{Attributes: test.setup}
716+
conf.Collector.Processors.Resource = make(map[string]*config.Resource)
717+
conf.Collector.Processors.Resource["default"] = &config.Resource{Attributes: test.setup}
715718

716719
reloadRequired := collector.updateResourceAttributes(test.attributes)
717720
assert.Equal(tt,
718721
test.expectedAttribs,
719-
conf.Collector.Processors.Resource.Attributes)
722+
conf.Collector.Processors.Resource["default"].Attributes)
720723
assert.Equal(tt, test.expectedReloadRequired, reloadRequired)
721724
})
722725
}
723726
}
724727

725-
func TestCollector_updateTcplogReceivers(t *testing.T) {
728+
func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) {
726729
conf := types.OTelConfig(t)
727730
conf.Collector.Log.Path = ""
728731
conf.Collector.Processors.Batch = nil
@@ -741,38 +744,43 @@ func TestCollector_updateTcplogReceivers(t *testing.T) {
741744
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
742745

743746
t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) {
744-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
747+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
745748

746749
assert.True(tt, tcplogReceiverAdded)
747750
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1)
748-
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
749-
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
751+
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
752+
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
750753
})
751754

752-
// Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice
755+
// Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice
753756
// since there is already a receiver with the same ListenAddress
754757
t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) {
755-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
758+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
756759
assert.False(t, tcplogReceiverAdded)
757760
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
758-
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
759-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
761+
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
762+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
760763
})
761764

762765
t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) {
763-
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{})
766+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(&model.NginxConfigContext{})
764767
assert.True(t, tcplogReceiverDeleted)
765768
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
766769
})
767770

768771
t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) {
769-
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServers: []string{
770-
"localhost:152",
771-
}})
772+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(
773+
&model.NginxConfigContext{
774+
NAPSysLogServers: []string{
775+
"localhost:152",
776+
},
777+
},
778+
)
779+
772780
assert.True(t, tcplogReceiverDeleted)
773781
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
774-
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
775-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 4)
782+
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
783+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 4)
776784
})
777785
}
778786

0 commit comments

Comments
 (0)