From 220423fadcc246cb22dc81bd524c6101ec5c70e3 Mon Sep 17 00:00:00 2001 From: bdular Date: Wed, 19 Mar 2025 22:10:00 +0100 Subject: [PATCH 1/3] chore: added compose for observability deployment --- docker-compose.observability.yaml | 38 ++++++++++++++++++++ otel-collector.yaml | 58 +++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 docker-compose.observability.yaml create mode 100644 otel-collector.yaml diff --git a/docker-compose.observability.yaml b/docker-compose.observability.yaml new file mode 100644 index 000000000..0557460f0 --- /dev/null +++ b/docker-compose.observability.yaml @@ -0,0 +1,38 @@ +services: + + # Complete observability stack. + grafana-lgtm-stack: + image: grafana/otel-lgtm:0.9.1 + hostname: lgtm-stack + environment: + - ENABLE_LOGS_OTELCOL=true + ports: + - "3000:3000" + volumes: + - prometheus:/prometheus + - loki:/data/loki + - grafana:/var/lib/grafana + - ./otel-collector.yaml:/otel-lgtm/otelcol-config.yaml # OTel collector config contains configuration for scraping Dkron + + dkron: + environment: + - DKRON_ENABLE_PROMETHEUS=true + - DKRON_OTEL_ENDPOINT=lgtm-stack:4317 + - OTEL_EXPORTER_OTLP_INSECURE=true + + dkron-server: + environment: + - DKRON_ENABLE_PROMETHEUS=true + - DKRON_OTEL_ENDPOINT=lgtm-stack:4317 + - OTEL_EXPORTER_OTLP_INSECURE=true + + dkron-agent: + environment: + - DKRON_ENABLE_PROMETHEUS=true + - DKRON_OTEL_ENDPOINT=lgtm-stack:4317 + - OTEL_EXPORTER_OTLP_INSECURE=true + +volumes: + prometheus: + loki: + grafana: diff --git a/otel-collector.yaml b/otel-collector.yaml new file mode 100644 index 000000000..30b9cf714 --- /dev/null +++ b/otel-collector.yaml @@ -0,0 +1,58 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + prometheus/collector: + config: + scrape_configs: + - job_name: "opentelemetry-collector" + static_configs: + - targets: [ "localhost:8888" ] + # Scrape dkron metrics + - job_name: "dkron" + static_configs: + - targets: [ "dkron:8080" ] + +processors: + batch: + +exporters: + otlphttp/metrics: + endpoint: http://localhost:9090/api/v1/otlp + tls: + insecure: true + otlphttp/traces: + endpoint: http://localhost:4418 + tls: + insecure: true + otlphttp/logs: + endpoint: http://localhost:3100/otlp + tls: + insecure: true + debug/metrics: + verbosity: detailed + debug/traces: + verbosity: detailed + debug/logs: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ otlphttp/traces ] + #exporters: [otlphttp/traces,debug/traces] + metrics: + receivers: [ otlp, prometheus/collector ] + processors: [ batch ] + exporters: [ otlphttp/metrics ] + #exporters: [otlphttp/metrics,debug/metrics] + logs: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ otlphttp/logs ] + #exporters: [otlphttp/logs,debug/logs] \ No newline at end of file From 5cf7894e5e1400ed34f673aa82cfffb72de51b89 Mon Sep 17 00:00:00 2001 From: bdular Date: Fri, 21 Mar 2025 15:37:51 +0100 Subject: [PATCH 2/3] feat: basic tracing --- dkron/agent.go | 23 +++++++++-- dkron/api.go | 59 +++++++++++++-------------- dkron/api_test.go | 4 +- dkron/config.go | 4 ++ dkron/fsm.go | 8 ++-- dkron/grpc.go | 26 ++++++------ dkron/grpc_client.go | 4 +- dkron/grpc_test.go | 20 +++++----- dkron/job.go | 7 ++-- dkron/job_test.go | 16 ++++---- dkron/leader.go | 8 +++- dkron/run.go | 10 ++++- dkron/storage.go | 19 ++++----- dkron/store.go | 94 ++++++++++++++++++++++++++++++++------------ dkron/store_test.go | 52 ++++++++++++------------ dkron/tracing.go | 43 ++++++++++++++++++++ dkron/ui.go | 2 +- go.mod | 34 +++++++++------- go.sum | 72 +++++++++++++++++++-------------- 19 files changed, 324 insertions(+), 181 deletions(-) create mode 100644 dkron/tracing.go diff --git a/dkron/agent.go b/dkron/agent.go index 145a44540..21d65e705 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -16,9 +16,9 @@ import ( "time" "github.com/devopsfaith/krakend-usage/client" - metrics "github.com/hashicorp/go-metrics" "github.com/distribworks/dkron/v4/plugin" proto "github.com/distribworks/dkron/v4/types" + "github.com/hashicorp/go-metrics" "github.com/hashicorp/go-uuid" "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" @@ -26,6 +26,8 @@ import ( "github.com/hashicorp/serf/serf" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -124,6 +126,8 @@ type Agent struct { // logger is the log entry to use fo all logging calls logger *logrus.Entry + + tracer trace.Tracer } // ProcessorFactory is a function type that creates a new instance @@ -146,6 +150,7 @@ func NewAgent(config *Config, options ...AgentOption) *Agent { config: config, retryJoinCh: make(chan error), serverLookup: NewServerLookup(), + tracer: otel.Tracer("dkron-agent"), } for _, option := range options { @@ -189,6 +194,15 @@ func (a *Agent) Start() error { a.logger.Fatal("agent: Can not setup metrics") } + // Setup tracing + if a.config.OpenTelemetryEndpoint != "" { + a.logger.Info("agent: Initializing OpenTelemetry") + _, err := initTracer(context.Background(), a.config.OpenTelemetryEndpoint) + if err != nil { + a.logger.Fatal("agent: Can not setup tracing") + } + } + // Expose the node name expNode.Set(a.config.NodeName) @@ -388,7 +402,9 @@ func (a *Agent) setupRaft() error { if err != nil { return fmt.Errorf("recovery failed to parse peers.json: %v", err) } - store, err := NewStore(a.logger) + + storeTracer := otel.Tracer("dkron-store") + store, err := NewStore(a.logger, storeTracer) if err != nil { a.logger.WithError(err).Fatal("dkron: Error initializing store") } @@ -549,7 +565,8 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { if a.Store == nil { - s, err := NewStore(a.logger) + storeTracer := otel.Tracer("dkron-store") + s, err := NewStore(a.logger, storeTracer) if err != nil { a.logger.WithError(err).Fatal("dkron: Error initializing store") } diff --git a/dkron/api.go b/dkron/api.go index dbc2d7ae3..9d4ba3400 100644 --- a/dkron/api.go +++ b/dkron/api.go @@ -7,6 +7,7 @@ import ( "net/http" "sort" "strconv" + "strings" "github.com/distribworks/dkron/v4/types" "github.com/gin-contrib/cors" @@ -16,7 +17,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "github.com/tidwall/buntdb" - status "google.golang.org/grpc/status" + "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" + "google.golang.org/grpc/status" ) const ( @@ -56,6 +58,11 @@ func (h *HTTPTransport) ServeHTTP() { config.AllowHeaders = []string{"*"} config.ExposeHeaders = []string{"*"} + // Skip tracing on auxiliary endpoints + filter := func(c *gin.Context) bool { + return !(strings.Contains(c.FullPath(), "metrics") || strings.Contains(c.FullPath(), "health") || strings.Contains(c.FullPath(), "ui")) + } + rootPath.Use(otelgin.Middleware("dkron-api", otelgin.WithGinFilter(filter))) // Adds OpenTelemetry tracing to HTTP API rootPath.Use(cors.New(config)) rootPath.Use(h.MetaMiddleware()) @@ -160,16 +167,14 @@ func (h *HTTPTransport) jobsHandler(c *gin.Context) { order := c.DefaultQuery("_order", "ASC") q := c.Query("q") - jobs, err := h.agent.Store.GetJobs( - &JobOptions{ - Metadata: metadata, - Sort: sort, - Order: order, - Query: q, - Status: c.Query("status"), - Disabled: c.Query("disabled"), - }, - ) + jobs, err := h.agent.Store.GetJobs(c.Request.Context(), &JobOptions{ + Metadata: metadata, + Sort: sort, + Order: order, + Query: q, + Status: c.Query("status"), + Disabled: c.Query("disabled"), + }) if err != nil { h.logger.WithError(err).Error("api: Unable to get jobs, store not reachable.") return @@ -199,7 +204,7 @@ func (h *HTTPTransport) jobsHandler(c *gin.Context) { func (h *HTTPTransport) jobGetHandler(c *gin.Context) { jobName := c.Param("job") - job, err := h.agent.Store.GetJob(jobName, nil) + job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil) if err != nil { h.logger.Error(err) } @@ -348,19 +353,17 @@ func (h *HTTPTransport) executionsHandler(c *gin.Context) { outputSizeLimit = -1 } - job, err := h.agent.Store.GetJob(jobName, nil) + job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil) if err != nil { _ = c.AbortWithError(http.StatusNotFound, err) return } - executions, err := h.agent.Store.GetExecutions(job.Name, - &ExecutionOptions{ - Sort: sort, - Order: order, - Timezone: job.GetTimeLocation(), - }, - ) + executions, err := h.agent.Store.GetExecutions(c.Request.Context(), job.Name, &ExecutionOptions{ + Sort: sort, + Order: order, + Timezone: job.GetTimeLocation(), + }) if err == buntdb.ErrNotFound { executions = make([]*Execution, 0) } else if err != nil { @@ -389,19 +392,17 @@ func (h *HTTPTransport) executionHandler(c *gin.Context) { jobName := c.Param("job") executionName := c.Param("execution") - job, err := h.agent.Store.GetJob(jobName, nil) + job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil) if err != nil { _ = c.AbortWithError(http.StatusNotFound, err) return } - executions, err := h.agent.Store.GetExecutions(job.Name, - &ExecutionOptions{ - Sort: "", - Order: "", - Timezone: job.GetTimeLocation(), - }, - ) + executions, err := h.agent.Store.GetExecutions(c.Request.Context(), job.Name, &ExecutionOptions{ + Sort: "", + Order: "", + Timezone: job.GetTimeLocation(), + }) if err != nil { h.logger.Error(err) @@ -457,7 +458,7 @@ func (h *HTTPTransport) leaveHandler(c *gin.Context) { func (h *HTTPTransport) jobToggleHandler(c *gin.Context) { jobName := c.Param("job") - job, err := h.agent.Store.GetJob(jobName, nil) + job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil) if err != nil { _ = c.AbortWithError(http.StatusNotFound, err) return diff --git a/dkron/api_test.go b/dkron/api_test.go index 43c243dd6..1e6852934 100644 --- a/dkron/api_test.go +++ b/dkron/api_test.go @@ -371,11 +371,11 @@ func TestAPIJobOutputTruncate(t *testing.T) { Output: "test " + strings.Repeat("longer output... ", 100), NodeName: "testNode2", } - _, err = a.Store.SetExecution(testExecution1) + _, err = a.Store.SetExecution(nil, testExecution1) if err != nil { t.Fatal(err) } - _, err = a.Store.SetExecution(testExecution2) + _, err = a.Store.SetExecution(nil, testExecution2) if err != nil { t.Fatal(err) } diff --git a/dkron/config.go b/dkron/config.go index b63874546..646493b24 100644 --- a/dkron/config.go +++ b/dkron/config.go @@ -180,6 +180,9 @@ type Config struct { // CronitorEndpoint is the endpoint to call for cronitor notifications. CronitorEndpoint string `mapstructure:"cronitor-endpoint"` + + // OpenTelemetryEndpoint is the gRPC endpoint to send OpenTelemetry traces to. If empty, no traces will be sent. + OpenTelemetryEndpoint string `mapstructure:"otel-endpoint"` } // DefaultBindPort is the default port that dkron will use for Serf communication @@ -327,6 +330,7 @@ Format there: https://golang.org/pkg/time/#ParseDuration`) cmdFlags.StringSlice("dog-statsd-tags", []string{}, "Datadog tags, specified as key:value") cmdFlags.String("statsd-addr", "", "Statsd address") cmdFlags.Bool("enable-prometheus", false, "Enable serving prometheus metrics") + cmdFlags.String("otel-endpoint", "", "OpenTelemetry gRPC endpoint") cmdFlags.Bool("disable-usage-stats", c.DisableUsageStats, "Disable sending anonymous usage stats") return cmdFlags diff --git a/dkron/fsm.go b/dkron/fsm.go index 3a447cbd3..f58fe2f73 100644 --- a/dkron/fsm.go +++ b/dkron/fsm.go @@ -82,7 +82,7 @@ func (d *dkronFSM) applySetJob(buf []byte) interface{} { return err } job := NewJobFromProto(&pj, d.logger) - if err := d.store.SetJob(job, false); err != nil { + if err := d.store.SetJob(nil, job, false); err != nil { return err } return nil @@ -93,7 +93,7 @@ func (d *dkronFSM) applyDeleteJob(buf []byte) interface{} { if err := proto.Unmarshal(buf, &djr); err != nil { return err } - job, err := d.store.DeleteJob(djr.GetJobName()) + job, err := d.store.DeleteJob(nil, djr.GetJobName()) if err != nil { return err } @@ -110,7 +110,7 @@ func (d *dkronFSM) applyExecutionDone(buf []byte) interface{} { d.logger.WithField("execution", execution.Key()). WithField("output", string(execution.Output)). Debug("fsm: Setting execution") - _, err := d.store.SetExecutionDone(execution) + _, err := d.store.SetExecutionDone(nil, execution) return err } @@ -121,7 +121,7 @@ func (d *dkronFSM) applySetExecution(buf []byte) interface{} { return err } execution := NewExecutionFromProto(&pbex) - key, err := d.store.SetExecution(execution) + key, err := d.store.SetExecution(nil, execution) if err != nil { return err } diff --git a/dkron/grpc.go b/dkron/grpc.go index 781d3e04d..17726e191 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -8,12 +8,13 @@ import ( "strings" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/distribworks/dkron/v4/plugin" "github.com/distribworks/dkron/v4/types" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -56,7 +57,8 @@ func NewGRPCServer(agent *Agent, logger *logrus.Entry) DkronGRPCServer { // Serve creates and start a new gRPC dkron server func (grpcs *GRPCServer) Serve(lis net.Listener) error { - grpcServer := grpc.NewServer() + serverOpts := grpc.StatsHandler(otelgrpc.NewServerHandler()) // Add otel support + grpcServer := grpc.NewServer(serverOpts) types.RegisterDkronServer(grpcServer, grpcs) as := NewAgentServer(grpcs.agent, grpcs.logger) @@ -136,7 +138,7 @@ func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *types.GetJobRequ defer metrics.MeasureSince([]string{"grpc", "get_job"}, time.Now()) grpcs.logger.WithField("job", getJobReq.JobName).Debug("grpc: Received GetJob") - j, err := grpcs.agent.Store.GetJob(getJobReq.JobName, nil) + j, err := grpcs.agent.Store.GetJob(ctx, getJobReq.JobName, nil) if err != nil { return nil, err } @@ -172,7 +174,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.E // This is the leader at this point, so process the execution, encode the value and apply the log to the cluster. // Get the defined output types for the job, and call them - job, err := grpcs.agent.Store.GetJob(execDoneReq.Execution.JobName, nil) + job, err := grpcs.agent.Store.GetJob(ctx, execDoneReq.Execution.JobName, nil) if err != nil { return nil, err } @@ -199,7 +201,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.E } // Retrieve the fresh, updated job from the store to work on stored values - job, err = grpcs.agent.Store.GetJob(job.Name, nil) + job, err = grpcs.agent.Store.GetJob(ctx, job.Name, nil) if err != nil { grpcs.logger.WithError(err).WithField("job", execDoneReq.Execution.JobName).Error("grpc: Error retrieving job from store") return nil, err @@ -226,7 +228,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.E time.Sleep(eb) - if _, err := grpcs.agent.Run(job.Name, execution); err != nil { + if _, err := grpcs.agent.Run(ctx, job.Name, execution); err != nil { return nil, err } @@ -236,11 +238,9 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.E }, nil } - exg, err := grpcs.agent.Store.GetExecutionGroup(execution, - &ExecutionOptions{ - Timezone: job.GetTimeLocation(), - }, - ) + exg, err := grpcs.agent.Store.GetExecutionGroup(ctx, execution, &ExecutionOptions{ + Timezone: job.GetTimeLocation(), + }) if err != nil { grpcs.logger.WithError(err).WithField("group", execution.Group).Error("grpc: Error getting execution group.") return nil, err @@ -255,7 +255,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.E // Check first if there's dependent jobs and then check for the job status to begin execution dependent jobs on success. if len(job.DependentJobs) > 0 && job.Status == StatusSuccess { for _, djn := range job.DependentJobs { - dj, err := grpcs.agent.Store.GetJob(djn, nil) + dj, err := grpcs.agent.Store.GetJob(ctx, djn, nil) if err != nil { return nil, err } @@ -289,7 +289,7 @@ func (grpcs *GRPCServer) Leave(ctx context.Context, in *emptypb.Empty) (*emptypb // RunJob runs a job in the cluster func (grpcs *GRPCServer) RunJob(ctx context.Context, req *types.RunJobRequest) (*types.RunJobResponse, error) { ex := NewExecution(req.JobName) - job, err := grpcs.agent.Run(req.JobName, ex) + job, err := grpcs.agent.Run(ctx, req.JobName, ex) if err != nil { return nil, err } diff --git a/dkron/grpc_client.go b/dkron/grpc_client.go index 78380e079..c2eedfbb3 100644 --- a/dkron/grpc_client.go +++ b/dkron/grpc_client.go @@ -5,9 +5,10 @@ import ( "io" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" "github.com/distribworks/dkron/v4/types" "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" @@ -47,6 +48,7 @@ func NewGRPCClient(dialOpt grpc.DialOption, agent *Agent, logger *logrus.Entry) dialOpt: []grpc.DialOption{ dialOpt, grpc.WithBlock(), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // Add tracing to gRPC client }, agent: agent, logger: logger, diff --git a/dkron/grpc_test.go b/dkron/grpc_test.go index bbaeb786b..b748bf34f 100644 --- a/dkron/grpc_test.go +++ b/dkron/grpc_test.go @@ -50,7 +50,7 @@ func TestGRPCExecutionDone(t *testing.T) { Disabled: true, } - err = a.Store.SetJob(testJob, true) + err = a.Store.SetJob(nil, testJob, true) require.NoError(t, err) testChildJob := &Job{ @@ -61,7 +61,7 @@ func TestGRPCExecutionDone(t *testing.T) { Disabled: false, } - err = a.Store.SetJob(testChildJob, true) + err = a.Store.SetJob(nil, testChildJob, true) require.NoError(t, err) testExecution := &Execution{ @@ -81,7 +81,7 @@ func TestGRPCExecutionDone(t *testing.T) { err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) require.NoError(t, err) - execs, err := a.Store.GetExecutions("test", &ExecutionOptions{}) + execs, err := a.Store.GetExecutions(nil, "test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) @@ -89,7 +89,7 @@ func TestGRPCExecutionDone(t *testing.T) { }) t.Run("Should run a dependent job", func(t *testing.T) { - execs, err := a.Store.GetExecutions("child-test", &ExecutionOptions{}) + execs, err := a.Store.GetExecutions(nil, "child-test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) @@ -97,13 +97,13 @@ func TestGRPCExecutionDone(t *testing.T) { t.Run("Should store execution on a deleted job", func(t *testing.T) { // Test job with dependents no delete - _, err = a.Store.DeleteJob(testJob.Name) + _, err = a.Store.DeleteJob(nil, testJob.Name) require.Error(t, err) // Remove dependents and parent - _, err = a.Store.DeleteJob(testChildJob.Name) + _, err = a.Store.DeleteJob(nil, testChildJob.Name) require.NoError(t, err) - _, err = a.Store.DeleteJob(testJob.Name) + _, err = a.Store.DeleteJob(nil, testJob.Name) require.NoError(t, err) // Test store execution on a deleted job @@ -116,13 +116,13 @@ func TestGRPCExecutionDone(t *testing.T) { t.Run("Test ephemeral jobs", func(t *testing.T) { testJob.Ephemeral = true - err = a.Store.SetJob(testJob, true) + err = a.Store.SetJob(nil, testJob, true) require.NoError(t, err) err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) assert.NoError(t, err) - j, err := a.Store.GetJob("test", nil) + j, err := a.Store.GetJob(nil, "test", nil) assert.Error(t, err) assert.Nil(t, j) }) @@ -132,7 +132,7 @@ func TestGRPCExecutionDone(t *testing.T) { testJob.DependentJobs = []string{"non-existent"} testExecution.JobName = testJob.Name - err = a.Store.SetJob(testJob, true) + err = a.Store.SetJob(nil, testJob, true) require.NoError(t, err) err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) diff --git a/dkron/job.go b/dkron/job.go index a92372100..31eb44b8e 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "errors" "fmt" "regexp" @@ -267,7 +268,7 @@ func (j *Job) Run() { // Simple execution wrapper ex := NewExecution(j.Name) - if _, err := j.Agent.Run(j.Name, ex); err != nil { + if _, err := j.Agent.Run(context.Background(), j.Name, ex); err != nil { j.logger.WithError(err).Error("job: Error running job") } } @@ -279,7 +280,7 @@ func (j *Job) String() string { } // GetParent returns the parent job of a job -func (j *Job) GetParent(store *Store) (*Job, error) { +func (j *Job) GetParent(ctx context.Context, store *Store) (*Job, error) { if j.Name == j.ParentJob { return nil, ErrSameParent } @@ -288,7 +289,7 @@ func (j *Job) GetParent(store *Store) (*Job, error) { return nil, ErrNoParent } - parentJob, err := store.GetJob(j.ParentJob, nil) + parentJob, err := store.GetJob(ctx, j.ParentJob, nil) if err != nil { if err == buntdb.ErrNotFound { return nil, ErrParentJobNotFound diff --git a/dkron/job_test.go b/dkron/job_test.go index a144a23a4..c2e12eb2f 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -35,7 +35,7 @@ func TestJobGetParent(t *testing.T) { Schedule: "@every 2s", } - if err := s.SetJob(parentTestJob, true); err != nil { + if err := s.SetJob(nil, parentTestJob, true); err != nil { t.Fatalf("error creating job: %s", err) } @@ -46,31 +46,31 @@ func TestJobGetParent(t *testing.T) { ParentJob: "parent_test", } - err = s.SetJob(dependentTestJob, true) + err = s.SetJob(nil, dependentTestJob, true) assert.NoError(t, err) - parentTestJob, err = dependentTestJob.GetParent(s) + parentTestJob, err = dependentTestJob.GetParent(nil, s) assert.NoError(t, err) assert.Equal(t, []string{dependentTestJob.Name}, parentTestJob.DependentJobs) - ptj, err := dependentTestJob.GetParent(s) + ptj, err := dependentTestJob.GetParent(nil, s) assert.NoError(t, err) assert.Equal(t, parentTestJob.Name, ptj.Name) // Remove the parent job dependentTestJob.ParentJob = "" dependentTestJob.Schedule = "@every 2m" - err = s.SetJob(dependentTestJob, true) + err = s.SetJob(nil, dependentTestJob, true) assert.NoError(t, err) - dtj, _ := s.GetJob(dependentTestJob.Name, nil) + dtj, _ := s.GetJob(nil, dependentTestJob.Name, nil) assert.NoError(t, err) assert.Equal(t, "", dtj.ParentJob) - _, err = dtj.GetParent(s) + _, err = dtj.GetParent(nil, s) assert.EqualError(t, ErrNoParent, err.Error()) - ptj, err = s.GetJob(parentTestJob.Name, nil) + ptj, err = s.GetJob(nil, parentTestJob.Name, nil) assert.NoError(t, err) assert.Nil(t, ptj.DependentJobs) } diff --git a/dkron/leader.go b/dkron/leader.go index 14ac7bd4d..7b36e9722 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -1,13 +1,14 @@ package dkron import ( + "context" "fmt" "net" "strings" "sync" "time" - metrics "github.com/hashicorp/go-metrics" + "github.com/hashicorp/go-metrics" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) @@ -223,8 +224,11 @@ func (a *Agent) reconcileMember(member serf.Member) error { func (a *Agent) establishLeadership(stopCh chan struct{}) error { defer metrics.MeasureSince([]string{"dkron", "leader", "establish_leadership"}, time.Now()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + a.logger.Info("agent: Starting scheduler") - jobs, err := a.Store.GetJobs(nil) + jobs, err := a.Store.GetJobs(ctx, nil) if err != nil { return err } diff --git a/dkron/run.go b/dkron/run.go index 4703b3a63..a9057d847 100644 --- a/dkron/run.go +++ b/dkron/run.go @@ -1,15 +1,21 @@ package dkron import ( + "context" "fmt" "sync" "github.com/hashicorp/serf/serf" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // Run call the agents to run a job. Returns a job with its new status and next schedule. -func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { - job, err := a.Store.GetJob(jobName, nil) +func (a *Agent) Run(ctx context.Context, jobName string, ex *Execution) (*Job, error) { + ctx, span := a.tracer.Start(ctx, "agent.Run", trace.WithAttributes(attribute.String("job_name", jobName))) + defer span.End() + + job, err := a.Store.GetJob(ctx, jobName, nil) if err != nil { return nil, fmt.Errorf("agent: Run error retrieving job: %s from store: %w", jobName, err) } diff --git a/dkron/storage.go b/dkron/storage.go index 771b416be..1907b2ec7 100644 --- a/dkron/storage.go +++ b/dkron/storage.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "io" ) @@ -9,15 +10,15 @@ import ( // minimum set of operations that are needed to have a working // dkron store. type Storage interface { - SetJob(job *Job, copyDependentJobs bool) error - DeleteJob(name string) (*Job, error) - SetExecution(execution *Execution) (string, error) - SetExecutionDone(execution *Execution) (bool, error) - GetJobs(options *JobOptions) ([]*Job, error) - GetJob(name string, options *JobOptions) (*Job, error) - GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) - GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) - GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) + SetJob(ctx context.Context, job *Job, copyDependentJobs bool) error + DeleteJob(ctx context.Context, name string) (*Job, error) + SetExecution(ctx context.Context, execution *Execution) (string, error) + SetExecutionDone(ctx context.Context, execution *Execution) (bool, error) + GetJobs(ctx context.Context, options *JobOptions) ([]*Job, error) + GetJob(ctx context.Context, name string, options *JobOptions) (*Job, error) + GetExecutions(ctx context.Context, jobName string, opts *ExecutionOptions) ([]*Execution, error) + GetExecutionGroup(ctx context.Context, execution *Execution, opts *ExecutionOptions) ([]*Execution, error) + GetGroupedExecutions(ctx context.Context, jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) Shutdown() error Snapshot(w io.WriteCloser) error Restore(r io.ReadCloser) error diff --git a/dkron/store.go b/dkron/store.go index fa0cdff3c..8d80b31ba 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -2,6 +2,7 @@ package dkron import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -15,6 +16,8 @@ import ( dkronpb "github.com/distribworks/dkron/v4/types" "github.com/sirupsen/logrus" "github.com/tidwall/buntdb" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" ) @@ -28,7 +31,7 @@ const ( var ( // ErrDependentJobs is returned when deleting a job that has dependent jobs - ErrDependentJobs = errors.New("store: could not delete job with dependent jobs, delete childs first") + ErrDependentJobs = errors.New("store: could not delete job with dependent jobs, delete children first") ) // Store is the local implementation of the Storage interface. @@ -36,8 +39,9 @@ var ( // BuntDB. type Store struct { db *buntdb.DB - lock *sync.Mutex // for + lock *sync.Mutex + tracer trace.Tracer logger *logrus.Entry } @@ -64,7 +68,7 @@ type kv struct { } // NewStore creates a new Storage instance. -func NewStore(logger *logrus.Entry) (*Store, error) { +func NewStore(logger *logrus.Entry, tracer trace.Tracer) (*Store, error) { db, err := buntdb.Open(":memory:") if err != nil { return nil, err @@ -85,6 +89,7 @@ func NewStore(logger *logrus.Entry) (*Store, error) { db: db, lock: &sync.Mutex{}, logger: logger, + tracer: tracer, } return store, nil @@ -114,7 +119,10 @@ func (s *Store) DB() *buntdb.DB { } // SetJob stores a job in the storage -func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { +func (s *Store) SetJob(ctx context.Context, job *Job, copyDependentJobs bool) error { + ctx, span := s.tracer.Start(ctx, "buntdb.set.job") + defer span.End() + var pbej dkronpb.Job var ej *Job @@ -124,7 +132,7 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { // Abort if parent not found before committing job to the store if job.ParentJob != "" { - if j, _ := s.GetJob(job.ParentJob, nil); j == nil { + if j, _ := s.GetJob(ctx, job.ParentJob, nil); j == nil { return ErrParentJobNotFound } } @@ -186,10 +194,10 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { // If the parent job changed update the parents of the old (if any) and new jobs if job.ParentJob != ej.ParentJob { - if err := s.removeFromParent(ej); err != nil { + if err := s.removeFromParent(ctx, ej); err != nil { return err } - if err := s.addToParent(job); err != nil { + if err := s.addToParent(ctx, job); err != nil { return err } } @@ -199,13 +207,16 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { // Removes the given job from its parent. // Does nothing if nil is passed as child. -func (s *Store) removeFromParent(child *Job) error { +func (s *Store) removeFromParent(ctx context.Context, child *Job) error { + ctx, span := s.tracer.Start(ctx, "buntdb.remove_from_parent") + defer span.End() + // Do nothing if no job was given or job has no parent if child == nil || child.ParentJob == "" { return nil } - parent, err := child.GetParent(s) + parent, err := child.GetParent(ctx, s) if err != nil { return err } @@ -219,7 +230,7 @@ func (s *Store) removeFromParent(child *Job) error { } } parent.DependentJobs = djs - if err := s.SetJob(parent, false); err != nil { + if err := s.SetJob(ctx, parent, false); err != nil { return err } @@ -227,19 +238,22 @@ func (s *Store) removeFromParent(child *Job) error { } // Adds the given job to its parent. -func (s *Store) addToParent(child *Job) error { +func (s *Store) addToParent(ctx context.Context, child *Job) error { + ctx, span := s.tracer.Start(ctx, "buntdb.add_to_parent") + defer span.End() + // Do nothing if job has no parent if child.ParentJob == "" { return nil } - parent, err := child.GetParent(s) + parent, err := child.GetParent(ctx, s) if err != nil { return err } parent.DependentJobs = append(parent.DependentJobs, child.Name) - if err := s.SetJob(parent, false); err != nil { + if err := s.SetJob(ctx, parent, false); err != nil { return err } @@ -248,7 +262,10 @@ func (s *Store) addToParent(child *Job) error { // SetExecutionDone saves the execution and updates the job with the corresponding // results -func (s *Store) SetExecutionDone(execution *Execution) (bool, error) { +func (s *Store) SetExecutionDone(ctx context.Context, execution *Execution) (bool, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.set.execution_done") + defer span.End() + err := s.db.Update(func(tx *buntdb.Tx) error { // Load the job from the store var pbj dkronpb.Job @@ -314,7 +331,10 @@ func (s *Store) jobHasMetadata(job *Job, metadata map[string]string) bool { } // GetJobs returns all jobs -func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) { +func (s *Store) GetJobs(ctx context.Context, options *JobOptions) ([]*Job, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.get.jobs") + defer span.End() + if options == nil { options = &JobOptions{ Sort: "name", @@ -359,7 +379,10 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) { } // GetJob finds and return a Job from the store -func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) { +func (s *Store) GetJob(ctx context.Context, name string, options *JobOptions) (*Job, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.get.job", trace.WithAttributes(attribute.String("job_name", name))) + defer span.End() + var pbj dkronpb.Job err := s.db.View(s.getJobTxFunc(name, &pbj)) @@ -399,7 +422,10 @@ func (s *Store) getJobTxFunc(name string, pbj *dkronpb.Job) func(tx *buntdb.Tx) // DeleteJob deletes the given job from the store, along with // all its executions and references to it. -func (s *Store) DeleteJob(name string) (*Job, error) { +func (s *Store) DeleteJob(ctx context.Context, name string) (*Job, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.delete.job", trace.WithAttributes(attribute.String("job_name", name))) + defer span.End() + var job *Job err := s.db.Update(func(tx *buntdb.Tx) error { // Get the job @@ -428,7 +454,7 @@ func (s *Store) DeleteJob(name string) (*Job, error) { // If the transaction succeeded, remove from parent if job.ParentJob != "" { - if err := s.removeFromParent(job); err != nil { + if err := s.removeFromParent(ctx, job); err != nil { return nil, err } } @@ -437,7 +463,10 @@ func (s *Store) DeleteJob(name string) (*Job, error) { } // GetExecutions returns the executions given a Job name. -func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) { +func (s *Store) GetExecutions(ctx context.Context, jobName string, opts *ExecutionOptions) ([]*Execution, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.get.executions", trace.WithAttributes(attribute.String("job_name", jobName))) + defer span.End() + prefix := fmt.Sprintf("%s:%s:", executionsPrefix, jobName) kvs, err := s.list(prefix, true, opts) @@ -484,8 +513,11 @@ func (*Store) listTxFunc(prefix string, kvs *[]kv, found *bool, opts *ExecutionO } // GetExecutionGroup returns all executions in the same group of a given execution -func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) { - res, err := s.GetExecutions(execution.JobName, opts) +func (s *Store) GetExecutionGroup(ctx context.Context, execution *Execution, opts *ExecutionOptions) ([]*Execution, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.get.executions_group") + defer span.End() + + res, err := s.GetExecutions(ctx, execution.JobName, opts) if err != nil { return nil, err } @@ -501,8 +533,11 @@ func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) // GetGroupedExecutions returns executions for a job grouped and with an ordered index // to facilitate access. -func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) { - execs, err := s.GetExecutions(jobName, opts) +func (s *Store) GetGroupedExecutions(ctx context.Context, jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.get.grouped_executions", trace.WithAttributes(attribute.String("job_name", jobName))) + defer span.End() + + execs, err := s.GetExecutions(ctx, jobName, opts) if err != nil { return nil, nil, err } @@ -556,7 +591,10 @@ func (*Store) setExecutionTxFunc(key string, pbe *dkronpb.Execution) func(tx *bu } // SetExecution Save a new execution and returns the key of the new saved item or an error. -func (s *Store) SetExecution(execution *Execution) (string, error) { +func (s *Store) SetExecution(ctx context.Context, execution *Execution) (string, error) { + ctx, span := s.tracer.Start(ctx, "buntdb.set.executions") + defer span.End() + pbe := execution.ToProto() key := fmt.Sprintf("%s:%s:%s", executionsPrefix, execution.JobName, execution.Key()) @@ -576,7 +614,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) { return "", err } - execs, err := s.GetExecutions(execution.JobName, &ExecutionOptions{}) + execs, err := s.GetExecutions(ctx, execution.JobName, &ExecutionOptions{}) if err != nil && err != buntdb.ErrNotFound { s.logger.WithError(err). WithField("job", execution.JobName). @@ -640,11 +678,17 @@ func (s *Store) Shutdown() error { // Snapshot creates a backup of the data stored in BuntDB func (s *Store) Snapshot(w io.WriteCloser) error { + _, span := s.tracer.Start(context.Background(), "buntdb.create.snapshot") + defer span.End() + return s.db.Save(w) } // Restore load data created with backup in to Bunt func (s *Store) Restore(r io.ReadCloser) error { + _, span := s.tracer.Start(context.Background(), "buntdb.restore.snapshot") + defer span.End() + return s.db.Load(r) } diff --git a/dkron/store_test.go b/dkron/store_test.go index 9e529749f..28f26f5c3 100644 --- a/dkron/store_test.go +++ b/dkron/store_test.go @@ -32,17 +32,17 @@ func TestStore(t *testing.T) { } // Check that we still get an empty job list - jobs, err := s.GetJobs(nil) + jobs, err := s.GetJobs(nil, nil) assert.NoError(t, err) assert.NotNil(t, jobs, "jobs nil, expecting empty slice") assert.Empty(t, jobs) - err = s.SetJob(testJob, true) + err = s.SetJob(nil, testJob, true) assert.NoError(t, err) - err = s.SetJob(testJob2, true) + err = s.SetJob(nil, testJob2, true) assert.NoError(t, err) - jobs, err = s.GetJobs(nil) + jobs, err = s.GetJobs(nil, nil) assert.NoError(t, err) assert.Len(t, jobs, 2) assert.Equal(t, "test", jobs[0].Name) @@ -56,7 +56,7 @@ func TestStore(t *testing.T) { NodeName: "testNode", } - _, err = s.SetExecution(testExecution) + _, err = s.SetExecution(nil, testExecution) require.NoError(t, err) testExecution2 := &Execution{ @@ -67,10 +67,10 @@ func TestStore(t *testing.T) { Output: "test", NodeName: "testNode", } - _, err = s.SetExecution(testExecution2) + _, err = s.SetExecution(nil, testExecution2) require.NoError(t, err) - execs, err := s.GetExecutions("test", &ExecutionOptions{ + execs, err := s.GetExecutions(nil, "test", &ExecutionOptions{ Sort: "started_at", Order: "DESC", }) @@ -80,13 +80,13 @@ func TestStore(t *testing.T) { assert.Equal(t, testExecution, execs[0]) assert.Len(t, execs, 1) - _, err = s.DeleteJob("test") + _, err = s.DeleteJob(nil, "test") assert.NoError(t, err) - _, err = s.DeleteJob("test") + _, err = s.DeleteJob(nil, "test") assert.EqualError(t, err, buntdb.ErrNotFound.Error()) - _, err = s.DeleteJob("test2") + _, err = s.DeleteJob(nil, "test2") assert.NoError(t, err) } @@ -164,11 +164,11 @@ func TestStore_ChildIsUpdatedAfterDeletingParentJob(t *testing.T) { storeJob(t, s, "parent1") storeChildJob(t, s, "child1", "parent1") - _, err := s.DeleteJob("parent1") + _, err := s.DeleteJob(nil, "parent1") assert.EqualError(t, err, ErrDependentJobs.Error()) deleteJob(t, s, "child1") - _, err = s.DeleteJob("parent1") + _, err = s.DeleteJob(nil, "parent1") assert.NoError(t, err) } @@ -185,18 +185,18 @@ func TestStore_GetJobsWithMetadata(t *testing.T) { var options JobOptions options.Metadata = make(map[string]string) options.Metadata["t1"] = "v1" - jobs, err := s.GetJobs(&options) + jobs, err := s.GetJobs(nil, &options) assert.NoError(t, err) assert.Equal(t, 2, len(jobs)) options.Metadata["t2"] = "v2" - jobs, err = s.GetJobs(&options) + jobs, err = s.GetJobs(nil, &options) assert.NoError(t, err) assert.Equal(t, 1, len(jobs)) assert.Equal(t, "job2", jobs[0].Name) options.Metadata["t3"] = "v3" - jobs, err = s.GetJobs(&options) + jobs, err = s.GetJobs(nil, &options) assert.NoError(t, err) assert.Equal(t, 0, len(jobs)) } @@ -218,7 +218,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 1, } - _, _ = s.SetExecution(ex1) + _, _ = s.SetExecution(nil, ex1) ex2 := &Execution{ JobName: "test", @@ -229,7 +229,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode2", Group: 1, } - _, _ = s.SetExecution(ex2) + _, _ = s.SetExecution(nil, ex2) ex3 := &Execution{ JobName: "test", @@ -240,7 +240,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - _, _ = s.SetExecution(ex3) + _, _ = s.SetExecution(nil, ex3) ex4 := &Execution{ JobName: "test", @@ -251,7 +251,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - _, _ = s.SetExecution(ex4) + _, _ = s.SetExecution(nil, ex4) ex5 := &Execution{ JobName: "test", @@ -261,7 +261,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 3, } - _, _ = s.SetExecution(ex5) + _, _ = s.SetExecution(nil, ex5) ex6 := &Execution{ JobName: "test", @@ -270,7 +270,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 4, } - _, _ = s.SetExecution(ex6) + _, _ = s.SetExecution(nil, ex6) // Tests status err = s.db.View(func(tx *buntdb.Tx) error { @@ -296,21 +296,21 @@ func Test_computeStatus(t *testing.T) { func storeJob(t *testing.T, s *Store, jobName string) { job := scaffoldJob() job.Name = jobName - require.NoError(t, s.SetJob(job, false)) + require.NoError(t, s.SetJob(nil, job, false)) } func storeJobWithMetadata(t *testing.T, s *Store, jobName string, metadata map[string]string) { job := scaffoldJob() job.Name = jobName job.Metadata = metadata - require.NoError(t, s.SetJob(job, false)) + require.NoError(t, s.SetJob(nil, job, false)) } func storeChildJob(t *testing.T, s *Store, jobName string, parentName string) { job := scaffoldJob() job.Name = jobName job.ParentJob = parentName - require.NoError(t, s.SetJob(job, false)) + require.NoError(t, s.SetJob(nil, job, false)) } func scaffoldJob() *Job { @@ -331,12 +331,12 @@ func setupStore(t *testing.T) *Store { } func loadJob(t *testing.T, s *Store, name string) *Job { - job, err := s.GetJob(name, nil) + job, err := s.GetJob(nil, name, nil) require.NoError(t, err) return job } func deleteJob(t *testing.T, s *Store, name string) { - _, err := s.DeleteJob(name) + _, err := s.DeleteJob(nil, name) require.NoError(t, err) } diff --git a/dkron/tracing.go b/dkron/tracing.go new file mode 100644 index 000000000..186c61bf4 --- /dev/null +++ b/dkron/tracing.go @@ -0,0 +1,43 @@ +package dkron + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func initTracer(ctx context.Context, endpoint string) (*sdktrace.TracerProvider, error) { + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(endpoint)) + if err != nil { + return nil, err + } + + r, err := resource.New( + ctx, + resource.WithAttributes( + attribute.String("service.name", "dkron"), + attribute.String("service.version", "unknown"), + ), + resource.WithContainer(), + resource.WithContainerID(), + resource.WithOS(), + resource.WithProcessCommandArgs(), + ) + if err != nil { + return nil, err + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(r), + ) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + return tp, nil +} diff --git a/dkron/ui.go b/dkron/ui.go index afbdc0c2b..c6465d4d1 100644 --- a/dkron/ui.go +++ b/dkron/ui.go @@ -56,7 +56,7 @@ func (h *HTTPTransport) UI(r *gin.RouterGroup, aclEnabled bool) { if err == nil && p != "/" && p != "/index.html" { ctx.FileFromFS(p, http.FS(assets)) } else { - jobs, err := h.agent.Store.GetJobs(nil) + jobs, err := h.agent.Store.GetJobs(nil, nil) if err != nil { h.logger.Error(err) } diff --git a/go.mod b/go.mod index 315c278f0..a749b7f5a 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,12 @@ require ( github.com/stretchr/testify v1.10.0 github.com/tidwall/buntdb v1.3.2 github.com/xdg-go/scram v1.1.2 + go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.60.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 + go.opentelemetry.io/otel v1.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 + go.opentelemetry.io/otel/sdk v1.35.0 + go.opentelemetry.io/otel/trace v1.35.0 golang.org/x/net v0.37.0 google.golang.org/grpc v1.71.0 google.golang.org/protobuf v1.36.6 @@ -75,12 +81,12 @@ require ( github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/boltdb/bolt v1.3.1 // indirect github.com/bufbuild/protocompile v0.14.1 // indirect - github.com/bytedance/sonic v1.12.6 // indirect - github.com/bytedance/sonic/loader v0.2.1 // indirect + github.com/bytedance/sonic v1.12.10 // indirect + github.com/bytedance/sonic/loader v0.2.3 // indirect github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cloudwego/base64x v0.1.4 // indirect - github.com/cloudwego/iasm v0.2.0 // indirect + github.com/cloudwego/base64x v0.1.5 // indirect github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -96,8 +102,8 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.7 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/gin-contrib/sse v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -105,9 +111,9 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.23.0 // indirect + github.com/go-playground/validator/v10 v10.25.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/goccy/go-json v0.10.4 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect @@ -121,6 +127,7 @@ require ( github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gophercloud/gophercloud v0.1.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-discover/provider/gce v0.0.0-20240829171124-547b9abd20f6 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect @@ -142,7 +149,7 @@ require ( github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/linode/linodego v0.7.1 // indirect @@ -206,14 +213,13 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/arch v0.12.0 // indirect + golang.org/x/arch v0.14.0 // indirect golang.org/x/crypto v0.36.0 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/oauth2 v0.28.0 // indirect diff --git a/go.sum b/go.sum index 719f1f082..62326d316 100644 --- a/go.sum +++ b/go.sum @@ -105,13 +105,15 @@ github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/FBatYVw= github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= -github.com/bytedance/sonic v1.12.6 h1:/isNmCUF2x3Sh8RAp/4mh4ZGkcFAX/hLrzrK3AvpRzk= -github.com/bytedance/sonic v1.12.6/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= +github.com/bytedance/sonic v1.12.10 h1:uVCQr6oS5669E9ZVW0HyksTLfNS7Q/9hV6IVS4nEMsI= +github.com/bytedance/sonic v1.12.10/go.mod h1:uVvFidNmlt9+wa31S1urfwwthTWteBgG0hWuoKAXTx8= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iCjjJv3+E= -github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.3 h1:yctD0Q3v2NOGfSWPLPvG2ggA2kV6TS6s4wioyEqssH0= +github.com/bytedance/sonic/loader v0.2.3/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9 h1:mzt00lI/krYDFH1qNfQdDZze2GjRaTeho7Ch9af/wsY= github.com/catalinc/hashcash v0.0.0-20161205220751-e6bc29ff4de9/go.mod h1:Qj15jt0Y3YvBTjOfWQ7WdgNtSE9WnbzIDpLcTcpQ1qw= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -122,9 +124,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= -github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= -github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= +github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk= @@ -185,16 +186,16 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/ github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fullstorydev/grpcurl v1.9.3 h1:PC1Xi3w+JAvEE2Tg2Gf2RfVgPbf9+tbuQr1ZkyVU3jk= github.com/fullstorydev/grpcurl v1.9.3/go.mod h1:/b4Wxe8bG6ndAjlfSUjwseQReUDUvBJiFEB7UllOlUE= -github.com/gabriel-vasile/mimetype v1.4.7 h1:SKFKl7kD0RiPdbht0s7hFtjl489WcQ1VyPW8ZzUMYCA= -github.com/gabriel-vasile/mimetype v1.4.7/go.mod h1:GDlAgAyIRT27BhFl53XNAFtfjzOkLaF35JdEG0P7LtU= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/getkin/kin-openapi v0.131.0 h1:NO2UeHnFKRYhZ8wg6Nyh5Cq7dHk4suQQr72a4pMrDxE= github.com/getkin/kin-openapi v0.131.0/go.mod h1:3OlG51PCYNsPByuiMB0t4fjnNlIDnaEDsjiKUV8nL58= github.com/gin-contrib/cors v1.7.4 h1:/fC6/wk7rCRtqKqki8lLr2Xq+hnV49aXDLIuSek9g4k= github.com/gin-contrib/cors v1.7.4/go.mod h1:vGc/APSgLMlQfEJV5NAzkrAHb0C8DetL3K6QZuvGii0= github.com/gin-contrib/expvar v1.0.2 h1:GnadrJS2F/LcoVaQL6xrcqAiUg5XRIzUdH4saPOKu+w= github.com/gin-contrib/expvar v1.0.2/go.mod h1:1mc8Bzt5Wy7zRLQ/iE2xF4hc8wPUmBMRb5yEPNSZvUQ= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= +github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -228,15 +229,15 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o= -github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.25.0 h1:5Dh7cjvzR7BRZadnsVOzPhWsrwUr0nmsZJxEAnFLNO8= +github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= -github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM= -github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -319,6 +320,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -425,8 +428,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= -github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -633,6 +636,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -700,26 +704,36 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 h1:rgMkmiGfix9vFJDcDi1PK8WEQP4FLQwLDfhp5ZLpFeE= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0/go.mod h1:ijPqXp5P6IRRByFVVg9DY8P5HkxkHE5ARIa+86aXPf4= +go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.60.0 h1:jj/B7eX95/mOxim9g9laNZkOHKz/XCHG0G410SntRy4= +go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.60.0/go.mod h1:ZvRTVaYYGypytG0zRp2A60lpj//cMq3ZnxYdZaljVBM= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 h1:x7wzEgXfnzJcHDwStJT+mxOz4etr2EcexjqhBvmoakw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0/go.mod h1:rg+RlpR5dKwaS95IyyZqj5Wd4E13lk/msnTS0Xl9lJM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= -go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= -go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= -go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= -go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= -go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= -go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 h1:m639+BofXTvcY1q8CGs4ItwQarYtJPOWmVobfM1HpVI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0/go.mod h1:LjReUci/F4BUyv+y4dwnq3h/26iNOeC3wAIqgvTIZVo= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= -go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= -go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg= -golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/arch v0.14.0 h1:z9JUEZWr8x4rR0OU6c4/4t6E6jOZ8/QBS2bBYBm4tx4= +golang.org/x/arch v0.14.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= From e2c9fa4dd8bd9c0f55696c162ddb6ffd5e0ac91d Mon Sep 17 00:00:00 2001 From: bdular Date: Sun, 23 Mar 2025 10:03:28 +0100 Subject: [PATCH 3/3] fix: all nil contexts passed --- dkron/api_test.go | 6 +++-- dkron/fsm.go | 27 ++++++++++--------- dkron/grpc_test.go | 23 +++++++++------- dkron/job_test.go | 22 ++++++++------- dkron/store_test.go | 65 +++++++++++++++++++++++++-------------------- dkron/ui.go | 2 +- 6 files changed, 82 insertions(+), 63 deletions(-) diff --git a/dkron/api_test.go b/dkron/api_test.go index 1e6852934..7bfa3d48a 100644 --- a/dkron/api_test.go +++ b/dkron/api_test.go @@ -2,6 +2,7 @@ package dkron import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -355,6 +356,7 @@ func TestAPIJobOutputTruncate(t *testing.T) { assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Equal(t, string(body), "[]") + ctx := context.Background() testExecution1 := &Execution{ JobName: "test_job", StartedAt: time.Now().UTC(), @@ -371,11 +373,11 @@ func TestAPIJobOutputTruncate(t *testing.T) { Output: "test " + strings.Repeat("longer output... ", 100), NodeName: "testNode2", } - _, err = a.Store.SetExecution(nil, testExecution1) + _, err = a.Store.SetExecution(ctx, testExecution1) if err != nil { t.Fatal(err) } - _, err = a.Store.SetExecution(nil, testExecution2) + _, err = a.Store.SetExecution(ctx, testExecution2) if err != nil { t.Fatal(err) } diff --git a/dkron/fsm.go b/dkron/fsm.go index f58fe2f73..eb42e2568 100644 --- a/dkron/fsm.go +++ b/dkron/fsm.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "io" dkronpb "github.com/distribworks/dkron/v4/types" @@ -57,15 +58,17 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} { d.logger.WithField("command", msgType).Debug("fsm: received command") + ctx := context.Background() + switch msgType { case SetJobType: - return d.applySetJob(buf[1:]) + return d.applySetJob(ctx, buf[1:]) case DeleteJobType: - return d.applyDeleteJob(buf[1:]) + return d.applyDeleteJob(ctx, buf[1:]) case ExecutionDoneType: - return d.applyExecutionDone(buf[1:]) + return d.applyExecutionDone(ctx, buf[1:]) case SetExecutionType: - return d.applySetExecution(buf[1:]) + return d.applySetExecution(ctx, buf[1:]) } // Check enterprise only message types. @@ -76,31 +79,31 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} { return nil } -func (d *dkronFSM) applySetJob(buf []byte) interface{} { +func (d *dkronFSM) applySetJob(ctx context.Context, buf []byte) interface{} { var pj dkronpb.Job if err := proto.Unmarshal(buf, &pj); err != nil { return err } job := NewJobFromProto(&pj, d.logger) - if err := d.store.SetJob(nil, job, false); err != nil { + if err := d.store.SetJob(ctx, job, false); err != nil { return err } return nil } -func (d *dkronFSM) applyDeleteJob(buf []byte) interface{} { +func (d *dkronFSM) applyDeleteJob(ctx context.Context, buf []byte) interface{} { var djr dkronpb.DeleteJobRequest if err := proto.Unmarshal(buf, &djr); err != nil { return err } - job, err := d.store.DeleteJob(nil, djr.GetJobName()) + job, err := d.store.DeleteJob(ctx, djr.GetJobName()) if err != nil { return err } return job } -func (d *dkronFSM) applyExecutionDone(buf []byte) interface{} { +func (d *dkronFSM) applyExecutionDone(ctx context.Context, buf []byte) interface{} { var execDoneReq dkronpb.ExecutionDoneRequest if err := proto.Unmarshal(buf, &execDoneReq); err != nil { return err @@ -110,18 +113,18 @@ func (d *dkronFSM) applyExecutionDone(buf []byte) interface{} { d.logger.WithField("execution", execution.Key()). WithField("output", string(execution.Output)). Debug("fsm: Setting execution") - _, err := d.store.SetExecutionDone(nil, execution) + _, err := d.store.SetExecutionDone(ctx, execution) return err } -func (d *dkronFSM) applySetExecution(buf []byte) interface{} { +func (d *dkronFSM) applySetExecution(ctx context.Context, buf []byte) interface{} { var pbex dkronpb.Execution if err := proto.Unmarshal(buf, &pbex); err != nil { return err } execution := NewExecutionFromProto(&pbex) - key, err := d.store.SetExecution(nil, execution) + key, err := d.store.SetExecution(ctx, execution) if err != nil { return err } diff --git a/dkron/grpc_test.go b/dkron/grpc_test.go index b748bf34f..cf4738ec3 100644 --- a/dkron/grpc_test.go +++ b/dkron/grpc_test.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "io/ioutil" "os" "testing" @@ -50,7 +51,9 @@ func TestGRPCExecutionDone(t *testing.T) { Disabled: true, } - err = a.Store.SetJob(nil, testJob, true) + ctx := context.Background() + + err = a.Store.SetJob(ctx, testJob, true) require.NoError(t, err) testChildJob := &Job{ @@ -61,7 +64,7 @@ func TestGRPCExecutionDone(t *testing.T) { Disabled: false, } - err = a.Store.SetJob(nil, testChildJob, true) + err = a.Store.SetJob(ctx, testChildJob, true) require.NoError(t, err) testExecution := &Execution{ @@ -81,7 +84,7 @@ func TestGRPCExecutionDone(t *testing.T) { err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) require.NoError(t, err) - execs, err := a.Store.GetExecutions(nil, "test", &ExecutionOptions{}) + execs, err := a.Store.GetExecutions(ctx, "test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) @@ -89,7 +92,7 @@ func TestGRPCExecutionDone(t *testing.T) { }) t.Run("Should run a dependent job", func(t *testing.T) { - execs, err := a.Store.GetExecutions(nil, "child-test", &ExecutionOptions{}) + execs, err := a.Store.GetExecutions(ctx, "child-test", &ExecutionOptions{}) require.NoError(t, err) assert.Len(t, execs, 1) @@ -97,13 +100,13 @@ func TestGRPCExecutionDone(t *testing.T) { t.Run("Should store execution on a deleted job", func(t *testing.T) { // Test job with dependents no delete - _, err = a.Store.DeleteJob(nil, testJob.Name) + _, err = a.Store.DeleteJob(ctx, testJob.Name) require.Error(t, err) // Remove dependents and parent - _, err = a.Store.DeleteJob(nil, testChildJob.Name) + _, err = a.Store.DeleteJob(ctx, testChildJob.Name) require.NoError(t, err) - _, err = a.Store.DeleteJob(nil, testJob.Name) + _, err = a.Store.DeleteJob(ctx, testJob.Name) require.NoError(t, err) // Test store execution on a deleted job @@ -116,13 +119,13 @@ func TestGRPCExecutionDone(t *testing.T) { t.Run("Test ephemeral jobs", func(t *testing.T) { testJob.Ephemeral = true - err = a.Store.SetJob(nil, testJob, true) + err = a.Store.SetJob(ctx, testJob, true) require.NoError(t, err) err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) assert.NoError(t, err) - j, err := a.Store.GetJob(nil, "test", nil) + j, err := a.Store.GetJob(ctx, "test", nil) assert.Error(t, err) assert.Nil(t, j) }) @@ -132,7 +135,7 @@ func TestGRPCExecutionDone(t *testing.T) { testJob.DependentJobs = []string{"non-existent"} testExecution.JobName = testJob.Name - err = a.Store.SetJob(nil, testJob, true) + err = a.Store.SetJob(ctx, testJob, true) require.NoError(t, err) err = rc.ExecutionDone(a.advertiseRPCAddr(), testExecution) diff --git a/dkron/job_test.go b/dkron/job_test.go index c2e12eb2f..fb73612bb 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -1,6 +1,7 @@ package dkron import ( + "context" "encoding/json" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "google.golang.org/grpc" ) @@ -24,10 +26,12 @@ func getTestLogger() *logrus.Entry { func TestJobGetParent(t *testing.T) { log := getTestLogger() - s, err := NewStore(log) + s, err := NewStore(log, otel.Tracer("test")) defer s.Shutdown() // nolint: errcheck require.NoError(t, err) + ctx := context.Background() + parentTestJob := &Job{ Name: "parent_test", Executor: "shell", @@ -35,7 +39,7 @@ func TestJobGetParent(t *testing.T) { Schedule: "@every 2s", } - if err := s.SetJob(nil, parentTestJob, true); err != nil { + if err := s.SetJob(ctx, parentTestJob, true); err != nil { t.Fatalf("error creating job: %s", err) } @@ -46,31 +50,31 @@ func TestJobGetParent(t *testing.T) { ParentJob: "parent_test", } - err = s.SetJob(nil, dependentTestJob, true) + err = s.SetJob(ctx, dependentTestJob, true) assert.NoError(t, err) - parentTestJob, err = dependentTestJob.GetParent(nil, s) + parentTestJob, err = dependentTestJob.GetParent(ctx, s) assert.NoError(t, err) assert.Equal(t, []string{dependentTestJob.Name}, parentTestJob.DependentJobs) - ptj, err := dependentTestJob.GetParent(nil, s) + ptj, err := dependentTestJob.GetParent(ctx, s) assert.NoError(t, err) assert.Equal(t, parentTestJob.Name, ptj.Name) // Remove the parent job dependentTestJob.ParentJob = "" dependentTestJob.Schedule = "@every 2m" - err = s.SetJob(nil, dependentTestJob, true) + err = s.SetJob(ctx, dependentTestJob, true) assert.NoError(t, err) - dtj, _ := s.GetJob(nil, dependentTestJob.Name, nil) + dtj, _ := s.GetJob(ctx, dependentTestJob.Name, nil) assert.NoError(t, err) assert.Equal(t, "", dtj.ParentJob) - _, err = dtj.GetParent(nil, s) + _, err = dtj.GetParent(ctx, s) assert.EqualError(t, ErrNoParent, err.Error()) - ptj, err = s.GetJob(nil, parentTestJob.Name, nil) + ptj, err = s.GetJob(ctx, parentTestJob.Name, nil) assert.NoError(t, err) assert.Nil(t, ptj.DependentJobs) } diff --git a/dkron/store_test.go b/dkron/store_test.go index 28f26f5c3..bc200a2f8 100644 --- a/dkron/store_test.go +++ b/dkron/store_test.go @@ -1,17 +1,19 @@ package dkron import ( + "context" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/buntdb" + "go.opentelemetry.io/otel" ) func TestStore(t *testing.T) { log := getTestLogger() - s, err := NewStore(log) + s, err := NewStore(log, otel.Tracer("test")) require.NoError(t, err) defer s.Shutdown() // nolint: errcheck @@ -31,18 +33,19 @@ func TestStore(t *testing.T) { Disabled: true, } + ctx := context.Background() // Check that we still get an empty job list - jobs, err := s.GetJobs(nil, nil) + jobs, err := s.GetJobs(ctx, nil) assert.NoError(t, err) assert.NotNil(t, jobs, "jobs nil, expecting empty slice") assert.Empty(t, jobs) - err = s.SetJob(nil, testJob, true) + err = s.SetJob(ctx, testJob, true) assert.NoError(t, err) - err = s.SetJob(nil, testJob2, true) + err = s.SetJob(ctx, testJob2, true) assert.NoError(t, err) - jobs, err = s.GetJobs(nil, nil) + jobs, err = s.GetJobs(ctx, nil) assert.NoError(t, err) assert.Len(t, jobs, 2) assert.Equal(t, "test", jobs[0].Name) @@ -56,7 +59,7 @@ func TestStore(t *testing.T) { NodeName: "testNode", } - _, err = s.SetExecution(nil, testExecution) + _, err = s.SetExecution(ctx, testExecution) require.NoError(t, err) testExecution2 := &Execution{ @@ -67,10 +70,10 @@ func TestStore(t *testing.T) { Output: "test", NodeName: "testNode", } - _, err = s.SetExecution(nil, testExecution2) + _, err = s.SetExecution(ctx, testExecution2) require.NoError(t, err) - execs, err := s.GetExecutions(nil, "test", &ExecutionOptions{ + execs, err := s.GetExecutions(ctx, "test", &ExecutionOptions{ Sort: "started_at", Order: "DESC", }) @@ -80,13 +83,13 @@ func TestStore(t *testing.T) { assert.Equal(t, testExecution, execs[0]) assert.Len(t, execs, 1) - _, err = s.DeleteJob(nil, "test") + _, err = s.DeleteJob(ctx, "test") assert.NoError(t, err) - _, err = s.DeleteJob(nil, "test") + _, err = s.DeleteJob(ctx, "test") assert.EqualError(t, err, buntdb.ErrNotFound.Error()) - _, err = s.DeleteJob(nil, "test2") + _, err = s.DeleteJob(ctx, "test2") assert.NoError(t, err) } @@ -160,15 +163,16 @@ func TestStore_JobBecomesIndependentJob(t *testing.T) { func TestStore_ChildIsUpdatedAfterDeletingParentJob(t *testing.T) { s := setupStore(t) + ctx := context.Background() storeJob(t, s, "parent1") storeChildJob(t, s, "child1", "parent1") - _, err := s.DeleteJob(nil, "parent1") + _, err := s.DeleteJob(ctx, "parent1") assert.EqualError(t, err, ErrDependentJobs.Error()) deleteJob(t, s, "child1") - _, err = s.DeleteJob(nil, "parent1") + _, err = s.DeleteJob(ctx, "parent1") assert.NoError(t, err) } @@ -182,31 +186,34 @@ func TestStore_GetJobsWithMetadata(t *testing.T) { metadata["t2"] = "v2" storeJobWithMetadata(t, s, "job2", metadata) + ctx := context.Background() + var options JobOptions options.Metadata = make(map[string]string) options.Metadata["t1"] = "v1" - jobs, err := s.GetJobs(nil, &options) + jobs, err := s.GetJobs(ctx, &options) assert.NoError(t, err) assert.Equal(t, 2, len(jobs)) options.Metadata["t2"] = "v2" - jobs, err = s.GetJobs(nil, &options) + jobs, err = s.GetJobs(ctx, &options) assert.NoError(t, err) assert.Equal(t, 1, len(jobs)) assert.Equal(t, "job2", jobs[0].Name) options.Metadata["t3"] = "v3" - jobs, err = s.GetJobs(nil, &options) + jobs, err = s.GetJobs(ctx, &options) assert.NoError(t, err) assert.Equal(t, 0, len(jobs)) } func Test_computeStatus(t *testing.T) { log := getTestLogger() - s, err := NewStore(log) + s, err := NewStore(log, otel.Tracer("test")) require.NoError(t, err) n := time.Now() + ctx := context.Background() // Prepare executions ex1 := &Execution{ @@ -218,7 +225,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 1, } - _, _ = s.SetExecution(nil, ex1) + _, _ = s.SetExecution(ctx, ex1) ex2 := &Execution{ JobName: "test", @@ -229,7 +236,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode2", Group: 1, } - _, _ = s.SetExecution(nil, ex2) + _, _ = s.SetExecution(ctx, ex2) ex3 := &Execution{ JobName: "test", @@ -240,7 +247,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - _, _ = s.SetExecution(nil, ex3) + _, _ = s.SetExecution(ctx, ex3) ex4 := &Execution{ JobName: "test", @@ -251,7 +258,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 2, } - _, _ = s.SetExecution(nil, ex4) + _, _ = s.SetExecution(ctx, ex4) ex5 := &Execution{ JobName: "test", @@ -261,7 +268,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 3, } - _, _ = s.SetExecution(nil, ex5) + _, _ = s.SetExecution(ctx, ex5) ex6 := &Execution{ JobName: "test", @@ -270,7 +277,7 @@ func Test_computeStatus(t *testing.T) { NodeName: "testNode1", Group: 4, } - _, _ = s.SetExecution(nil, ex6) + _, _ = s.SetExecution(ctx, ex6) // Tests status err = s.db.View(func(tx *buntdb.Tx) error { @@ -296,21 +303,21 @@ func Test_computeStatus(t *testing.T) { func storeJob(t *testing.T, s *Store, jobName string) { job := scaffoldJob() job.Name = jobName - require.NoError(t, s.SetJob(nil, job, false)) + require.NoError(t, s.SetJob(context.Background(), job, false)) } func storeJobWithMetadata(t *testing.T, s *Store, jobName string, metadata map[string]string) { job := scaffoldJob() job.Name = jobName job.Metadata = metadata - require.NoError(t, s.SetJob(nil, job, false)) + require.NoError(t, s.SetJob(context.Background(), job, false)) } func storeChildJob(t *testing.T, s *Store, jobName string, parentName string) { job := scaffoldJob() job.Name = jobName job.ParentJob = parentName - require.NoError(t, s.SetJob(nil, job, false)) + require.NoError(t, s.SetJob(context.Background(), job, false)) } func scaffoldJob() *Job { @@ -325,18 +332,18 @@ func scaffoldJob() *Job { func setupStore(t *testing.T) *Store { log := getTestLogger() - s, err := NewStore(log) + s, err := NewStore(log, otel.Tracer("test")) require.NoError(t, err) return s } func loadJob(t *testing.T, s *Store, name string) *Job { - job, err := s.GetJob(nil, name, nil) + job, err := s.GetJob(context.Background(), name, nil) require.NoError(t, err) return job } func deleteJob(t *testing.T, s *Store, name string) { - _, err := s.DeleteJob(nil, name) + _, err := s.DeleteJob(context.Background(), name) require.NoError(t, err) } diff --git a/dkron/ui.go b/dkron/ui.go index c6465d4d1..a0b0b5e21 100644 --- a/dkron/ui.go +++ b/dkron/ui.go @@ -56,7 +56,7 @@ func (h *HTTPTransport) UI(r *gin.RouterGroup, aclEnabled bool) { if err == nil && p != "/" && p != "/index.html" { ctx.FileFromFS(p, http.FS(assets)) } else { - jobs, err := h.agent.Store.GetJobs(nil, nil) + jobs, err := h.agent.Store.GetJobs(ctx.Request.Context(), nil) if err != nil { h.logger.Error(err) }