Skip to content

Commit 04b535a

Browse files
authored
Fix to allow multiple access logs to collect metrics (#1100)
* add nil check * fix race conditions * fix race conditions * fix race conditions * fix issue when two formats are set for the same access log * clean up * format * change log * change log * fix multiple access logs * PR feedback * PR feedback
1 parent 0de3ba3 commit 04b535a

File tree

2 files changed

+24
-18
lines changed

2 files changed

+24
-18
lines changed

internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package accesslog
77

88
import (
99
"context"
10+
"errors"
1011
"fmt"
1112
"strconv"
1213
"sync"
@@ -42,7 +43,7 @@ type (
4243
logger *zap.Logger
4344
mb *metadata.MetricsBuilder
4445
rb *metadata.ResourceBuilder
45-
pipe *pipeline.DirectedPipeline
46+
pipes []*pipeline.DirectedPipeline
4647
wg *sync.WaitGroup
4748
cancel context.CancelFunc
4849
entries []*entry.Entry
@@ -103,20 +104,27 @@ func (nls *NginxLogScraper) ID() component.ID {
103104
return component.NewID(metadata.Type)
104105
}
105106

107+
// nolint: unparam
106108
func (nls *NginxLogScraper) Start(parentCtx context.Context, _ component.Host) error {
107109
nls.logger.Info("NGINX access log scraper started")
108110
ctx, cancel := context.WithCancel(parentCtx)
109111
nls.cancel = cancel
110112

111-
var err error
112-
nls.pipe, err = nls.initStanzaPipeline(nls.operators, nls.logger)
113-
if err != nil {
114-
return fmt.Errorf("init stanza pipeline: %w", err)
113+
for _, op := range nls.operators {
114+
nls.logger.Info("Initializing NGINX access log scraper pipeline", zap.Any("operator_id", op.ID()))
115+
pipe, err := nls.initStanzaPipeline([]operator.Config{op}, nls.logger)
116+
if err != nil {
117+
nls.logger.Error("Error initializing pipeline", zap.Any("operator_id", op.ID()), zap.Any("error", err))
118+
continue
119+
}
120+
nls.pipes = append(nls.pipes, pipe)
115121
}
116122

117-
startError := nls.pipe.Start(storage.NewNopClient())
118-
if startError != nil {
119-
return fmt.Errorf("stanza pipeline start: %w", startError)
123+
for _, pipe := range nls.pipes {
124+
startError := pipe.Start(storage.NewNopClient())
125+
if startError != nil {
126+
nls.logger.Error("Error starting pipeline", zap.Any("error", startError))
127+
}
120128
}
121129

122130
nls.wg.Add(1)
@@ -206,7 +214,14 @@ func (nls *NginxLogScraper) Shutdown(_ context.Context) error {
206214
}
207215
nls.wg.Wait()
208216

209-
return nls.pipe.Stop()
217+
var err error
218+
for _, pipe := range nls.pipes {
219+
if stopErr := pipe.Stop(); stopErr != nil {
220+
err = errors.Join(err, stopErr)
221+
}
222+
}
223+
224+
return err
210225
}
211226

212227
func (nls *NginxLogScraper) initStanzaPipeline(

internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,6 @@ func TestAccessLogScraper(t *testing.T) {
8282
pmetrictest.IgnoreResourceAttributeValue("instance.id")))
8383
}
8484

85-
func TestAccessLogScraperError(t *testing.T) {
86-
t.Run("include config missing", func(tt *testing.T) {
87-
logScraper := NewScraper(receivertest.NewNopSettings(component.Type{}), &config.Config{})
88-
err := logScraper.Start(context.Background(), componenttest.NewNopHost())
89-
require.Error(tt, err)
90-
assert.Contains(tt, err.Error(), "init stanza pipeline")
91-
})
92-
}
93-
9485
// Copies the contents of one file to another with the given delay. Used to simulate writing log entries to a log file.
9586
// Reason for nolint: we must use testify's assert instead of require,
9687
// for more info see https://github.com/stretchr/testify/issues/772#issuecomment-945166599

0 commit comments

Comments
 (0)