|
42 | 42 | logger *zap.Logger |
43 | 43 | mb *metadata.MetricsBuilder |
44 | 44 | rb *metadata.ResourceBuilder |
45 | | - pipe *pipeline.DirectedPipeline |
| 45 | + pipes []*pipeline.DirectedPipeline |
46 | 46 | wg *sync.WaitGroup |
47 | 47 | cancel context.CancelFunc |
48 | 48 | entries []*entry.Entry |
@@ -103,20 +103,27 @@ func (nls *NginxLogScraper) ID() component.ID { |
103 | 103 | return component.NewID(metadata.Type) |
104 | 104 | } |
105 | 105 |
|
| 106 | +// nolint: unparam |
106 | 107 | func (nls *NginxLogScraper) Start(parentCtx context.Context, _ component.Host) error { |
107 | 108 | nls.logger.Info("NGINX access log scraper started") |
108 | 109 | ctx, cancel := context.WithCancel(parentCtx) |
109 | 110 | nls.cancel = cancel |
110 | 111 |
|
111 | | - var err error |
112 | | - nls.pipe, err = nls.initStanzaPipeline(nls.operators, nls.logger) |
113 | | - if err != nil { |
114 | | - return fmt.Errorf("init stanza pipeline: %w", err) |
| 112 | + for _, op := range nls.operators { |
| 113 | + nls.logger.Info("Initializing NGINX access log scraper pipeline", zap.Any("operator_id", op.ID())) |
| 114 | + pipe, err := nls.initStanzaPipeline([]operator.Config{op}, nls.logger) |
| 115 | + if err != nil { |
| 116 | + nls.logger.Error("Error initializing pipeline", zap.Any("id", op.ID()), zap.Any("error", err)) |
| 117 | + continue |
| 118 | + } |
| 119 | + nls.pipes = append(nls.pipes, pipe) |
115 | 120 | } |
116 | 121 |
|
117 | | - startError := nls.pipe.Start(storage.NewNopClient()) |
118 | | - if startError != nil { |
119 | | - return fmt.Errorf("stanza pipeline start: %w", startError) |
| 122 | + for _, pipe := range nls.pipes { |
| 123 | + startError := pipe.Start(storage.NewNopClient()) |
| 124 | + if startError != nil { |
| 125 | + nls.logger.Error("Error starting pipeline", zap.Any("error", startError)) |
| 126 | + } |
120 | 127 | } |
121 | 128 |
|
122 | 129 | nls.wg.Add(1) |
@@ -206,7 +213,12 @@ func (nls *NginxLogScraper) Shutdown(_ context.Context) error { |
206 | 213 | } |
207 | 214 | nls.wg.Wait() |
208 | 215 |
|
209 | | - return nls.pipe.Stop() |
| 216 | + var err error |
| 217 | + for _, pipe := range nls.pipes { |
| 218 | + err = pipe.Stop() |
| 219 | + } |
| 220 | + |
| 221 | + return err |
210 | 222 | } |
211 | 223 |
|
212 | 224 | func (nls *NginxLogScraper) initStanzaPipeline( |
|
0 commit comments