Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerF
v1.POST("/pause", h.pauseHandler)
v1.POST("/unpause", h.unpauseHandler)

v1.GET("/stats", h.statsHandler)

v1.POST("/jobs", h.jobCreateOrUpdateHandler)
v1.PATCH("/jobs", h.jobCreateOrUpdateHandler)
// Place fallback routes last
Expand Down Expand Up @@ -547,3 +549,20 @@ func (h *HTTPTransport) pauseStatusHandler(c *gin.Context) {
paused := h.agent.IsNewJobsPaused()
renderJSON(c, http.StatusOK, gin.H{"paused": paused})
}

// statsHandler serves the GET /stats route and returns per-day execution
// statistics.
//
// The optional "days" query parameter selects how many days to report. It
// comes straight from the request, so it is sanitized before reaching the
// store: a missing, non-numeric or non-positive value falls back to 30, and
// anything above 365 is clamped to 365. Without the upper bound a single
// request could make the store pre-allocate and look up an arbitrarily large
// number of day entries.
//
// On a store error the failure is logged and the request is aborted with
// 500; otherwise the stats are passed to renderJSON with status 200.
func (h *HTTPTransport) statsHandler(c *gin.Context) {
	const (
		defaultStatsDays = 30
		maxStatsDays     = 365
	)

	days, err := strconv.Atoi(c.DefaultQuery("days", strconv.Itoa(defaultStatsDays)))
	if err != nil || days <= 0 {
		// Keep the lenient behaviour for bad input: use the default window
		// instead of rejecting the request.
		days = defaultStatsDays
	}
	if days > maxStatsDays {
		days = maxStatsDays
	}

	stats, err := h.agent.Store.GetExecutionStats(c.Request.Context(), days)
	if err != nil {
		h.logger.WithError(err).Error("api: Unable to get execution stats")
		c.AbortWithStatus(http.StatusInternalServerError)
		return
	}

	renderJSON(c, http.StatusOK, stats)
}
Comment on lines +553 to +568
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate and cap the days query parameter.
Line 554 accepts any value; negative or very large inputs can lead to unexpected ranges or excessive work. Consider enforcing a positive range and a reasonable maximum (e.g., 365).

🔧 Suggested bounds handling
-	daysStr := c.DefaultQuery("days", "30")
-	days, err := strconv.Atoi(daysStr)
-	if err != nil {
-		days = 30
-	}
+	const (
+		defaultStatsDays = 30
+		maxStatsDays     = 365
+	)
+	daysStr := c.DefaultQuery("days", strconv.Itoa(defaultStatsDays))
+	days, err := strconv.Atoi(daysStr)
+	if err != nil || days <= 0 {
+		days = defaultStatsDays
+	}
+	if days > maxStatsDays {
+		days = maxStatsDays
+	}
🤖 Prompt for AI Agents
In `@dkron/api.go` around lines 553 - 568, In statsHandler, validate the days
query param (daysStr) after parsing: if strconv.Atoi fails or the parsed days is
<= 0 return a 400 Bad Request with a clear error message; otherwise cap days to
a reasonable maximum (e.g., maxDays := 365) before calling
h.agent.Store.GetExecutionStats so extremely large values are reduced. Update
the handler to use the validated/capped days value and keep the existing error
logging for the GetExecutionStats call.

23 changes: 23 additions & 0 deletions dkron/execution_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dkron

import "time"

// ExecutionStat holds the success and failure counters aggregated for a
// single day.
type ExecutionStat struct {
	// Date identifies the day the counters belong to (truncated to day).
	Date time.Time `json:"date"`
	// SuccessCount counts the executions that succeeded on Date.
	SuccessCount int `json:"success_count"`
	// FailedCount counts the executions that failed on Date.
	FailedCount int `json:"failed_count"`
}

// ExecutionStats wraps a list of per-day ExecutionStat entries.
type ExecutionStats struct {
	Stats []ExecutionStat `json:"stats"`
}

// TotalExecutions reports how many executions were recorded for the day,
// successful and failed combined.
func (es *ExecutionStat) TotalExecutions() int {
	total := es.SuccessCount
	total += es.FailedCount
	return total
}
172 changes: 172 additions & 0 deletions dkron/execution_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package dkron

import (
"context"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
)

