diff --git a/internal/runtime/alloy.go b/internal/runtime/alloy.go index 2e443cd1523..c508a8bde9a 100644 --- a/internal/runtime/alloy.go +++ b/internal/runtime/alloy.go @@ -48,10 +48,10 @@ package runtime import ( "context" "fmt" + "log/slog" "sync" "time" - "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -61,7 +61,6 @@ import ( "github.com/grafana/alloy/internal/runtime/internal/controller" "github.com/grafana/alloy/internal/runtime/internal/worker" "github.com/grafana/alloy/internal/runtime/logging" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/util" @@ -122,7 +121,7 @@ type Options struct { // Runtime is the Alloy system. type Runtime struct { - log log.Logger + log *slog.Logger tracer *tracing.Tracer opts controllerOptions @@ -168,7 +167,7 @@ type controllerOptions struct { // given modReg. func newController(o controllerOptions) (*Runtime, error) { var ( - logger = log.With(o.Logger, "controller_id", o.ControllerID) + logger = o.Logger.Slog().With("controller_id", o.ControllerID) tracer = o.Tracer workerPool = o.WorkerPool ) @@ -183,7 +182,7 @@ func newController(o controllerOptions) (*Runtime, error) { } if workerPool == nil { - level.Info(logger).Log("msg", "no worker pool provided, creating a default pool") + logger.Info("no worker pool provided, creating a default pool") workerPool = worker.NewDefaultWorkerPool() } @@ -266,12 +265,12 @@ func newController(o controllerOptions) (*Runtime, error) { // canceled. Run must only be called once. func (f *Runtime) Run(ctx context.Context) { defer func() { - level.Debug(f.log).Log("msg", "Alloy controller exiting") + f.log.Debug("Alloy controller exiting") f.loader.Cleanup(!f.opts.IsModule) f.sched.Stop() }() - level.Debug(f.log).Log("msg", "Running alloy controller") + f.log.Debug("Running alloy controller") for { select { @@ -285,9 +284,9 @@ func (f *Runtime) Run(ctx context.Context) { all := f.updateQueue.DequeueAll() f.loader.EvaluateDependants(ctx, all) case <-f.loadFinished: - level.Info(f.log).Log("msg", "scheduling loaded components and services") + f.log.Info("scheduling loaded components and services") if err := f.sched.Synchronize(f.loader.Graph()); err != nil { - level.Error(f.log).Log("msg", "failed to load components and services", "err", err) + f.log.Error("failed to load components and services", "err", err) } f.loadComplete.Store(true) } @@ -304,7 +303,7 @@ func (f *Runtime) Run(ctx context.Context) { func (f *Runtime) LoadSource(source *Source, args map[string]any, configPath string) error { modulePath, err := util.ExtractDirPath(configPath) if err != nil { - level.Warn(f.log).Log("msg", "failed to extract directory path from configPath", "configPath", configPath, "err", err) + f.log.Warn("failed to extract directory path from configPath", "configPath", configPath, "err", err) } return f.applyLoaderConfig(controller.ApplyOptions{ Args: args, diff --git a/internal/runtime/internal/controller/component_references.go b/internal/runtime/internal/controller/component_references.go index 4c22f7d5a8a..6b5f27788ab 100644 --- a/internal/runtime/internal/controller/component_references.go +++ b/internal/runtime/internal/controller/component_references.go @@ -1,10 +1,10 @@ package controller import ( - "github.com/go-kit/log" + "log/slog" + "github.com/grafana/alloy/internal/dag" "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/internal/runtime/logging/level" astutil "github.com/grafana/alloy/internal/util/ast" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/diag" @@ -13,7 +13,7 @@ import ( // ComponentReferences returns the list of references a component is making to // other components. -func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]astutil.Reference, diag.Diagnostics) { +func ComponentReferences(cn dag.Node, g *dag.Graph, l *slog.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]astutil.Reference, diag.Diagnostics) { var ( traversals []astutil.Traversal @@ -51,11 +51,11 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop if componentRefMatch { if scope.IsStdlibIdentifiers(t[0].Name) { - level.Warn(l).Log("msg", "a component is shadowing an existing stdlib name", "component", ref.Target.NodeID(), "stdlib name", t[0].Name) + l.Warn("a component is shadowing an existing stdlib name", "component", ref.Target.NodeID(), "stdlib_name", t[0].Name) } refs = append(refs, ref) } else if scope.IsStdlibDeprecated(t[0].Name) { - level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name) + l.Warn("this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name) } else if funcName := t.String(); scope.IsStdlibExperimental(funcName) { if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil { diags = append(diags, diag.Diagnostic{ diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index 694a377988f..1519765f3c9 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" + "log/slog" "path" "reflect" "strings" "sync" "time" - "github.com/go-kit/log" "github.com/grafana/dskit/backoff" "github.com/hashicorp/go-multierror" "github.com/prometheus/prometheus/storage" @@ -26,7 +26,6 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/nodeconf/foreach" "github.com/grafana/alloy/internal/runtime/internal/worker" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/util" @@ -38,7 +37,7 @@ import ( // The Loader builds and evaluates ComponentNodes from Alloy blocks. type Loader struct { - log log.Logger + logger *slog.Logger tracer trace.TracerProvider globals ComponentGlobals services []service.Service @@ -92,9 +91,8 @@ func NewLoader(opts LoaderOptions) (*Loader, error) { reg = component.NewDefaultRegistry(opts.ComponentGlobals.MinStability, opts.ComponentGlobals.EnableCommunityComps) } - logger := log.With(globals.Logger, "controller_path", parent, "controller_id", id) l := &Loader{ - log: logger, + logger: globals.Logger.Slog().With("controller_path", parent, "controller_id", id), tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), globals: globals, services: services, @@ -199,12 +197,12 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics { spanCtx, span := tracer.Start(context.Background(), "GraphEvaluate", trace.WithSpanKind(trace.SpanKindInternal)) defer span.End() - logger := log.With(l.log, "trace_id", span.SpanContext().TraceID()) - level.Info(logger).Log("msg", "starting complete graph evaluation") + logger := l.logger.With("trace_id", span.SpanContext().TraceID().String()) + logger.Info("starting complete graph evaluation") defer func() { span.SetStatus(codes.Ok, "") - level.Info(logger).Log("msg", "finished complete graph evaluation", "duration", time.Since(start)) + logger.Info("finished complete graph evaluation", "duration", time.Since(start)) }() l.cache.ClearModuleExports() @@ -217,7 +215,7 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics { start := time.Now() defer func() { - level.Info(logger).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start)) + logger.Info("finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start)) }() var err error @@ -308,7 +306,7 @@ func (l *Loader) Cleanup(stopWorkerPool bool) { // Wait at most 5 seconds for currently evaluating components to finish. err := l.workerPool.Stop(time.Second * 5) if err != nil { - level.Warn(l.log).Log("msg", "timed out stopping worker pool", "err", err) + l.logger.Warn("timed out stopping worker pool", "err", err) } } if l.globals.Registerer == nil { @@ -663,7 +661,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { // Finally, wire component references. l.cache.mut.RLock() - refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.GetContext(), l.globals.MinStability) + refs, nodeDiags := ComponentReferences(n, g, l.logger, l.cache.GetContext(), l.globals.MinStability) l.cache.mut.RUnlock() setDataFlowEdges(n, refs) for _, ref := range refs { @@ -769,7 +767,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN // Make sure we're in-sync with the current exports of parent. err := l.cache.CacheExports(parentNode.ID(), parentNode.Exports()) if err != nil { - level.Error(l.log).Log("msg", "failed to cache exports during evaluation", "err", err) + l.logger.Error("failed to cache exports during evaluation", "err", err) } case *ImportConfigNode: // Update the scope with the imported content. @@ -803,8 +801,8 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN l.concurrentEvalFn(nodeRef, dependantCtx, tracer, parentRef) }) if err != nil { - level.Warn(l.log).Log( - "msg", "failed to submit node for evaluation - will retry", + l.logger.Warn( + "failed to submit node for evaluation - will retry", "err", err, "node_id", n.NodeID(), "originator_id", parent.Node.NodeID(), @@ -819,8 +817,8 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN } } if err != nil && !retryBackoff.Ongoing() { - level.Error(l.log).Log( - "msg", "retry attempts exhausted when submitting node for evaluation to the worker pool - "+ + l.logger.Error( + "retry attempts exhausted when submitting node for evaluation to the worker pool - "+ "this could be a deadlock, performance bottleneck or severe overload leading to goroutine starvation", "err", err, "node_id", n.NodeID(), @@ -853,7 +851,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr defer func() { duration := time.Since(start) l.cm.onComponentEvaluationDone(n.NodeID(), duration) - level.Debug(l.log).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", duration) + l.logger.Debug("finished node evaluation", "node_id", n.NodeID(), "duration", duration) }() var err error @@ -865,7 +863,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr ectx := l.cache.GetContext() evalErr := n.Evaluate(ectx) - err = l.postEvaluate(l.log, n, evalErr) + err = l.postEvaluate(l.logger, n, evalErr) // Additional post-evaluation steps necessary for module exports. if exp, ok := n.(*ExportConfigNode); ok { @@ -897,7 +895,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr // evaluate constructs the final context for the BlockNode and // evaluates it. mut must be held when calling evaluate. -func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error { +func (l *Loader) evaluate(logger *slog.Logger, bn BlockNode) error { ectx := l.cache.GetContext() err := bn.Evaluate(ectx) return l.postEvaluate(logger, bn, err) @@ -908,7 +906,7 @@ func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error { // The evaluation err is passed as an argument to allow shadowing it with an error that could be more relevant to the user // but cannot be determined before the evaluation (for example, we must evaluate the argument node to see if it's optional before // raising an error when a value is missing). When err is not nil, this function must return an error. -func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error { +func (l *Loader) postEvaluate(logger *slog.Logger, bn BlockNode, err error) error { switch c := bn.(type) { case ComponentNode: // Always update the cached exports, since that it might change when a component gets re-evaluated. @@ -916,10 +914,10 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error err2 := l.cache.CacheExports(c.ID(), c.Exports()) if err2 != nil { if err != nil { - level.Error(logger).Log("msg", "evaluation and exports caching failed", "eval err", err, "caching err", err2) + logger.Error("evaluation and exports caching failed", "eval_err", err, "caching_err", err2) return errors.Join(err, err2) } else { - level.Error(logger).Log("msg", "failed to cache exports after evaluation", "err", err2) + logger.Error("failed to cache exports after evaluation", "err", err2) return err2 } } @@ -938,7 +936,7 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error } if err != nil { - level.Error(logger).Log("msg", "failed to evaluate config", "node", bn.NodeID(), "err", err) + logger.Error("failed to evaluate config", "node", bn.NodeID(), "err", err) return err } return nil diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go index af69c5c477f..aae85c6d547 100644 --- a/internal/runtime/internal/controller/node_config_foreach.go +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "fmt" "hash/fnv" + "log/slog" "path" "reflect" "strings" @@ -13,13 +14,11 @@ import ( "time" "unicode" - "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/nodeconf/foreach" "github.com/grafana/alloy/internal/runner" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" @@ -37,7 +36,7 @@ type ForeachConfigNode struct { componentName string moduleController ModuleController - logger log.Logger + logger *slog.Logger // customReg is the customComponentRegistry of the current loader. // We pass it so that the foreach children have access to modules. @@ -81,7 +80,7 @@ func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals, custom block: block, componentName: block.GetBlockName(), id: BlockComponentID(block), - logger: log.With(globals.Logger, "component_path", globals.ControllerID, "component_id", nodeID), + logger: globals.Logger.Slog().With("component_path", globals.ControllerID, "component_id", nodeID), moduleControllerFactory: globals.NewModuleController, moduleControllerOpts: ModuleControllerOpts{Id: globalID}, customReg: customReg, @@ -221,7 +220,7 @@ func (fn *ForeachConfigNode) evaluate(scope *vm.Scope) error { } if created && args.HashStringId && id != nil && reflect.TypeOf(id).Kind() == reflect.String { - level.Debug(fn.logger).Log("msg", "a new foreach pipeline was created", "value", id, "fingerprint", customComponentID) + fn.logger.Debug("a new foreach pipeline was created", "value", id, "fingerprint", customComponentID) } // Expose the current scope + the collection item that correspond to the child. @@ -295,7 +294,7 @@ func (fn *ForeachConfigNode) Run(ctx context.Context) error { tasks = append(tasks, &forEachChild{ id: customComponentID, cc: customComponent, - logger: log.With(fn.logger, "foreach_path", fn.nodeID, "child_id", customComponentID), + logger: fn.logger.With("foreach_path", fn.nodeID, "child_id", customComponentID), healthUpdate: fn.setRunHealth, }) } @@ -321,7 +320,7 @@ func (fn *ForeachConfigNode) run(ctx context.Context, updateTasks func() error) case <-fn.forEachChildrenUpdateChan: err := updateTasks() if err != nil { - level.Error(fn.logger).Log("msg", "error encountered while updating foreach children", "err", err) + fn.logger.Error("error encountered while updating foreach children", "err", err) fn.setRunHealth(component.HealthTypeUnhealthy, fmt.Sprintf("error encountered while updating foreach children: %s", err)) // the error is not fatal, the node can still run in unhealthy mode } else { @@ -392,14 +391,14 @@ type forEachChildRunner struct { type forEachChild struct { cc CustomComponent id string - logger log.Logger + logger *slog.Logger healthUpdate func(t component.HealthType, msg string) } func (fr *forEachChildRunner) Run(ctx context.Context) { err := fr.child.cc.Run(ctx) if err != nil { - level.Error(fr.child.logger).Log("msg", "foreach child stopped running", "err", err) + fr.child.logger.Error("foreach child stopped running", "err", err) fr.child.healthUpdate(component.HealthTypeUnhealthy, fmt.Sprintf("foreach child stopped running: %s", err)) } } @@ -441,7 +440,7 @@ func objectFingerprint(id any, hashId bool) string { } } -func collectionItemID(item any, key string, logger log.Logger) (any, bool) { +func collectionItemID(item any, key string, logger *slog.Logger) (any, bool) { switch value := item.(type) { case map[string]any: // Inline object literals with simple values. @@ -475,12 +474,12 @@ func collectionItemID(item any, key string, logger log.Logger) (any, bool) { // Example: collection = discovery.kubernetes.pods.targets return collectionItemIDFromCapsule(value, key, logger) default: - level.Debug(logger).Log("msg", "unsupported collection item type encountered in foreach", "item", fmt.Sprintf("%#v", item)) + logger.Debug("unsupported collection item type encountered in foreach", "item", fmt.Sprintf("%#v", item)) return nil, false } } -func collectionItemIDFromCapsule(value syntax.ConvertibleIntoCapsule, key string, logger log.Logger) (any, bool) { +func collectionItemIDFromCapsule(value syntax.ConvertibleIntoCapsule, key string, logger *slog.Logger) (any, bool) { var obj map[string]syntax.Value if err := value.ConvertInto(&obj); err == nil { val, ok := obj[key] @@ -494,8 +493,8 @@ func collectionItemIDFromCapsule(value syntax.ConvertibleIntoCapsule, key string return nil, false } -func logMissingCollectionID(logger log.Logger, key string) { - level.Warn(logger).Log("msg", "specified id not found in collection item", "id", key) +func logMissingCollectionID(logger *slog.Logger, key string) { + logger.Warn("specified id not found in collection item", "id", key) } func replaceNonAlphaNumeric(s string) string { diff --git a/internal/runtime/internal/controller/node_config_import.go b/internal/runtime/internal/controller/node_config_import.go index 47e88f18afb..5c4fd908ad3 100644 --- a/internal/runtime/internal/controller/node_config_import.go +++ b/internal/runtime/internal/controller/node_config_import.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "hash/fnv" + "log/slog" "maps" "path" "path/filepath" @@ -19,7 +20,6 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/nodeconf/importsource" "github.com/grafana/alloy/internal/runner" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/parser" @@ -43,7 +43,7 @@ type ImportConfigNode struct { registry *prometheus.Registry OnBlockNodeUpdate func(cn BlockNode) // notifies the controller or the parent for reevaluation - logger log.Logger + logger *slog.Logger importChildrenUpdateChan chan struct{} // used to trigger an update of the running children @@ -83,10 +83,10 @@ func NewImportConfigNode(block *ast.BlockStmt, globals ComponentGlobals, sourceT globals: globals, block: block, OnBlockNodeUpdate: globals.OnBlockNodeUpdate, + logger: globals.Logger.Slog().With("config_path", path.Dir(globalID), "config_id", path.Base(globalID)), importChildrenUpdateChan: make(chan struct{}, 1), } managedOpts := getImportManagedOptions(globals, cn) - cn.logger = managedOpts.Logger cn.source = importsource.NewImportSource(sourceType, managedOpts, vm.New(block.Body), cn.onContentUpdate) return cn } @@ -217,7 +217,7 @@ func (cn *ImportConfigNode) onContentUpdate(importedContent map[string]string) { for f, ic := range importedContent { parsedImportedContent, err := parser.ParseFile(cn.label, []byte(ic)) if err != nil { - level.Error(cn.logger).Log("msg", "failed to parse file on update", "file", f, "err", err) + cn.logger.Error("failed to parse file on update", "file", f, "err", err) cn.setContentHealth(component.HealthTypeUnhealthy, fmt.Sprintf("imported content from %q cannot be parsed: %s", f, err)) return } @@ -225,7 +225,7 @@ func (cn *ImportConfigNode) onContentUpdate(importedContent map[string]string) { // populate importedDeclares and importConfigNodesChildren err = cn.processImportedContent(parsedImportedContent) if err != nil { - level.Error(cn.logger).Log("msg", "failed to process imported content", "file", f, "err", err) + cn.logger.Error("failed to process imported content", "file", f, "err", err) cn.setContentHealth(component.HealthTypeUnhealthy, fmt.Sprintf("imported content from %q is invalid: %s", f, err)) return } @@ -234,7 +234,7 @@ func (cn *ImportConfigNode) onContentUpdate(importedContent map[string]string) { // evaluate the importConfigNodesChildren that have been created err := cn.evaluateChildren() if err != nil { - level.Error(cn.logger).Log("msg", "failed to evaluate nested import", "err", err) + cn.logger.Error("failed to evaluate nested import", "err", err) cn.setContentHealth(component.HealthTypeUnhealthy, fmt.Sprintf("nested import block failed to evaluate: %s", err)) return } @@ -278,7 +278,7 @@ func (cn *ImportConfigNode) processImportedContent(content *ast.File) error { // processDeclareBlock stores the declare definition in the importedDeclares. func (cn *ImportConfigNode) processDeclareBlock(stmt *ast.BlockStmt) { if _, ok := cn.importedDeclares[stmt.Label]; ok { - level.Error(cn.logger).Log("msg", "declare block redefined", "name", stmt.Label) + cn.logger.Error("declare block redefined", "name", stmt.Label) return } cn.importedDeclares[stmt.Label] = stmt.Body @@ -365,7 +365,7 @@ func (cn *ImportConfigNode) Run(ctx context.Context) error { err := updateTasks() if err != nil { - level.Error(cn.logger).Log("msg", "import failed to run nested imports", "err", err) + cn.logger.Error("import failed to run nested imports", "err", err) cn.setRunHealth(component.HealthTypeUnhealthy, fmt.Sprintf("error encountered while running nested import blocks: %s", err)) // the error is not fatal, the node can still run in unhealthy mode } @@ -391,7 +391,7 @@ func (cn *ImportConfigNode) run(errChan chan error, updateTasks func() error) er case <-cn.importChildrenUpdateChan: err := updateTasks() if err != nil { - level.Error(cn.logger).Log("msg", "error encountered while updating nested import blocks", "err", err) + cn.logger.Error("error encountered while updating nested import blocks", "err", err) cn.setRunHealth(component.HealthTypeUnhealthy, fmt.Sprintf("error encountered while updating nested import blocks: %s", err)) // the error is not fatal, the node can still run in unhealthy mode } else { @@ -455,7 +455,7 @@ type childRunner struct { func (cr *childRunner) Run(ctx context.Context) { err := cr.node.Run(ctx) if err != nil { - level.Error(cr.node.logger).Log("msg", "nested import stopped running", "err", err) + cr.node.logger.Error("nested import stopped running", "err", err) cr.node.setRunHealth(component.HealthTypeUnhealthy, fmt.Sprintf("nested import stopped running: %s", err)) } } diff --git a/internal/runtime/internal/controller/node_config_logging.go b/internal/runtime/internal/controller/node_config_logging.go index 3945e25890f..8dc588cb45a 100644 --- a/internal/runtime/internal/controller/node_config_logging.go +++ b/internal/runtime/internal/controller/node_config_logging.go @@ -5,7 +5,6 @@ import ( "strings" "sync" - "github.com/go-kit/log" "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" @@ -16,7 +15,7 @@ var _ BlockNode = (*LoggingConfigNode)(nil) type LoggingConfigNode struct { nodeID string componentName string - l log.Logger + l *logging.Logger mut sync.RWMutex block *ast.BlockStmt // Current Alloy blocks to derive config from @@ -65,7 +64,7 @@ func (cn *LoggingConfigNode) Evaluate(scope *vm.Scope) error { } } - if err := cn.l.(*logging.Logger).Update(args); err != nil { + if err := cn.l.Update(args); err != nil { return fmt.Errorf("could not update logger: %w", err) } diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go index 34aa4db7796..993b5b5d98a 100644 --- a/internal/runtime/internal/controller/node_custom_component.go +++ b/internal/runtime/internal/controller/node_custom_component.go @@ -8,8 +8,6 @@ import ( "sync" "time" - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/ast" @@ -31,7 +29,6 @@ type CustomComponentNode struct { nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. moduleController ModuleController OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate - logger log.Logger importNamespace string customComponentName string @@ -107,7 +104,6 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi componentName := b.GetBlockName() importNamespace, customComponentName := ExtractImportAndDeclare(componentName) - parent, node := splitPath(globalID) cn := &CustomComponentNode{ id: id, @@ -119,7 +115,6 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi customComponentName: customComponentName, moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}), OnBlockNodeUpdate: globals.OnBlockNodeUpdate, - logger: log.With(globals.Logger, "component_path", parent, "component_id", node), getConfig: getConfig, block: b, diff --git a/internal/runtime/internal/controller/scheduler.go b/internal/runtime/internal/controller/scheduler.go index 431bbc04a02..4928c6994d5 100644 --- a/internal/runtime/internal/controller/scheduler.go +++ b/internal/runtime/internal/controller/scheduler.go @@ -3,14 +3,12 @@ package controller import ( "context" "fmt" + "log/slog" "slices" "sync" "time" - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/dag" - "github.com/grafana/alloy/internal/runtime/logging/level" ) var ( @@ -28,7 +26,7 @@ type RunnableNode interface { // Scheduler runs components. type Scheduler struct { running sync.WaitGroup - logger log.Logger + logger *slog.Logger taskShutdownDeadline time.Duration tasksMut sync.Mutex @@ -39,7 +37,7 @@ type Scheduler struct { // components which are running. // // Call Stop to stop the Scheduler and all running components. -func NewScheduler(logger log.Logger, taskShutdownDeadline time.Duration) *Scheduler { +func NewScheduler(logger *slog.Logger, taskShutdownDeadline time.Duration) *Scheduler { return &Scheduler{ logger: logger, taskShutdownDeadline: taskShutdownDeadline, @@ -105,16 +103,16 @@ func (s *Scheduler) Synchronize(g *dag.Graph) error { onDone: func(err error) { defer s.running.Done() if err != nil { - level.Error(s.logger).Log("msg", "node exited with error", "node", id, "err", err) + s.logger.Error("node exited with error", "node", id, "err", err) } else { - level.Info(s.logger).Log("msg", "node exited without error", "node", id) + s.logger.Info("node exited without error", "node", id) } s.tasksMut.Lock() defer s.tasksMut.Unlock() delete(s.tasks, id) }, - logger: log.With(s.logger, "taskID", id), + logger: s.logger.With("taskID", id), taskShutdownDeadline: s.taskShutdownDeadline, }) @@ -175,7 +173,7 @@ type task struct { type taskOptions struct { runnable RunnableNode onDone func(error) - logger log.Logger + logger *slog.Logger taskShutdownDeadline time.Duration } @@ -194,7 +192,7 @@ func newTask(groupID, rank int, opts taskOptions) *task { } func (t *task) Start() { - level.Debug(t.opts.logger).Log("msg", "Starting task", "id", t.opts.runnable.NodeID()) + t.opts.logger.Debug("Starting task", "id", t.opts.runnable.NodeID()) go func() { err := t.opts.runnable.Run(t.ctx) @@ -209,7 +207,7 @@ func (t *task) Start() { } func (t *task) Stop() { - level.Debug(t.opts.logger).Log("msg", "Stopping task", "id", t.opts.runnable.NodeID()) + t.opts.logger.Debug("Stopping task", "id", t.opts.runnable.NodeID()) t.cancel() deadlineDuration := t.opts.taskShutdownDeadline @@ -225,7 +223,7 @@ func (t *task) Stop() { case <-t.exited: return // Task exited normally. case <-time.After(TaskShutdownWarningTimeout): - level.Warn(t.opts.logger).Log("msg", "task shutdown is taking longer than expected") + t.opts.logger.Warn("task shutdown is taking longer than expected") case <-deadlineCtx.Done(): t.doneOnce.Do(func() { t.opts.onDone(fmt.Errorf("task shutdown deadline exceeded")) diff --git a/internal/runtime/internal/controller/scheduler_test.go b/internal/runtime/internal/controller/scheduler_test.go index 4fd49ead2bb..327839d2ea3 100644 --- a/internal/runtime/internal/controller/scheduler_test.go +++ b/internal/runtime/internal/controller/scheduler_test.go @@ -9,18 +9,18 @@ import ( "testing/synctest" "time" - "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/dag" "github.com/grafana/alloy/internal/runtime/internal/controller" + "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) func TestScheduler_Synchronize(t *testing.T) { - logger := log.NewNopLogger() + logger := logging.NewSlogNop() t.Run("Can start new jobs", func(t *testing.T) { var started, finished sync.WaitGroup started.Add(3) @@ -124,7 +124,12 @@ func TestScheduler_Synchronize(t *testing.T) { synctest.Test(t, func(t *testing.T) { // Create a thread-safe buffer to capture log output var logBuffer syncBuffer - logger := log.NewLogfmtLogger(&logBuffer) + alloyLogger, err := logging.New(&logBuffer, logging.Options{ + Level: logging.LevelDebug, + Format: logging.FormatLogfmt, + }) + require.NoError(t, err) + logger := alloyLogger.Slog() runFunc := func(ctx context.Context) error { <-ctx.Done() @@ -138,7 +143,7 @@ func TestScheduler_Synchronize(t *testing.T) { g.Add(&fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}}) // Start a component - err := sched.Synchronize(g) + err = sched.Synchronize(g) require.NoError(t, err) syncDone := make(chan struct{}) diff --git a/internal/runtime/logging/logger.go b/internal/runtime/logging/logger.go index 6b368bc7ccd..997c14f24cf 100644 --- a/internal/runtime/logging/logger.go +++ b/internal/runtime/logging/logger.go @@ -61,6 +61,11 @@ func NewNop() *Logger { return l } +// NewSlogNop returns a slog logger backed by a handler that never logs. +func NewSlogNop() *slog.Logger { + return slog.New(nopSlogHandler{}) +} + // NewDeferred creates a new logger with the default log level and format. // The logger is not updated during initialization. func NewDeferred(w io.Writer) (*Logger, error) { @@ -94,6 +99,20 @@ func NewDeferred(w io.Writer) (*Logger, error) { // updated. func (l *Logger) Handler() slog.Handler { return l.deferredSlog } +// Slog returns a [slog.Logger]. The returned logger remains valid if l is +// updated. +func (l *Logger) Slog() *slog.Logger { return slog.New(l.deferredSlog) } + +type nopSlogHandler struct{} + +func (nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false } + +func (nopSlogHandler) Handle(context.Context, slog.Record) error { return nil } + +func (nopSlogHandler) WithAttrs([]slog.Attr) slog.Handler { return nopSlogHandler{} } + +func (nopSlogHandler) WithGroup(string) slog.Handler { return nopSlogHandler{} } + // Update re-configures the options used for the logger. func (l *Logger) Update(o Options) error { l.bufferMut.Lock() diff --git a/internal/runtime/module.go b/internal/runtime/module.go index 66587cff677..1441b53c445 100644 --- a/internal/runtime/module.go +++ b/internal/runtime/module.go @@ -3,6 +3,7 @@ package runtime import ( "context" "fmt" + "log/slog" "path" "sync" @@ -14,7 +15,6 @@ import ( "github.com/grafana/alloy/internal/runtime/internal/controller" "github.com/grafana/alloy/internal/runtime/internal/worker" "github.com/grafana/alloy/internal/runtime/logging" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/scanner" @@ -23,6 +23,7 @@ import ( type moduleController struct { mut sync.RWMutex o *moduleControllerOptions + logger *slog.Logger modules map[string]struct{} } @@ -34,6 +35,7 @@ var ( func newModuleController(o *moduleControllerOptions) controller.ModuleController { return &moduleController{ o: o, + logger: o.Logger.Slog(), modules: map[string]struct{}{}, } } @@ -92,7 +94,7 @@ func (m *moduleController) addModule(mod *module) error { m.mut.Lock() defer m.mut.Unlock() if err := m.o.ModuleRegistry.Register(mod.o.ID, mod); err != nil { - level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.ID, "err", err) + m.logger.Error("error registering module", "id", mod.o.ID, "err", err) return err } m.modules[mod.o.ID] = struct{}{}