5353 }
5454)
5555
56+ var pluginLogOrigin = slog .String ("log_origin" , "otel_collector_plugin.go" )
57+
5658var (
5759 _ bus.Plugin = (* Collector )(nil )
5860 initMutex = & sync.Mutex {}
@@ -101,14 +103,16 @@ func (oc *Collector) GetState() otelcol.State {
101103
102104// Init initializes and starts the plugin
103105func (oc * Collector ) Init (ctx context.Context , mp bus.MessagePipeInterface ) error {
104- slog .InfoContext (ctx , "Starting OTel Collector plugin" )
106+ slog .InfoContext (ctx , "Starting OTel Collector plugin" , pluginLogOrigin )
105107
106108 var runCtx context.Context
107109 runCtx , oc .cancel = context .WithCancel (ctx )
108110
109111 if ! oc .config .AreReceiversConfigured () {
110112 slog .InfoContext (runCtx , "No receivers configured for OTel Collector. " +
111- "Waiting to discover a receiver before starting OTel collector." )
113+ "Waiting to discover a receiver before starting OTel collector." ,
114+ pluginLogOrigin ,
115+ )
112116
113117 return nil
114118 }
@@ -128,7 +132,7 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
128132
129133 bootErr := oc .bootup (runCtx )
130134 if bootErr != nil {
131- slog .ErrorContext (runCtx , "Unable to start OTel Collector" , "error" , bootErr )
135+ slog .ErrorContext (runCtx , "Unable to start OTel Collector" , "error" , bootErr , pluginLogOrigin )
132136 }
133137
134138 return nil
@@ -138,37 +142,40 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
138142func (oc * Collector ) processReceivers (ctx context.Context , receivers []config.OtlpReceiver ) {
139143 for _ , receiver := range receivers {
140144 if receiver .OtlpTLSConfig == nil {
141- slog .WarnContext (ctx , "OTel receiver is configured without TLS. Connections are unencrypted." )
145+ slog .WarnContext (ctx , "OTel receiver is configured without TLS. Connections are unencrypted." ,
146+ pluginLogOrigin )
147+
142148 continue
143149 }
144150
145151 if receiver .OtlpTLSConfig .GenerateSelfSignedCert {
146152 slog .WarnContext (ctx ,
147153 "Self-signed certificate for OTel receiver requested, " +
148154 "this is not recommended for production environments." ,
155+ pluginLogOrigin ,
149156 )
150157
151158 if receiver .OtlpTLSConfig .ExistingCert {
152159 slog .WarnContext (ctx ,
153- "Certificate file already exists, skipping self-signed certificate generation" ,
154- )
160+ "Certificate file already exists, skipping self-signed certificate generation" , pluginLogOrigin )
155161 }
156162 } else {
157- slog .WarnContext (ctx , "OTel receiver is configured without TLS. Connections are unencrypted." )
163+ slog .WarnContext (ctx , "OTel receiver is configured without TLS. Connections are unencrypted." ,
164+ pluginLogOrigin )
158165 }
159166 }
160167}
161168
162169func (oc * Collector ) bootup (ctx context.Context ) error {
163- slog .InfoContext (ctx , "Starting OTel collector" )
170+ slog .InfoContext (ctx , "Starting OTel collector" , pluginLogOrigin )
164171 errChan := make (chan error )
165172
166173 go func () {
167174 appErr := oc .service .Run (ctx )
168175 if appErr != nil {
169176 errChan <- appErr
170177 }
171- slog .InfoContext (ctx , "OTel collector run finished" )
178+ slog .InfoContext (ctx , "OTel collector run finished" , pluginLogOrigin )
172179 }()
173180
174181 for {
@@ -204,10 +211,10 @@ func (oc *Collector) Info() *bus.Info {
204211
205212// Close the plugin.
206213func (oc * Collector ) Close (ctx context.Context ) error {
207- slog .InfoContext (ctx , "Closing OTel Collector plugin" )
214+ slog .InfoContext (ctx , "Closing OTel Collector plugin" , pluginLogOrigin )
208215
209216 if ! oc .stopped {
210- slog .InfoContext (ctx , "Shutting down OTel Collector" , "state" , oc .service .GetState ())
217+ slog .InfoContext (ctx , "Shutting down OTel Collector" , "state" , oc .service .GetState (), pluginLogOrigin )
211218 oc .service .Shutdown ()
212219 oc .cancel ()
213220
@@ -222,9 +229,12 @@ func (oc *Collector) Close(ctx context.Context) error {
222229 })
223230
224231 if err != nil {
225- slog .ErrorContext (ctx , "Failed to shutdown OTel Collector" , "error" , err , "state" , oc .service .GetState ())
232+ slog .ErrorContext (ctx , "Failed to shutdown OTel Collector" ,
233+ "error" , err , "state" , oc .service .GetState (),
234+ pluginLogOrigin ,
235+ )
226236 } else {
227- slog .InfoContext (ctx , "OTel Collector shutdown" , "state" , oc .service .GetState ())
237+ slog .InfoContext (ctx , "OTel Collector shutdown" , "state" , oc .service .GetState (), pluginLogOrigin )
228238 oc .stopped = true
229239 }
230240 }
@@ -240,7 +250,7 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
240250 case bus .ResourceUpdateTopic :
241251 oc .handleResourceUpdate (ctx , msg )
242252 default :
243- slog .DebugContext (ctx , "OTel collector plugin unknown topic" , "topic" , msg .Topic )
253+ slog .DebugContext (ctx , "OTel collector plugin unknown topic" , "topic" , msg .Topic , pluginLogOrigin )
244254 }
245255}
246256
@@ -258,17 +268,19 @@ func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Messa
258268
259269 nginxConfigContext , ok := msg .Data .(* model.NginxConfigContext )
260270 if ! ok {
261- slog .ErrorContext (ctx , "Unable to cast message payload to *model.NginxConfigContext" , "payload" , msg .Data )
271+ slog .ErrorContext (ctx , "Unable to cast message payload to *model.NginxConfigContext" ,
272+ "payload" , msg .Data , pluginLogOrigin )
273+
262274 return
263275 }
264276
265277 reloadCollector := oc .checkForNewReceivers (nginxConfigContext )
266278
267279 if reloadCollector {
268- slog .InfoContext (ctx , "Reloading OTel collector config" )
280+ slog .InfoContext (ctx , "Reloading OTel collector config" , pluginLogOrigin )
269281 err := writeCollectorConfig (oc .config .Collector )
270282 if err != nil {
271- slog .ErrorContext (ctx , "Failed to write OTel Collector config" , "error" , err )
283+ slog .ErrorContext (ctx , "Failed to write OTel Collector config" , "error" , err , pluginLogOrigin )
272284 return
273285 }
274286
@@ -282,18 +294,18 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
282294
283295 resourceUpdateContext , ok := msg .Data .(* v1.Resource )
284296 if ! ok {
285- slog .ErrorContext (ctx , "Unable to cast message payload to *v1.Resource" , "payload" , msg .Data )
297+ slog .ErrorContext (ctx , "Unable to cast message payload to *v1.Resource" , "payload" , msg .Data , pluginLogOrigin )
286298 return
287299 }
288300
289301 resourceProcessorUpdated := oc .updateResourceProcessor (resourceUpdateContext )
290302 headersSetterExtensionUpdated := oc .updateHeadersSetterExtension (ctx , resourceUpdateContext )
291303
292304 if resourceProcessorUpdated || headersSetterExtensionUpdated {
293- slog .InfoContext (ctx , "Reloading OTel collector config" )
305+ slog .InfoContext (ctx , "Reloading OTel collector config" , pluginLogOrigin )
294306 err := writeCollectorConfig (oc .config .Collector )
295307 if err != nil {
296- slog .ErrorContext (ctx , "Failed to write OTel Collector config" , "error" , err )
308+ slog .ErrorContext (ctx , "Failed to write OTel Collector config" , "error" , err , pluginLogOrigin )
297309 return
298310 }
299311
@@ -343,9 +355,9 @@ func (oc *Collector) updateHeadersSetterExtension(
343355 }
344356
345357 if ! isUUIDHeaderSet {
346- slog .DebugContext (
347- ctx , "Adding uuid header to OTel collector" ,
358+ slog .DebugContext (ctx , "Adding uuid header to OTel collector" ,
348359 "uuid" , resourceUpdateContext .GetResourceId (),
360+ pluginLogOrigin ,
349361 )
350362 oc .config .Collector .Extensions .HeadersSetter .Headers = append (
351363 oc .config .Collector .Extensions .HeadersSetter .Headers ,
@@ -366,14 +378,14 @@ func (oc *Collector) updateHeadersSetterExtension(
366378func (oc * Collector ) restartCollector (ctx context.Context ) {
367379 err := oc .Close (ctx )
368380 if err != nil {
369- slog .ErrorContext (ctx , "Failed to shutdown OTel Collector" , "error" , err )
381+ slog .ErrorContext (ctx , "Failed to shutdown OTel Collector" , "error" , err , pluginLogOrigin )
370382 return
371383 }
372384
373385 settings := OTelCollectorSettings (oc .config )
374386 oTelCollector , err := otelcol .NewCollector (settings )
375387 if err != nil {
376- slog .ErrorContext (ctx , "Failed to create OTel Collector" , "error" , err )
388+ slog .ErrorContext (ctx , "Failed to create OTel Collector" , "error" , err , pluginLogOrigin )
377389 return
378390 }
379391 oc .service = oTelCollector
@@ -382,13 +394,14 @@ func (oc *Collector) restartCollector(ctx context.Context) {
382394 runCtx , oc .cancel = context .WithCancel (ctx )
383395
384396 if ! oc .stopped {
385- slog .ErrorContext (ctx , "Unable to restart OTel collector, failed to stop collector" )
397+ slog .ErrorContext (ctx , "Unable to restart OTel collector, failed to stop collector" , pluginLogOrigin )
398+
386399 return
387400 }
388401
389402 bootErr := oc .bootup (runCtx )
390403 if bootErr != nil {
391- slog .ErrorContext (runCtx , "Unable to start OTel Collector" , "error" , bootErr )
404+ slog .ErrorContext (runCtx , "Unable to start OTel Collector" , "error" , bootErr , pluginLogOrigin )
392405 }
393406}
394407
0 commit comments