// TestExecutionStats exercises IncrementExecutionStat, GetExecutionStats and
// ExecutionStat.TotalExecutions against freshly created stores.
//
// The first two subtests share the same store and run in order: the second
// one relies on the success recorded by the first.
func TestExecutionStats(t *testing.T) {
logger := logrus.NewEntry(logrus.New())
tracer := noop.NewTracerProvider().Tracer("test")

store, err := NewStore(logger, tracer)
require.NoError(t, err)
defer store.Shutdown()

ctx := context.Background()
// now has to be the real wall-clock time: GetExecutionStats builds its
// window backwards from time.Now() in UTC, so stats recorded under a fixed
// historical date would fall outside the 1- and 2-day windows queried below.
now := time.Now()

Comment on lines +22 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Stabilize time-dependent tests.
Line 23 and Line 128 use time.Now(), which can be flaky around midnight/DST. Use a fixed UTC time for deterministic results.

🧪 Deterministic time setup
-	now := time.Now()
+	now := time.Date(2025, 1, 2, 12, 0, 0, 0, time.UTC)
...
-	now := time.Now()
+	now := time.Date(2025, 1, 2, 12, 0, 0, 0, time.UTC)

Also applies to: 128-129

🤖 Prompt for AI Agents
In `@dkron/execution_stats_test.go` around lines 22 - 24, The test uses time.Now()
to set the variable now (and again later) which makes the test flaky; replace
time.Now() with a fixed UTC time value (e.g., a time.Date(...) constant) and use
that fixed variable wherever now is used in the file (including the occurrences
around lines where now is set and the later uses at ~128-129) so the test is
deterministic across time zones/DST; update the variable declaration (currently
"now") and any assertions that depend on it (in the test functions in
dkron/execution_stats_test.go) to reference this fixed UTC timestamp.

// First write for today: a single success must produce exactly one entry.
t.Run("IncrementExecutionStat creates new stat entry", func(t *testing.T) {
err := store.IncrementExecutionStat(ctx, now, true)
require.NoError(t, err)

stats, err := store.GetExecutionStats(ctx, 1)
require.NoError(t, err)
require.Len(t, stats.Stats, 1)

assert.Equal(t, 1, stats.Stats[0].SuccessCount)
assert.Equal(t, 0, stats.Stats[0].FailedCount)
})

// Reuses the shared store, so the expected counters include the success
// recorded by the previous subtest (1 + 1 successes, 1 failure).
t.Run("IncrementExecutionStat increments existing stat", func(t *testing.T) {
// Add another success
err := store.IncrementExecutionStat(ctx, now, true)
require.NoError(t, err)

// Add a failure
err = store.IncrementExecutionStat(ctx, now, false)
require.NoError(t, err)

stats, err := store.GetExecutionStats(ctx, 1)
require.NoError(t, err)
require.Len(t, stats.Stats, 1)

assert.Equal(t, 2, stats.Stats[0].SuccessCount)
assert.Equal(t, 1, stats.Stats[0].FailedCount)
})

// Days without a stored record must still appear, with zero counters.
t.Run("GetExecutionStats returns empty stats for missing days", func(t *testing.T) {
// Create a new store to have clean data
store2, err := NewStore(logger, tracer)
require.NoError(t, err)
defer store2.Shutdown()

stats, err := store2.GetExecutionStats(ctx, 7)
require.NoError(t, err)
require.Len(t, stats.Stats, 7)

// All should be zero
for _, stat := range stats.Stats {
assert.Equal(t, 0, stat.SuccessCount)
assert.Equal(t, 0, stat.FailedCount)
}
})

// Entries are expected oldest first: index 0 is yesterday, index 1 is today.
t.Run("GetExecutionStats returns stats in chronological order", func(t *testing.T) {
store3, err := NewStore(logger, tracer)
require.NoError(t, err)
defer store3.Shutdown()

// Add stats for yesterday
yesterday := now.AddDate(0, 0, -1)
err = store3.IncrementExecutionStat(ctx, yesterday, true)
require.NoError(t, err)

// Add stats for today
err = store3.IncrementExecutionStat(ctx, now, false)
require.NoError(t, err)

stats, err := store3.GetExecutionStats(ctx, 2)
require.NoError(t, err)
require.Len(t, stats.Stats, 2)

// First stat should be yesterday
assert.Equal(t, 1, stats.Stats[0].SuccessCount)
assert.Equal(t, 0, stats.Stats[0].FailedCount)

// Second stat should be today
assert.Equal(t, 0, stats.Stats[1].SuccessCount)
assert.Equal(t, 1, stats.Stats[1].FailedCount)
})

// Pure arithmetic check; does not touch any store.
t.Run("TotalExecutions returns sum of success and failed", func(t *testing.T) {
stat := ExecutionStat{
SuccessCount: 5,
 FailedCount: 3,
}
assert.Equal(t, 8, stat.TotalExecutions())
})
}

