Skip to content

Commit 1b69bf3

Browse files
committed
merge main
2 parents b77612c + e1649fa commit 1b69bf3

23 files changed

+763
-466
lines changed

internal/collector/otel_collector_plugin.go

Lines changed: 60 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ var (
6060
)
6161

6262
// NewCollector is the constructor for the Collector plugin.
63-
func New(conf *config.Config) (*Collector, error) {
63+
func NewCollector(conf *config.Config) (*Collector, error) {
6464
initMutex.Lock()
6565

6666
defer initMutex.Unlock()
@@ -194,7 +194,7 @@ func (oc *Collector) Subscriptions() []string {
194194
}
195195

196196
// Process receivers and log warning for sub-optimal configurations
197-
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
197+
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
198198
for _, receiver := range receivers {
199199
if receiver.OtlpTLSConfig == nil {
200200
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
@@ -317,12 +317,13 @@ func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource)
317317
resourceProcessorUpdated := false
318318

319319
if oc.config.Collector.Processors.Resource == nil {
320-
oc.config.Collector.Processors.Resource = &config.Resource{
320+
oc.config.Collector.Processors.Resource = make(map[string]*config.Resource)
321+
oc.config.Collector.Processors.Resource["default"] = &config.Resource{
321322
Attributes: make([]config.ResourceAttribute, 0),
322323
}
323324
}
324325

325-
if oc.config.Collector.Processors.Resource != nil &&
326+
if oc.config.Collector.Processors.Resource["default"] != nil &&
326327
resourceUpdateContext.GetResourceId() != "" {
327328
resourceProcessorUpdated = oc.updateResourceAttributes(
328329
[]config.ResourceAttribute{
@@ -431,7 +432,7 @@ func (oc *Collector) checkForNewReceivers(ctx context.Context, nginxConfigContex
431432
}
432433

433434
if oc.config.IsFeatureEnabled(pkgConfig.FeatureLogsNap) {
434-
tcplogReceiversFound := oc.updateTcplogReceivers(nginxConfigContext)
435+
tcplogReceiversFound := oc.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
435436
if tcplogReceiversFound {
436437
reloadCollector = true
437438
}
@@ -541,62 +542,64 @@ func (oc *Collector) updateExistingNginxOSSReceiver(
541542
return nginxReceiverFound, reloadCollector
542543
}
543544

544-
func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
545+
func (oc *Collector) updateNginxAppProtectTcplogReceivers(nginxConfigContext *model.NginxConfigContext) bool {
545546
newTcplogReceiverAdded := false
547+
548+
if oc.config.Collector.Receivers.TcplogReceivers == nil {
549+
oc.config.Collector.Receivers.TcplogReceivers = make(map[string]*config.TcplogReceiver)
550+
}
551+
546552
if nginxConfigContext.NAPSysLogServer != "" {
547553
if !oc.doesTcplogReceiverAlreadyExist(nginxConfigContext.NAPSysLogServer) {
548-
oc.config.Collector.Receivers.TcplogReceivers = append(
549-
oc.config.Collector.Receivers.TcplogReceivers,
550-
config.TcplogReceiver{
551-
ListenAddress: nginxConfigContext.NAPSysLogServer,
552-
Operators: []config.Operator{
553-
// regex captures the priority number from the log line
554-
{
555-
Type: "regex_parser",
556-
Fields: map[string]string{
557-
"regex": "^<(?P<priority>\\d+)>",
558-
"parse_from": "body",
559-
"parse_to": "attributes",
560-
},
561-
},
562-
// filter drops all logs that have a severity above 4
563-
// https://docs.secureauth.com/0902/en/how-to-read-a-syslog-message.html#severity-code-table
564-
{
565-
Type: "filter",
566-
Fields: map[string]string{
567-
"expr": "'int(attributes.priority) % 8 > 4'",
568-
"drop_ratio": "1.0",
569-
},
570-
},
571-
{
572-
Type: "add",
573-
Fields: map[string]string{
574-
"field": "body",
575-
"value": timestampConversionExpression,
576-
},
554+
oc.config.Collector.Receivers.TcplogReceivers["nginx_app_protect"] = &config.TcplogReceiver{
555+
ListenAddress: nginxConfigContext.NAPSysLogServer,
556+
Operators: []config.Operator{
557+
// regex captures the priority number from the log line
558+
{
559+
Type: "regex_parser",
560+
Fields: map[string]string{
561+
"regex": "^<(?P<priority>\\d+)>",
562+
"parse_from": "body",
563+
"parse_to": "attributes",
564+
},
565+
},
566+
// filter drops all logs that have a severity above 4
567+
// https://docs.secureauth.com/0902/en/how-to-read-a-syslog-message.html#severity-code-table
568+
{
569+
Type: "filter",
570+
Fields: map[string]string{
571+
"expr": "'int(attributes.priority) % 8 > 4'",
572+
"drop_ratio": "1.0",
573+
},
574+
},
575+
{
576+
Type: "add",
577+
Fields: map[string]string{
578+
"field": "body",
579+
"value": timestampConversionExpression,
577580
},
578-
{
579-
Type: "syslog_parser",
580-
Fields: map[string]string{
581-
"protocol": "rfc3164",
582-
},
581+
},
582+
{
583+
Type: "syslog_parser",
584+
Fields: map[string]string{
585+
"protocol": "rfc3164",
583586
},
584-
{
585-
Type: "remove",
586-
Fields: map[string]string{
587-
"field": "attributes.message",
588-
},
587+
},
588+
{
589+
Type: "remove",
590+
Fields: map[string]string{
591+
"field": "attributes.message",
589592
},
590-
{
591-
Type: "add",
592-
Fields: map[string]string{
593-
"field": "resource[\"instance.id\"]",
594-
"value": nginxConfigContext.InstanceID,
595-
},
593+
},
594+
{
595+
Type: "add",
596+
Fields: map[string]string{
597+
"field": "resource[\"instance.id\"]",
598+
"value": nginxConfigContext.InstanceID,
596599
},
597600
},
598601
},
599-
)
602+
}
600603

601604
newTcplogReceiverAdded = true
602605
}
@@ -610,23 +613,13 @@ func (oc *Collector) updateTcplogReceivers(nginxConfigContext *model.NginxConfig
610613
func (oc *Collector) areNapReceiversDeleted(nginxConfigContext *model.NginxConfigContext) bool {
611614
listenAddressesToBeDeleted := oc.configDeletedNapReceivers(nginxConfigContext)
612615
if len(listenAddressesToBeDeleted) != 0 {
613-
oc.deleteNapReceivers(listenAddressesToBeDeleted)
616+
delete(oc.config.Collector.Receivers.TcplogReceivers, "nginx_app_protect")
614617
return true
615618
}
616619

617620
return false
618621
}
619622

620-
func (oc *Collector) deleteNapReceivers(listenAddressesToBeDeleted map[string]bool) {
621-
filteredReceivers := (oc.config.Collector.Receivers.TcplogReceivers)[:0]
622-
for _, receiver := range oc.config.Collector.Receivers.TcplogReceivers {
623-
if !listenAddressesToBeDeleted[receiver.ListenAddress] {
624-
filteredReceivers = append(filteredReceivers, receiver)
625-
}
626-
}
627-
oc.config.Collector.Receivers.TcplogReceivers = filteredReceivers
628-
}
629-
630623
func (oc *Collector) configDeletedNapReceivers(nginxConfigContext *model.NginxConfigContext) map[string]bool {
631624
elements := make(map[string]bool)
632625

@@ -662,16 +655,16 @@ func (oc *Collector) updateResourceAttributes(
662655
) (actionUpdated bool) {
663656
actionUpdated = false
664657

665-
if oc.config.Collector.Processors.Resource.Attributes != nil {
658+
if oc.config.Collector.Processors.Resource["default"].Attributes != nil {
666659
OUTER:
667660
for _, toAdd := range attributesToAdd {
668-
for _, action := range oc.config.Collector.Processors.Resource.Attributes {
661+
for _, action := range oc.config.Collector.Processors.Resource["default"].Attributes {
669662
if action.Key == toAdd.Key {
670663
continue OUTER
671664
}
672665
}
673-
oc.config.Collector.Processors.Resource.Attributes = append(
674-
oc.config.Collector.Processors.Resource.Attributes,
666+
oc.config.Collector.Processors.Resource["default"].Attributes = append(
667+
oc.config.Collector.Processors.Resource["default"].Attributes,
675668
toAdd,
676669
)
677670
actionUpdated = true

internal/collector/otel_collector_plugin_test.go

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestCollector_New(t *testing.T) {
6767

6868
for _, tt := range tests {
6969
t.Run(tt.name, func(t *testing.T) {
70-
collector, err := New(tt.config)
70+
collector, err := NewCollector(tt.config)
7171

7272
if tt.expectedError != nil {
7373
require.Error(t, err)
@@ -114,7 +114,7 @@ func TestCollector_Init(t *testing.T) {
114114
conf.Collector.Receivers = config.Receivers{}
115115
}
116116

117-
collector, err = New(conf)
117+
collector, err = NewCollector(conf)
118118
require.NoError(t, err, "NewCollector should not return an error with valid config")
119119

120120
collector.service = createFakeCollector()
@@ -133,7 +133,7 @@ func TestCollector_InitAndClose(t *testing.T) {
133133
conf := types.OTelConfig(t)
134134
conf.Collector.Log.Path = ""
135135

136-
collector, err := New(conf)
136+
collector, err := NewCollector(conf)
137137
require.NoError(t, err, "NewCollector should not return an error with valid config")
138138

139139
ctx := context.Background()
@@ -293,7 +293,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) {
293293
conf.Collector.Extensions.HeadersSetter = nil
294294
conf.Collector.Exporters.PrometheusExporter = nil
295295

296-
collector, err := New(conf)
296+
collector, err := NewCollector(conf)
297297
require.NoError(tt, err, "NewCollector should not return an error with valid config")
298298

299299
collector.service = createFakeCollector()
@@ -349,12 +349,14 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
349349
Data: protos.HostResource(),
350350
},
351351
processors: config.Processors{
352-
Resource: &config.Resource{
353-
Attributes: []config.ResourceAttribute{
354-
{
355-
Key: "resource.id",
356-
Action: "insert",
357-
Value: "1234",
352+
Resource: map[string]*config.Resource{
353+
"default": {
354+
Attributes: []config.ResourceAttribute{
355+
{
356+
Key: "resource.id",
357+
Action: "insert",
358+
Value: "1234",
359+
},
358360
},
359361
},
360362
},
@@ -376,7 +378,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) {
376378

377379
for _, test := range tests {
378380
t.Run(test.name, func(tt *testing.T) {
379-
collector, err := New(conf)
381+
collector, err := NewCollector(conf)
380382
require.NoError(tt, err, "NewCollector should not return an error with valid config")
381383

382384
collector.service = createFakeCollector()
@@ -437,7 +439,7 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) {
437439

438440
for _, test := range tests {
439441
t.Run(test.name, func(tt *testing.T) {
440-
collector, err := New(conf)
442+
collector, err := NewCollector(conf)
441443
require.NoError(tt, err, "NewCollector should not return an error with valid config")
442444

443445
collector.service = createFakeCollector()
@@ -559,7 +561,7 @@ func TestCollector_updateExistingNginxOSSReceiver(t *testing.T) {
559561
for _, test := range tests {
560562
t.Run(test.name, func(tt *testing.T) {
561563
conf.Collector.Receivers = test.existingReceivers
562-
collector, err := New(conf)
564+
collector, err := NewCollector(conf)
563565
require.NoError(tt, err, "NewCollector should not return an error with valid config")
564566

565567
collector.service = createFakeCollector()
@@ -650,7 +652,7 @@ func TestCollector_updateExistingNginxPlusReceiver(t *testing.T) {
650652
for _, test := range tests {
651653
t.Run(test.name, func(tt *testing.T) {
652654
conf.Collector.Receivers = test.existingReceivers
653-
collector, err := New(conf)
655+
collector, err := NewCollector(conf)
654656
require.NoError(tt, err, "NewCollector should not return an error with valid config")
655657

656658
collector.service = createFakeCollector()
@@ -705,31 +707,32 @@ func TestCollector_updateResourceAttributes(t *testing.T) {
705707

706708
for _, test := range tests {
707709
t.Run(test.name, func(tt *testing.T) {
708-
collector, err := New(conf)
710+
collector, err := NewCollector(conf)
709711
require.NoError(tt, err, "NewCollector should not return an error with valid config")
710712

711713
collector.service = createFakeCollector()
712714

713715
// set up Actions
714-
conf.Collector.Processors.Resource = &config.Resource{Attributes: test.setup}
716+
conf.Collector.Processors.Resource = make(map[string]*config.Resource)
717+
conf.Collector.Processors.Resource["default"] = &config.Resource{Attributes: test.setup}
715718

716719
reloadRequired := collector.updateResourceAttributes(test.attributes)
717720
assert.Equal(tt,
718721
test.expectedAttribs,
719-
conf.Collector.Processors.Resource.Attributes)
722+
conf.Collector.Processors.Resource["default"].Attributes)
720723
assert.Equal(tt, test.expectedReloadRequired, reloadRequired)
721724
})
722725
}
723726
}
724727

725-
func TestCollector_updateTcplogReceivers(t *testing.T) {
728+
func TestCollector_updateNginxAppProtectTcplogReceivers(t *testing.T) {
726729
conf := types.OTelConfig(t)
727730
conf.Collector.Log.Path = ""
728731
conf.Collector.Processors.Batch = nil
729732
conf.Collector.Processors.Attribute = nil
730733
conf.Collector.Processors.Resource = nil
731734
conf.Collector.Processors.LogsGzip = nil
732-
collector, err := New(conf)
735+
collector, err := NewCollector(conf)
733736
require.NoError(t, err)
734737

735738
nginxConfigContext := &model.NginxConfigContext{
@@ -738,38 +741,42 @@ func TestCollector_updateTcplogReceivers(t *testing.T) {
738741

739742
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
740743

741-
t.Run("Test 1: New TcplogReceiver added", func(tt *testing.T) {
742-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
744+
t.Run("Test 1: NewCollector TcplogReceiver added", func(tt *testing.T) {
745+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
743746

744747
assert.True(tt, tcplogReceiverAdded)
745748
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers, 1)
746-
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
747-
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers[0].Operators, 6)
749+
assert.Equal(tt, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
750+
assert.Len(tt, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6)
748751
})
749752

750-
// Calling updateTcplogReceivers shouldn't update the TcplogReceivers slice
753+
// Calling updateNginxAppProtectTcplogReceivers shouldn't update the TcplogReceivers slice
751754
// since there is already a receiver with the same ListenAddress
752755
t.Run("Test 2: TcplogReceiver already exists", func(tt *testing.T) {
753-
tcplogReceiverAdded := collector.updateTcplogReceivers(nginxConfigContext)
756+
tcplogReceiverAdded := collector.updateNginxAppProtectTcplogReceivers(nginxConfigContext)
754757
assert.False(t, tcplogReceiverAdded)
755758
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
756-
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
757-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 6)
759+
assert.Equal(t, "localhost:151", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
760+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6)
758761
})
759762

760763
t.Run("Test 3: TcplogReceiver deleted", func(tt *testing.T) {
761-
tcplogReceiverDeleted := collector.updateTcplogReceivers(&model.NginxConfigContext{})
764+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(&model.NginxConfigContext{})
762765
assert.True(t, tcplogReceiverDeleted)
763766
assert.Empty(t, conf.Collector.Receivers.TcplogReceivers)
764767
})
765768

766-
t.Run("Test 4: New tcplogReceiver added and deleted another", func(tt *testing.T) {
767-
tcplogReceiverDeleted := collector.
768-
updateTcplogReceivers(&model.NginxConfigContext{NAPSysLogServer: "localhost:152"})
769+
t.Run("Test 4: NewCollector tcplogReceiver added and deleted another", func(tt *testing.T) {
770+
tcplogReceiverDeleted := collector.updateNginxAppProtectTcplogReceivers(
771+
&model.NginxConfigContext{
772+
NAPSysLogServer: "localhost:152",
773+
},
774+
)
775+
769776
assert.True(t, tcplogReceiverDeleted)
770777
assert.Len(t, conf.Collector.Receivers.TcplogReceivers, 1)
771-
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers[0].ListenAddress)
772-
assert.Len(t, conf.Collector.Receivers.TcplogReceivers[0].Operators, 6)
778+
assert.Equal(t, "localhost:152", conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].ListenAddress)
779+
assert.Len(t, conf.Collector.Receivers.TcplogReceivers["nginx_app_protect"].Operators, 6)
773780
})
774781
}
775782

0 commit comments

Comments
 (0)