Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/devopsfaith/krakend-usage/client"
typesv1 "github.com/distribworks/dkron/v4/gen/proto/types/v1"
"github.com/distribworks/dkron/v4/plugin"
goplugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-metrics"
goplugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
Expand Down Expand Up @@ -885,6 +885,77 @@ func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture {
return a.raft.Apply(cmd, raftTimeout)
}

// activeExecutionKeys builds a set of execution keys from the given
// executions, suitable for O(1) membership checks.
func activeExecutionKeys(executions []*typesv1.Execution) map[string]struct{} {
	set := make(map[string]struct{}, len(executions))
	for _, e := range executions {
		set[e.Key()] = struct{}{}
	}
	return set
}

// markExecutionDone persists an execution's final state through Raft by
// encoding an ExecutionDoneRequest command and applying it to the log.
// It returns an error when encoding fails, when Raft is unavailable on
// this node, or when the apply itself fails.
func (a *Agent) markExecutionDone(execution *Execution) error {
	cmd, err := Encode(ExecutionDoneType, &typesv1.ExecutionDoneRequest{
		Execution: execution.ToProto(),
	})
	if err != nil {
		return err
	}

	future := a.RaftApply(cmd)
	if future == nil {
		return errors.New("raft apply unavailable")
	}

	return future.Error()
}

// cleanupStaleRunningExecutions reconciles the "running" executions stored
// for jobName against activeKeys, the set of execution keys currently active
// in memory.
//
// Executions present in activeKeys are skipped (they are genuinely running).
// Executions absent from activeKeys that have been running longer than
// DefaultStaleExecutionThreshold are marked finished/failed via Raft with an
// explanatory note appended to their output; the rest are returned so callers
// can conservatively treat them as still running. Stale executions whose
// cleanup apply fails are also returned, so no execution still marked running
// in storage is ever dropped from view.
//
// NOTE: the parameter was renamed from activeExecutionKeys to activeKeys so
// it no longer shadows the package-level activeExecutionKeys helper.
func (a *Agent) cleanupStaleRunningExecutions(ctx context.Context, jobName string, activeKeys map[string]struct{}, logger *logrus.Entry, staleLogMessage string) ([]*Execution, error) {
	runningExecs, err := a.Store.GetRunningExecutions(ctx, jobName)
	if err != nil {
		return nil, err
	}

	remainingRunning := make([]*Execution, 0)
	now := time.Now().UTC()

	for _, exec := range runningExecs {
		// Still tracked in memory: genuinely running, nothing to do.
		if _, ok := activeKeys[exec.Key()]; ok {
			continue
		}

		runningFor := now.Sub(exec.StartedAt)
		if runningFor <= DefaultStaleExecutionThreshold {
			// Not active in memory but still within the grace period;
			// report it as running rather than risk killing a live execution.
			remainingRunning = append(remainingRunning, exec)
			continue
		}

		logger.WithFields(logrus.Fields{
			"job":         jobName,
			"execution":   exec.Key(),
			"node":        exec.NodeName,
			"started_at":  exec.StartedAt,
			"running_for": runningFor.String(),
		}).Warn(staleLogMessage)

		exec.FinishedAt = now
		exec.Success = false
		exec.Output += "\nExecution marked as failed: detected as stale (not active on any node)"

		if err := a.markExecutionDone(exec); err != nil {
			logger.WithError(err).WithFields(logrus.Fields{
				"execution": exec.Key(),
				"node":      exec.NodeName,
			}).Error("agent: Error applying stale execution cleanup")
			// Keep it listed as running so the next pass retries the cleanup.
			remainingRunning = append(remainingRunning, exec)
		}
	}

	return remainingRunning, nil
}

