Conversation
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
📝 Walkthrough

The changes introduce a stale execution cleanup mechanism for distributed job execution. Helper functions are added to the agent to manage execution state and removal via Raft. The job's cleanup logic is refactored to delegate to these helpers. Leader startup now initiates a reconciliation process to identify and clean up orphaned running executions in storage. Tests validate the reconciliation behavior.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Leader as Leader (Startup)
    participant Agent as Agent
    participant JobStore as Job Store
    participant RaftLog as Raft
    Leader->>Agent: establishLeadership()
    Agent->>Agent: GetActiveExecutions()
    Agent->>Leader: activeExecutionKeys
    loop For each job
        Agent->>JobStore: Get running executions
        JobStore-->>Agent: runningExecs
        Agent->>Agent: filterStaleExecutions(activeKeys, runningExecs)
        alt Stale execution detected
            Agent->>Agent: markExecutionDone(execution)
            Agent->>RaftLog: RaftApply(ExecutionDoneRequest)
            RaftLog-->>Agent: error or success
        else Recent execution
            Agent->>Agent: Log and continue
        end
    end
    Agent->>Leader: Reconciliation complete
```
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
🧹 Nitpick comments (2)
dkron/agent.go (1)
897-913: Consider adding an IsLeader guard or documenting the precondition.

Per coding guidelines, Raft log applications should be preceded by an `a.IsLeader()` check. This function applies an `ExecutionDoneType` command via `a.RaftApply`, but doesn't verify leadership. While the current call sites (leader startup reconciliation and `isRunnable` during scheduling) run on the leader, adding a defensive check or documenting the precondition would prevent misuse.

💡 Optional: Add defensive leadership check
```diff
 func (a *Agent) markExecutionDone(execution *Execution) error {
+	if !a.IsLeader() {
+		return errors.New("not leader")
+	}
+
 	execDoneReq := &typesv1.ExecutionDoneRequest{
 		Execution: execution.ToProto(),
 	}
```

Based on learnings: "Check a.IsLeader() before performing leader-only operations like job scheduling or applying Raft logs."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dkron/agent.go` around lines 897 - 913, markExecutionDone calls a.RaftApply with an ExecutionDoneType command but lacks a leadership guard; add a defensive check at the top of markExecutionDone to return an error (or no-op) if a.IsLeader() is false, or document the precondition clearly. Specifically, in the markExecutionDone function, call a.IsLeader() before creating/encoding the typesv1.ExecutionDoneRequest and invoking a.RaftApply (referencing markExecutionDone, a.IsLeader, a.RaftApply, and ExecutionDoneType) and ensure the function returns a clear error when not leader so callers cannot accidentally apply Raft entries from non-leaders.

dkron/leader.go (1)
246-265: Early return on first error leaves partial reconciliation.

The function iterates through all jobs and calls `cleanupStaleRunningExecutions` for each. If an error occurs mid-iteration, it returns immediately, leaving some jobs reconciled and others not. This creates an inconsistent state. Consider either:
- Collecting errors and continuing (best-effort for all jobs)
- Documenting that partial reconciliation is acceptable
Additionally, `time.Since(exec.StartedAt)` on line 259 uses local time implicitly, while `cleanupStaleRunningExecutions` uses `time.Now().UTC()` for the `runningFor` calculation. This is a minor inconsistency for logging purposes only (not affecting logic), but using `.UTC()` would be more consistent.

💡 Minor consistency fix for time calculation
```diff
 for _, exec := range runningExecs {
 	a.logger.WithFields(map[string]interface{}{
 		"job":         job.Name,
 		"execution":   exec.Key(),
 		"node":        exec.NodeName,
 		"started_at":  exec.StartedAt,
-		"running_for": time.Since(exec.StartedAt).String(),
+		"running_for": time.Now().UTC().Sub(exec.StartedAt).String(),
 	}).Info("leader: Leaving running execution in storage during startup reconciliation")
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dkron/leader.go` around lines 246 - 265, reconcileRunningExecutionOrphans currently returns on the first error from cleanupStaleRunningExecutions which causes partial reconciliation; change it to continue iterating all jobs while collecting errors (e.g., append errors to a slice and at the end return either nil or a combined error) so the loop is best-effort across all jobs, and also make the logged running duration consistent by calculating running_for using UTC (use exec.StartedAt.UTC() or convert the reference time to UTC to match cleanupStaleRunningExecutions); reference functions/values: reconcileRunningExecutionOrphans, cleanupStaleRunningExecutions, runningExecs, exec.StartedAt, and the "running_for" log field.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0d62828d-008d-415c-ad5e-42d50273c9dc
📒 Files selected for processing (4)
- dkron/agent.go
- dkron/job.go
- dkron/leader.go
- dkron/leader_test.go
This pull request introduces a robust mechanism for cleaning up stale running executions in Dkron, ensuring that executions which are no longer active on any node are marked as failed and do not block new job runs. The logic for detecting and cleaning up these "orphaned" executions has been refactored, centralized, and is now invoked both during job scheduling and on leader startup. Comprehensive tests have been added to verify this behavior.
Refactoring and Centralization of Stale Execution Cleanup:

- Extracted and centralized the stale execution cleanup logic into a new `Agent.cleanupStaleRunningExecutions` method, which marks executions as failed if they are no longer active and have exceeded the stale threshold. This replaces the previous inline logic in `Job.isRunnable`. (dkron/agent.go, dkron/job.go) [1] [2]
- Added a helper function `activeExecutionKeys` to efficiently track currently active executions by key. (dkron/agent.go)

Leadership Startup Reconciliation:

- Added a reconciliation routine (`Agent.reconcileRunningExecutionOrphans`) that iterates over all jobs and cleans up any orphaned running executions in persistent storage, ensuring a consistent state after leader changes. (dkron/leader.go)

Testing Improvements:

- Added tests in `dkron/leader_test.go` to verify that stale executions are properly cleaned up and that recent (non-stale) executions are left untouched during reconciliation. (dkron/leader_test.go)

Code Quality:

- (dkron/agent.go)

Summary by CodeRabbit
Release Notes
Bug Fixes

- Stale running executions that are no longer active on any node are now detected and marked as failed, so they no longer block new job runs.

Tests

- Added coverage verifying that leader startup reconciliation cleans up stale executions while leaving recent ones untouched.