Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ package runtime
import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

Expand All @@ -61,7 +61,6 @@ import (
"github.com/grafana/alloy/internal/runtime/internal/controller"
"github.com/grafana/alloy/internal/runtime/internal/worker"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/runtime/tracing"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
Expand Down Expand Up @@ -122,7 +121,7 @@ type Options struct {

// Runtime is the Alloy system.
type Runtime struct {
log log.Logger
log *slog.Logger
tracer *tracing.Tracer
opts controllerOptions

Expand Down Expand Up @@ -168,7 +167,7 @@ type controllerOptions struct {
// given modReg.
func newController(o controllerOptions) (*Runtime, error) {
var (
logger = log.With(o.Logger, "controller_id", o.ControllerID)
logger = o.Logger.Slog().With("controller_id", o.ControllerID)
tracer = o.Tracer
workerPool = o.WorkerPool
)
Comment thread
kgeckhart marked this conversation as resolved.
Expand All @@ -183,7 +182,7 @@ func newController(o controllerOptions) (*Runtime, error) {
}

if workerPool == nil {
level.Info(logger).Log("msg", "no worker pool provided, creating a default pool")
logger.Info("no worker pool provided, creating a default pool")
workerPool = worker.NewDefaultWorkerPool()
}

Expand Down Expand Up @@ -266,12 +265,12 @@ func newController(o controllerOptions) (*Runtime, error) {
// canceled. Run must only be called once.
func (f *Runtime) Run(ctx context.Context) {
defer func() {
level.Debug(f.log).Log("msg", "Alloy controller exiting")
f.log.Debug("Alloy controller exiting")
f.loader.Cleanup(!f.opts.IsModule)
f.sched.Stop()
}()

level.Debug(f.log).Log("msg", "Running alloy controller")
f.log.Debug("Running alloy controller")

for {
select {
Expand All @@ -285,9 +284,9 @@ func (f *Runtime) Run(ctx context.Context) {
all := f.updateQueue.DequeueAll()
f.loader.EvaluateDependants(ctx, all)
case <-f.loadFinished:
level.Info(f.log).Log("msg", "scheduling loaded components and services")
f.log.Info("scheduling loaded components and services")
if err := f.sched.Synchronize(f.loader.Graph()); err != nil {
level.Error(f.log).Log("msg", "failed to load components and services", "err", err)
f.log.Error("failed to load components and services", "err", err)
}
f.loadComplete.Store(true)
}
Expand All @@ -304,7 +303,7 @@ func (f *Runtime) Run(ctx context.Context) {
func (f *Runtime) LoadSource(source *Source, args map[string]any, configPath string) error {
modulePath, err := util.ExtractDirPath(configPath)
if err != nil {
level.Warn(f.log).Log("msg", "failed to extract directory path from configPath", "configPath", configPath, "err", err)
f.log.Warn("failed to extract directory path from configPath", "configPath", configPath, "err", err)
}
return f.applyLoaderConfig(controller.ApplyOptions{
Args: args,
Expand Down
10 changes: 5 additions & 5 deletions internal/runtime/internal/controller/component_references.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package controller

import (
"github.com/go-kit/log"
"log/slog"

"github.com/grafana/alloy/internal/dag"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
astutil "github.com/grafana/alloy/internal/util/ast"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/diag"
Expand All @@ -13,7 +13,7 @@ import (

// ComponentReferences returns the list of references a component is making to
// other components.
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]astutil.Reference, diag.Diagnostics) {
func ComponentReferences(cn dag.Node, g *dag.Graph, l *slog.Logger, scope *vm.Scope, minStability featuregate.Stability) ([]astutil.Reference, diag.Diagnostics) {
var (
traversals []astutil.Traversal

Expand Down Expand Up @@ -51,11 +51,11 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop

if componentRefMatch {
if scope.IsStdlibIdentifiers(t[0].Name) {
level.Warn(l).Log("msg", "a component is shadowing an existing stdlib name", "component", ref.Target.NodeID(), "stdlib name", t[0].Name)
l.Warn("a component is shadowing an existing stdlib name", "component", ref.Target.NodeID(), "stdlib_name", t[0].Name)
}
refs = append(refs, ref)
} else if scope.IsStdlibDeprecated(t[0].Name) {
level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name)
l.Warn("this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name)
} else if funcName := t.String(); scope.IsStdlibExperimental(funcName) {
if err := featuregate.CheckAllowed(featuregate.StabilityExperimental, minStability, funcName); err != nil {
diags = append(diags, diag.Diagnostic{
Expand Down
44 changes: 21 additions & 23 deletions internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"path"
"reflect"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/prometheus/storage"
Expand All @@ -26,7 +26,6 @@ import (
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/nodeconf/foreach"
"github.com/grafana/alloy/internal/runtime/internal/worker"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/runtime/tracing"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
Expand All @@ -38,7 +37,7 @@ import (

// The Loader builds and evaluates ComponentNodes from Alloy blocks.
type Loader struct {
log log.Logger
logger *slog.Logger
tracer trace.TracerProvider
globals ComponentGlobals
services []service.Service
Expand Down Expand Up @@ -92,9 +91,8 @@ func NewLoader(opts LoaderOptions) (*Loader, error) {
reg = component.NewDefaultRegistry(opts.ComponentGlobals.MinStability, opts.ComponentGlobals.EnableCommunityComps)
}

logger := log.With(globals.Logger, "controller_path", parent, "controller_id", id)
l := &Loader{
log: logger,
logger: globals.Logger.Slog().With("controller_path", parent, "controller_id", id),
tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID),
globals: globals,
services: services,
Expand Down Expand Up @@ -199,12 +197,12 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
spanCtx, span := tracer.Start(context.Background(), "GraphEvaluate", trace.WithSpanKind(trace.SpanKindInternal))
defer span.End()

logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
level.Info(logger).Log("msg", "starting complete graph evaluation")
logger := l.logger.With("trace_id", span.SpanContext().TraceID().String())
logger.Info("starting complete graph evaluation")
defer func() {
span.SetStatus(codes.Ok, "")

level.Info(logger).Log("msg", "finished complete graph evaluation", "duration", time.Since(start))
logger.Info("finished complete graph evaluation", "duration", time.Since(start))
}()

l.cache.ClearModuleExports()
Expand All @@ -217,7 +215,7 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {

start := time.Now()
defer func() {
level.Info(logger).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start))
logger.Info("finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start))
}()

var err error
Expand Down Expand Up @@ -308,7 +306,7 @@ func (l *Loader) Cleanup(stopWorkerPool bool) {
// Wait at most 5 seconds for currently evaluating components to finish.
err := l.workerPool.Stop(time.Second * 5)
if err != nil {
level.Warn(l.log).Log("msg", "timed out stopping worker pool", "err", err)
l.logger.Warn("timed out stopping worker pool", "err", err)
}
}
if l.globals.Registerer == nil {
Expand Down Expand Up @@ -663,7 +661,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {

// Finally, wire component references.
l.cache.mut.RLock()
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.GetContext(), l.globals.MinStability)
refs, nodeDiags := ComponentReferences(n, g, l.logger, l.cache.GetContext(), l.globals.MinStability)
l.cache.mut.RUnlock()
setDataFlowEdges(n, refs)
for _, ref := range refs {
Expand Down Expand Up @@ -769,7 +767,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN
// Make sure we're in-sync with the current exports of parent.
err := l.cache.CacheExports(parentNode.ID(), parentNode.Exports())
if err != nil {
level.Error(l.log).Log("msg", "failed to cache exports during evaluation", "err", err)
l.logger.Error("failed to cache exports during evaluation", "err", err)
}
case *ImportConfigNode:
// Update the scope with the imported content.
Expand Down Expand Up @@ -803,8 +801,8 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN
l.concurrentEvalFn(nodeRef, dependantCtx, tracer, parentRef)
})
if err != nil {
level.Warn(l.log).Log(
"msg", "failed to submit node for evaluation - will retry",
l.logger.Warn(
"failed to submit node for evaluation - will retry",
"err", err,
"node_id", n.NodeID(),
"originator_id", parent.Node.NodeID(),
Expand All @@ -819,8 +817,8 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN
}
}
if err != nil && !retryBackoff.Ongoing() {
level.Error(l.log).Log(
"msg", "retry attempts exhausted when submitting node for evaluation to the worker pool - "+
l.logger.Error(
"retry attempts exhausted when submitting node for evaluation to the worker pool - "+
"this could be a deadlock, performance bottleneck or severe overload leading to goroutine starvation",
"err", err,
"node_id", n.NodeID(),
Expand Down Expand Up @@ -853,7 +851,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
defer func() {
duration := time.Since(start)
l.cm.onComponentEvaluationDone(n.NodeID(), duration)
level.Debug(l.log).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", duration)
l.logger.Debug("finished node evaluation", "node_id", n.NodeID(), "duration", duration)
}()

var err error
Expand All @@ -865,7 +863,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
ectx := l.cache.GetContext()
evalErr := n.Evaluate(ectx)

err = l.postEvaluate(l.log, n, evalErr)
err = l.postEvaluate(l.logger, n, evalErr)

// Additional post-evaluation steps necessary for module exports.
if exp, ok := n.(*ExportConfigNode); ok {
Expand Down Expand Up @@ -897,7 +895,7 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr

// evaluate constructs the final context for the BlockNode and
// evaluates it. mut must be held when calling evaluate.
func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
func (l *Loader) evaluate(logger *slog.Logger, bn BlockNode) error {
ectx := l.cache.GetContext()
err := bn.Evaluate(ectx)
return l.postEvaluate(logger, bn, err)
Expand All @@ -908,18 +906,18 @@ func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
// The evaluation err is passed as an argument to allow shadowing it with an error that could be more relevant to the user
// but cannot be determined before the evaluation (for example, we must evaluate the argument node to see if it's optional before
// raising an error when a value is missing). When err is not nil, this function must return an error.
func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error {
func (l *Loader) postEvaluate(logger *slog.Logger, bn BlockNode, err error) error {
switch c := bn.(type) {
case ComponentNode:
// Always update the cached exports, since that it might change when a component gets re-evaluated.
// We also want to cache it in case of an error
err2 := l.cache.CacheExports(c.ID(), c.Exports())
if err2 != nil {
if err != nil {
level.Error(logger).Log("msg", "evaluation and exports caching failed", "eval err", err, "caching err", err2)
logger.Error("evaluation and exports caching failed", "eval_err", err, "caching_err", err2)
return errors.Join(err, err2)
} else {
level.Error(logger).Log("msg", "failed to cache exports after evaluation", "err", err2)
logger.Error("failed to cache exports after evaluation", "err", err2)
return err2
}
}
Expand All @@ -938,7 +936,7 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error
}

if err != nil {
level.Error(logger).Log("msg", "failed to evaluate config", "node", bn.NodeID(), "err", err)
logger.Error("failed to evaluate config", "node", bn.NodeID(), "err", err)
return err
}
return nil
Expand Down
Loading
Loading