// TestSetExecutionDoneUpdatesStats checks that completing executions through
// SetExecutionDone is reflected in the per-day counters returned by
// GetExecutionStats: first one success, then one additional failure.
func TestSetExecutionDoneUpdatesStats(t *testing.T) {
	entry := logrus.NewEntry(logrus.New())
	tr := noop.NewTracerProvider().Tracer("test")

	store, err := NewStore(entry, tr)
	require.NoError(t, err)
	defer store.Shutdown()

	ctx := context.Background()

	// Register the job the executions below belong to.
	job := &Job{
		Name:           "stats-test-job",
		Schedule:       "@manually",
		Executor:       "shell",
		ExecutorConfig: map[string]string{"command": "/bin/true"},
	}
	require.NoError(t, store.SetJob(ctx, job, true))

	now := time.Now()

	// requireTodayCounts fetches the one-day window and compares its single
	// entry against the expected counters.
	requireTodayCounts := func(wantSuccess, wantFailed int) {
		stats, err := store.GetExecutionStats(ctx, 1)
		require.NoError(t, err)
		require.Len(t, stats.Stats, 1)
		assert.Equal(t, wantSuccess, stats.Stats[0].SuccessCount)
		assert.Equal(t, wantFailed, stats.Stats[0].FailedCount)
	}

	// A successful execution must bump the success counter.
	succeeded := &Execution{
		JobName:    job.Name,
		Group:      now.UnixNano(),
		StartedAt:  now,
		FinishedAt: now.Add(time.Second),
		NodeName:   "test-node",
		Success:    true,
		Output:     "success output",
	}
	_, err = store.SetExecutionDone(ctx, succeeded)
	require.NoError(t, err)
	requireTodayCounts(1, 0)

	// A failed execution must bump the failure counter and leave the
	// success counter untouched.
	failed := &Execution{
		JobName:    job.Name,
		Group:      now.UnixNano() + 1,
		StartedAt:  now,
		FinishedAt: now.Add(time.Second),
		NodeName:   "test-node",
		Success:    false,
		Output:     "failed output",
	}
	_, err = store.SetExecutionDone(ctx, failed)
	require.NoError(t, err)
	requireTodayCounts(1, 1)
}
2 changes: 1 addition & 1 deletion dkron/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestGRPCExecutionDone(t *testing.T) {
t.Run("Test job retry with broken stream error", func(t *testing.T) {
// Use the actual error format that would be returned when a broken stream occurs
brokenStreamErrorMsg := ErrBrokenStream.Error() + ": rpc error: code = Internal desc = grpc: error while marshaling"

testJob.Name = "test-retry"
testJob.Schedule = "0 * * * * *" // Every minute at 0 seconds (6-field format)
testJob.Retries = 2
Expand Down
5 changes: 5 additions & 0 deletions dkron/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"context"
"io"
"time"
)

// Storage is the interface that should be used by any
Expand All @@ -25,4 +26,8 @@ type Storage interface {
Shutdown() error
Snapshot(w io.WriteCloser) error
Restore(r io.ReadCloser) error
// GetExecutionStats retrieves execution statistics for the specified number of days
GetExecutionStats(ctx context.Context, days int) (*ExecutionStats, error)
// IncrementExecutionStat increments the execution statistics for a given date
IncrementExecutionStat(ctx context.Context, date time.Time, success bool) error
}
114 changes: 114 additions & 0 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (

jobsPrefix = "jobs"
executionsPrefix = "executions"
statsPrefix = "stats"
)

var (
Expand Down Expand Up @@ -267,6 +268,7 @@ func (s *Store) SetExecutionDone(ctx context.Context, execution *Execution) (boo
ctx, span := s.tracer.Start(ctx, "buntdb.set.execution_done")
defer span.End()

var success bool
err := s.db.Update(func(tx *buntdb.Tx) error {
// Load the job from the store
var pbj dkronpb.Job
Expand All @@ -287,6 +289,7 @@ func (s *Store) SetExecutionDone(ctx context.Context, execution *Execution) (boo
return err
}

success = pbe.Success
if pbe.Success {
pbj.LastSuccess.HasValue = true
pbj.LastSuccess.Time = pbe.FinishedAt
Expand All @@ -311,6 +314,12 @@ func (s *Store) SetExecutionDone(ctx context.Context, execution *Execution) (boo
return err
}

// Update execution statistics for the day
if err := s.incrementStatTxFunc(execution.FinishedAt, success)(tx); err != nil {
s.logger.WithError(err).Warn("store: Failed to update execution stats")
// Don't fail the whole operation if stats update fails
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -880,3 +889,108 @@ func trimDirectoryKey(key []byte) []byte {
func isDirectoryKey(key []byte) bool {
return len(key) > 0 && key[len(key)-1] == ':'
}

// formatStatDate renders t as the "YYYY-MM-DD" day key used for stats
// storage. The time is converted to UTC and cut down to the start of that
// day first, so the key does not depend on t's time zone.
func formatStatDate(t time.Time) string {
	const dayLayout = "2006-01-02"

	day := t.UTC().Truncate(24 * time.Hour)
	return day.Format(dayLayout)
}

// incrementStatTxFunc builds a transaction callback that adds one execution
// to the stats record of the day containing date.
//
// The callback reads the record stored under "<statsPrefix>:<day>", starts a
// new zeroed record when none exists, bumps SuccessCount or FailedCount
// depending on success, and writes the record back as JSON. Any lookup,
// decoding, encoding or write error is returned unchanged.
func (s *Store) incrementStatTxFunc(date time.Time, success bool) func(tx *buntdb.Tx) error {
	return func(tx *buntdb.Tx) error {
		key := fmt.Sprintf("%s:%s", statsPrefix, formatStatDate(date))

		var stat ExecutionStat

		raw, err := tx.Get(key)
		switch {
		case err == nil:
			// A record already exists for this day: decode it.
			if err := json.Unmarshal([]byte(raw), &stat); err != nil {
				return err
			}
		case err == buntdb.ErrNotFound:
			// First execution of the day: start from zeroed counters.
			stat = ExecutionStat{
				Date:         date.UTC().Truncate(24 * time.Hour),
				SuccessCount: 0,
				FailedCount:  0,
			}
		default:
			return err
		}

		if success {
			stat.SuccessCount++
		} else {
			stat.FailedCount++
		}

		encoded, err := json.Marshal(stat)
		if err != nil {
			return err
		}

		if _, _, err := tx.Set(key, string(encoded), nil); err != nil {
			return err
		}
		return nil
	}
}

// IncrementExecutionStat records one more execution, successful or failed
// according to success, in the stats of the day containing date. The update
// runs in its own write transaction, wrapped in a tracing span.
func (s *Store) IncrementExecutionStat(ctx context.Context, date time.Time, success bool) error {
	_, span := s.tracer.Start(ctx, "buntdb.increment.execution_stat")
	defer span.End()

	increment := s.incrementStatTxFunc(date, success)
	return s.db.Update(increment)
}

// GetExecutionStats retrieves execution statistics for the specified number of days
func (s *Store) GetExecutionStats(ctx context.Context, days int) (*ExecutionStats, error) {
_, span := s.tracer.Start(ctx, "buntdb.get.execution_stats")
defer span.End()

if days <= 0 {
days = 30 // Default to 30 days
}

stats := &ExecutionStats{
Stats: make([]ExecutionStat, 0, days),
}

// Generate date keys for the requested period
today := time.Now().UTC().Truncate(24 * time.Hour)

err := s.db.View(func(tx *buntdb.Tx) error {
for i := days - 1; i >= 0; i-- {
date := today.AddDate(0, 0, -i)
dateKey := formatStatDate(date)
key := fmt.Sprintf("%s:%s", statsPrefix, dateKey)

item, err := tx.Get(key)
if err == buntdb.ErrNotFound {
// No stats for this day, add zero entry
stats.Stats = append(stats.Stats, ExecutionStat{
Date: date,
SuccessCount: 0,
FailedCount: 0,
})
continue
} else if err != nil {
return err
}

var stat ExecutionStat
if err := json.Unmarshal([]byte(item), &stat); err != nil {
return err
}
stats.Stats = append(stats.Stats, stat)
}
return nil
})

return stats, err
Comment thread
vcastellm marked this conversation as resolved.
}
Loading
Loading