Skip to content

Feature: Tracing via OpenTelemetry #1709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
"time"

"github.com/devopsfaith/krakend-usage/client"
metrics "github.com/hashicorp/go-metrics"
"github.com/distribworks/dkron/v4/plugin"
proto "github.com/distribworks/dkron/v4/types"
"github.com/hashicorp/go-metrics"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -124,6 +126,8 @@ type Agent struct {

// logger is the log entry to use fo all logging calls
logger *logrus.Entry

tracer trace.Tracer
}

// ProcessorFactory is a function type that creates a new instance
Expand All @@ -146,6 +150,7 @@ func NewAgent(config *Config, options ...AgentOption) *Agent {
config: config,
retryJoinCh: make(chan error),
serverLookup: NewServerLookup(),
tracer: otel.Tracer("dkron-agent"),
}

for _, option := range options {
Expand Down Expand Up @@ -189,6 +194,15 @@ func (a *Agent) Start() error {
a.logger.Fatal("agent: Can not setup metrics")
}

// Setup tracing
if a.config.OpenTelemetryEndpoint != "" {
a.logger.Info("agent: Initializing OpenTelemetry")
_, err := initTracer(context.Background(), a.config.OpenTelemetryEndpoint)
if err != nil {
a.logger.Fatal("agent: Can not setup tracing")
}
}

// Expose the node name
expNode.Set(a.config.NodeName)

Expand Down Expand Up @@ -388,7 +402,9 @@ func (a *Agent) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
store, err := NewStore(a.logger)

storeTracer := otel.Tracer("dkron-store")
store, err := NewStore(a.logger, storeTracer)
if err != nil {
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down Expand Up @@ -549,7 +565,8 @@ func (a *Agent) SetConfig(c *Config) {
// StartServer launch a new dkron server process
func (a *Agent) StartServer() {
if a.Store == nil {
s, err := NewStore(a.logger)
storeTracer := otel.Tracer("dkron-store")
s, err := NewStore(a.logger, storeTracer)
if err != nil {
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
Expand Down
59 changes: 30 additions & 29 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"sort"
"strconv"
"strings"

"github.com/distribworks/dkron/v4/types"
"github.com/gin-contrib/cors"
Expand All @@ -16,7 +17,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/tidwall/buntdb"
status "google.golang.org/grpc/status"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -56,6 +58,11 @@ func (h *HTTPTransport) ServeHTTP() {
config.AllowHeaders = []string{"*"}
config.ExposeHeaders = []string{"*"}

// Skip tracing on auxiliary endpoints
filter := func(c *gin.Context) bool {
return !(strings.Contains(c.FullPath(), "metrics") || strings.Contains(c.FullPath(), "health") || strings.Contains(c.FullPath(), "ui"))
}
rootPath.Use(otelgin.Middleware("dkron-api", otelgin.WithGinFilter(filter))) // Adds OpenTelemetry tracing to HTTP API
rootPath.Use(cors.New(config))
rootPath.Use(h.MetaMiddleware())

Expand Down Expand Up @@ -160,16 +167,14 @@ func (h *HTTPTransport) jobsHandler(c *gin.Context) {
order := c.DefaultQuery("_order", "ASC")
q := c.Query("q")

jobs, err := h.agent.Store.GetJobs(
&JobOptions{
Metadata: metadata,
Sort: sort,
Order: order,
Query: q,
Status: c.Query("status"),
Disabled: c.Query("disabled"),
},
)
jobs, err := h.agent.Store.GetJobs(c.Request.Context(), &JobOptions{
Metadata: metadata,
Sort: sort,
Order: order,
Query: q,
Status: c.Query("status"),
Disabled: c.Query("disabled"),
})
if err != nil {
h.logger.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand Down Expand Up @@ -199,7 +204,7 @@ func (h *HTTPTransport) jobsHandler(c *gin.Context) {
func (h *HTTPTransport) jobGetHandler(c *gin.Context) {
jobName := c.Param("job")

job, err := h.agent.Store.GetJob(jobName, nil)
job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil)
if err != nil {
h.logger.Error(err)
}
Expand Down Expand Up @@ -348,19 +353,17 @@ func (h *HTTPTransport) executionsHandler(c *gin.Context) {
outputSizeLimit = -1
}

job, err := h.agent.Store.GetJob(jobName, nil)
job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil)
if err != nil {
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

executions, err := h.agent.Store.GetExecutions(job.Name,
&ExecutionOptions{
Sort: sort,
Order: order,
Timezone: job.GetTimeLocation(),
},
)
executions, err := h.agent.Store.GetExecutions(c.Request.Context(), job.Name, &ExecutionOptions{
Sort: sort,
Order: order,
Timezone: job.GetTimeLocation(),
})
if err == buntdb.ErrNotFound {
executions = make([]*Execution, 0)
} else if err != nil {
Expand Down Expand Up @@ -389,19 +392,17 @@ func (h *HTTPTransport) executionHandler(c *gin.Context) {
jobName := c.Param("job")
executionName := c.Param("execution")

job, err := h.agent.Store.GetJob(jobName, nil)
job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil)
if err != nil {
_ = c.AbortWithError(http.StatusNotFound, err)
return
}

executions, err := h.agent.Store.GetExecutions(job.Name,
&ExecutionOptions{
Sort: "",
Order: "",
Timezone: job.GetTimeLocation(),
},
)
executions, err := h.agent.Store.GetExecutions(c.Request.Context(), job.Name, &ExecutionOptions{
Sort: "",
Order: "",
Timezone: job.GetTimeLocation(),
})

if err != nil {
h.logger.Error(err)
Expand Down Expand Up @@ -457,7 +458,7 @@ func (h *HTTPTransport) leaveHandler(c *gin.Context) {
func (h *HTTPTransport) jobToggleHandler(c *gin.Context) {
jobName := c.Param("job")

job, err := h.agent.Store.GetJob(jobName, nil)
job, err := h.agent.Store.GetJob(c.Request.Context(), jobName, nil)
if err != nil {
_ = c.AbortWithError(http.StatusNotFound, err)
return
Expand Down
6 changes: 4 additions & 2 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dkron

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -355,6 +356,7 @@ func TestAPIJobOutputTruncate(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, string(body), "[]")

ctx := context.Background()
testExecution1 := &Execution{
JobName: "test_job",
StartedAt: time.Now().UTC(),
Expand All @@ -371,11 +373,11 @@ func TestAPIJobOutputTruncate(t *testing.T) {
Output: "test " + strings.Repeat("longer output... ", 100),
NodeName: "testNode2",
}
_, err = a.Store.SetExecution(testExecution1)
_, err = a.Store.SetExecution(ctx, testExecution1)
if err != nil {
t.Fatal(err)
}
_, err = a.Store.SetExecution(testExecution2)
_, err = a.Store.SetExecution(ctx, testExecution2)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions dkron/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ type Config struct {

// CronitorEndpoint is the endpoint to call for cronitor notifications.
CronitorEndpoint string `mapstructure:"cronitor-endpoint"`

// OpenTelemetryEndpoint is the gRPC endpoint to send OpenTelemetry traces to. If empty, no traces will be sent.
OpenTelemetryEndpoint string `mapstructure:"otel-endpoint"`
}

// DefaultBindPort is the default port that dkron will use for Serf communication
Expand Down Expand Up @@ -327,6 +330,7 @@ Format there: https://golang.org/pkg/time/#ParseDuration`)
cmdFlags.StringSlice("dog-statsd-tags", []string{}, "Datadog tags, specified as key:value")
cmdFlags.String("statsd-addr", "", "Statsd address")
cmdFlags.Bool("enable-prometheus", false, "Enable serving prometheus metrics")
cmdFlags.String("otel-endpoint", "", "OpenTelemetry gRPC endpoint")
cmdFlags.Bool("disable-usage-stats", c.DisableUsageStats, "Disable sending anonymous usage stats")

return cmdFlags
Expand Down
27 changes: 15 additions & 12 deletions dkron/fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dkron

import (
"context"
"io"

dkronpb "github.com/distribworks/dkron/v4/types"
Expand Down Expand Up @@ -57,15 +58,17 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {

d.logger.WithField("command", msgType).Debug("fsm: received command")

ctx := context.Background()

switch msgType {
case SetJobType:
return d.applySetJob(buf[1:])
return d.applySetJob(ctx, buf[1:])
case DeleteJobType:
return d.applyDeleteJob(buf[1:])
return d.applyDeleteJob(ctx, buf[1:])
case ExecutionDoneType:
return d.applyExecutionDone(buf[1:])
return d.applyExecutionDone(ctx, buf[1:])
case SetExecutionType:
return d.applySetExecution(buf[1:])
return d.applySetExecution(ctx, buf[1:])
}

// Check enterprise only message types.
Expand All @@ -76,31 +79,31 @@ func (d *dkronFSM) Apply(l *raft.Log) interface{} {
return nil
}

func (d *dkronFSM) applySetJob(buf []byte) interface{} {
func (d *dkronFSM) applySetJob(ctx context.Context, buf []byte) interface{} {
var pj dkronpb.Job
if err := proto.Unmarshal(buf, &pj); err != nil {
return err
}
job := NewJobFromProto(&pj, d.logger)
if err := d.store.SetJob(job, false); err != nil {
if err := d.store.SetJob(ctx, job, false); err != nil {
return err
}
return nil
}

func (d *dkronFSM) applyDeleteJob(buf []byte) interface{} {
func (d *dkronFSM) applyDeleteJob(ctx context.Context, buf []byte) interface{} {
var djr dkronpb.DeleteJobRequest
if err := proto.Unmarshal(buf, &djr); err != nil {
return err
}
job, err := d.store.DeleteJob(djr.GetJobName())
job, err := d.store.DeleteJob(ctx, djr.GetJobName())
if err != nil {
return err
}
return job
}

func (d *dkronFSM) applyExecutionDone(buf []byte) interface{} {
func (d *dkronFSM) applyExecutionDone(ctx context.Context, buf []byte) interface{} {
var execDoneReq dkronpb.ExecutionDoneRequest
if err := proto.Unmarshal(buf, &execDoneReq); err != nil {
return err
Expand All @@ -110,18 +113,18 @@ func (d *dkronFSM) applyExecutionDone(buf []byte) interface{} {
d.logger.WithField("execution", execution.Key()).
WithField("output", string(execution.Output)).
Debug("fsm: Setting execution")
_, err := d.store.SetExecutionDone(execution)
_, err := d.store.SetExecutionDone(ctx, execution)

return err
}

func (d *dkronFSM) applySetExecution(buf []byte) interface{} {
func (d *dkronFSM) applySetExecution(ctx context.Context, buf []byte) interface{} {
var pbex dkronpb.Execution
if err := proto.Unmarshal(buf, &pbex); err != nil {
return err
}
execution := NewExecutionFromProto(&pbex)
key, err := d.store.SetExecution(execution)
key, err := d.store.SetExecution(ctx, execution)
if err != nil {
return err
}
Expand Down
Loading
Loading