// GetRunningJobs returns amount of active jobs of the local agent
func (a *Agent) GetRunningJobs() int {
job := 0
Expand Down
60 changes: 11 additions & 49 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,54 +448,16 @@ func (j *Job) isRunnable(logger *logrus.Entry) bool {
// Check persistent storage for running executions
// This catches executions that might be running on nodes after a leader change
ctx := context.Background()
runningExecs, err := j.Agent.Store.GetRunningExecutions(ctx, j.Name)
runningExecs, err := j.Agent.cleanupStaleRunningExecutions(ctx, j.Name, activeExecutionKeys(exs), logger, "job: Cleaning up stale execution from storage")
if err != nil {
logger.WithError(err).Error("job: Error querying for running executions in storage")
return false
}

for _, exec := range runningExecs {
// Execution is in storage but not in active memory.
// If it has been running longer than the stale threshold, clean it up.
runningFor := time.Now().UTC().Sub(exec.StartedAt)
if runningFor > DefaultStaleExecutionThreshold {
logger.WithFields(logrus.Fields{
"job": j.Name,
"execution": exec.Key(),
"node": exec.NodeName,
"started_at": exec.StartedAt,
"running_for": runningFor.String(),
}).Warn("job: Cleaning up stale execution from storage")

exec.FinishedAt = time.Now().UTC()
exec.Success = false
exec.Output += "\nExecution marked as failed: detected as stale (not active on any node)"

execDoneReq := &proto.ExecutionDoneRequest{
Execution: exec.ToProto(),
}
cmd, err := Encode(ExecutionDoneType, execDoneReq)
if err != nil {
logger.WithError(err).WithFields(logrus.Fields{
"execution": exec.Key(),
"node": exec.NodeName,
}).Error("job: Error encoding stale execution cleanup")
continue
}
af := j.Agent.RaftApply(cmd)
if af != nil {
if err := af.Error(); err != nil {
logger.WithError(err).WithFields(logrus.Fields{
"execution": exec.Key(),
"node": exec.NodeName,
}).Error("job: Error applying stale execution cleanup")
}
}
continue
}

// Execution is not in active memory but hasn't exceeded the stale threshold.
// Conservatively block to avoid potential concurrent execution.
runningFor := time.Now().UTC().Sub(exec.StartedAt)
logger.WithFields(logrus.Fields{
"job": j.Name,
"concurrency": j.Concurrency,
Expand Down Expand Up @@ -652,34 +614,34 @@ func validateMemoryLimit(limit string) error {

// Try to parse with units
limit = strings.ToUpper(strings.TrimSpace(limit))

// Extract the numeric part and unit
var numStr string
var unit string

// Find where the number ends and unit begins
i := 0
for i < len(limit) && (limit[i] >= '0' && limit[i] <= '9' || limit[i] == '.') {
i++
}

if i == 0 {
return fmt.Errorf("invalid memory limit format: %s", limit)
}

numStr = limit[:i]
unit = limit[i:]

// Parse the numeric part
value, err := strconv.ParseFloat(numStr, 64)
if err != nil {
return fmt.Errorf("invalid numeric value in memory limit: %s", numStr)
}

if value <= 0 {
return fmt.Errorf("memory limit must be greater than 0")
}

// Validate and convert unit to bytes
var multiplier int64
switch unit {
Expand All @@ -696,12 +658,12 @@ func validateMemoryLimit(limit string) error {
default:
return fmt.Errorf("unsupported memory unit: %s (supported: B, KB, MB, GB, TB)", unit)
}

// Check for overflow
bytes := int64(value * float64(multiplier))
if bytes <= 0 {
return fmt.Errorf("memory limit too large or causes overflow")
}

return nil
}
29 changes: 29 additions & 0 deletions dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,38 @@ func (a *Agent) establishLeadership(stopCh chan struct{}) error {
if err != nil {
return err
}

activeExecutions, err := a.GetActiveExecutions()
if err != nil {
a.logger.WithError(err).Warn("leader: Failed to query active executions during startup reconciliation")
} else if err := a.reconcileRunningExecutionOrphans(ctx, jobs, activeExecutionKeys(activeExecutions)); err != nil {
a.logger.WithError(err).Warn("leader: Failed to reconcile running execution orphans")
}

return a.sched.Start(jobs, a)
}

// reconcileRunningExecutionOrphans sweeps every job's stored "running"
// executions after this node becomes leader: stale ones are marked failed by
// cleanupStaleRunningExecutions, and the ones left in place (possibly still
// running somewhere) are logged for operator visibility. activeKeys is the
// set of execution keys currently active in memory on this node.
//
// It returns the first storage error encountered, stopping the sweep; log
// and Raft-apply failures for individual executions are handled (logged)
// inside cleanupStaleRunningExecutions and do not abort reconciliation.
//
// NOTE: the parameter was renamed from activeExecutionKeys to activeKeys so
// it no longer shadows the package-level activeExecutionKeys helper.
func (a *Agent) reconcileRunningExecutionOrphans(ctx context.Context, jobs []*Job, activeKeys map[string]struct{}) error {
	for _, job := range jobs {
		remaining, err := a.cleanupStaleRunningExecutions(ctx, job.Name, activeKeys, a.logger, "leader: Cleaning up stale execution from storage during startup reconciliation")
		if err != nil {
			return err
		}

		for _, exec := range remaining {
			a.logger.WithFields(map[string]interface{}{
				"job":         job.Name,
				"execution":   exec.Key(),
				"node":        exec.NodeName,
				"started_at":  exec.StartedAt,
				"running_for": time.Since(exec.StartedAt).String(),
			}).Info("leader: Leaving running execution in storage during startup reconciliation")
		}
	}

	return nil
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (a *Agent) revokeLeadership() error {
Expand Down
112 changes: 112 additions & 0 deletions dkron/leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package dkron

import (
"context"
"testing"
"time"

"github.com/hashicorp/serf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestReconcileRunningExecutionOrphansCleansStaleExecutions verifies that a
// stored execution which exceeded DefaultStaleExecutionThreshold and is not
// in the active set gets marked finished and failed during reconciliation.
func TestReconcileRunningExecutionOrphansCleansStaleExecutions(t *testing.T) {
	agent := startTestLeaderAgent(t)
	t.Cleanup(func() {
		_ = agent.Stop()
	})

	ctx := context.Background()
	job := scaffoldJob()
	job.Name = "stale-running-job"
	require.NoError(t, agent.Store.SetJob(ctx, job, false))

	// Started well past the stale threshold, never finished.
	startedAt := time.Now().UTC().Add(-DefaultStaleExecutionThreshold - time.Minute)
	stale := &Execution{
		JobName:    job.Name,
		StartedAt:  startedAt,
		FinishedAt: time.Time{},
		Success:    false,
		Output:     "running",
		NodeName:   agent.config.NodeName,
		Group:      startedAt.UnixNano(),
		Attempt:    1,
	}

	_, err := agent.Store.SetExecution(ctx, stale)
	require.NoError(t, err)

	require.NoError(t, agent.reconcileRunningExecutionOrphans(ctx, []*Job{job}, map[string]struct{}{}))

	running, err := agent.Store.GetRunningExecutions(ctx, job.Name)
	require.NoError(t, err)
	assert.Empty(t, running)

	stored, err := agent.Store.GetExecution(ctx, job.Name, stale.Key())
	require.NoError(t, err)
	assert.False(t, stored.FinishedAt.IsZero())
	assert.False(t, stored.Success)
	assert.Contains(t, stored.Output, "Execution marked as failed: detected as stale")
}

// TestReconcileRunningExecutionOrphansLeavesRecentExecutions verifies that a
// stored execution still inside the stale threshold is left untouched as
// "running" by reconciliation, even when absent from the active set.
func TestReconcileRunningExecutionOrphansLeavesRecentExecutions(t *testing.T) {
	agent := startTestLeaderAgent(t)
	t.Cleanup(func() {
		_ = agent.Stop()
	})

	ctx := context.Background()
	job := scaffoldJob()
	job.Name = "recent-running-job"
	require.NoError(t, agent.Store.SetJob(ctx, job, false))

	// Started recently — well inside the stale threshold.
	startedAt := time.Now().UTC().Add(-10 * time.Minute)
	recent := &Execution{
		JobName:    job.Name,
		StartedAt:  startedAt,
		FinishedAt: time.Time{},
		Success:    false,
		Output:     "running",
		NodeName:   agent.config.NodeName,
		Group:      startedAt.UnixNano(),
		Attempt:    1,
	}

	_, err := agent.Store.SetExecution(ctx, recent)
	require.NoError(t, err)

	require.NoError(t, agent.reconcileRunningExecutionOrphans(ctx, []*Job{job}, map[string]struct{}{}))

	running, err := agent.Store.GetRunningExecutions(ctx, job.Name)
	require.NoError(t, err)
	assert.Len(t, running, 1)
	assert.Equal(t, recent.Key(), running[0].Key())

	stored, err := agent.Store.GetExecution(ctx, job.Name, recent.Key())
	require.NoError(t, err)
	assert.True(t, stored.FinishedAt.IsZero())
}

// startTestLeaderAgent boots a single-node dev-mode agent bound to a test IP
// and blocks until it acquires Raft leadership, failing the test on timeout.
// The borrowed test IP is returned automatically via t.Cleanup; callers are
// responsible for stopping the agent.
func startTestLeaderAgent(t *testing.T) *Agent {
	t.Helper()

	ip, returnFn := testutil.TakeIP()
	t.Cleanup(returnFn)

	cfg := DefaultConfig()
	cfg.BindAddr = ip.String()
	// Timestamp suffix keeps node names unique across test runs.
	cfg.NodeName = "leader-test-" + time.Now().UTC().Format("150405.000000000")
	cfg.Server = true
	cfg.LogLevel = logLevel
	cfg.DevMode = true
	cfg.HTTPAddr = "127.0.0.1:0"

	agent := NewAgent(cfg)
	require.NoError(t, agent.Start())

	require.Eventually(t, agent.IsLeader, 5*time.Second, 100*time.Millisecond)

	return agent
}
Loading