Comments

feat: scheduler catchup #1647

Closed
yottahmd wants to merge 17 commits into main from scheduler-catchup

Conversation


@yottahmd yottahmd commented Feb 8, 2026

Summary by CodeRabbit

Release Notes

  • New Features

    • Catch-up mechanism automatically replays missed DAG runs during scheduler downtime
    • Scheduled-time parameter tracked through DAG execution and exposed to steps via environment variables
    • New catch-up policy configuration options (off, latest, all) for flexible handling of missed runs
  • Configuration

    • New scheduler settings for catch-up run limits (global and per-DAG) and dispatch rate limiting
    • Catch-up window parameter added to schedule configuration
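
The three catch-up policies listed above (off, latest, all) can be sketched as a small Go enum. This is a minimal sketch based on the release notes; the identifier names follow the PR summary, but the exact signatures in `internal/core/catchup_policy.go` are assumptions.

```go
package main

import "fmt"

// CatchupPolicy models the off/latest/all options from the release notes.
type CatchupPolicy int

const (
	CatchupPolicyOff    CatchupPolicy = iota // never replay missed runs
	CatchupPolicyLatest                      // replay only the most recent missed run
	CatchupPolicyAll                         // replay every missed run in the window
)

// String serializes the policy back to its config spelling.
func (p CatchupPolicy) String() string {
	switch p {
	case CatchupPolicyLatest:
		return "latest"
	case CatchupPolicyAll:
		return "all"
	default:
		return "off"
	}
}

// ParseCatchupPolicy maps a config string to a policy, defaulting to off
// for the empty string and rejecting unknown values.
func ParseCatchupPolicy(s string) (CatchupPolicy, error) {
	switch s {
	case "", "off":
		return CatchupPolicyOff, nil
	case "latest":
		return CatchupPolicyLatest, nil
	case "all":
		return CatchupPolicyAll, nil
	}
	return CatchupPolicyOff, fmt.Errorf("unknown catchup policy %q", s)
}

func main() {
	p, _ := ParseCatchupPolicy("latest")
	fmt.Println(p) // latest
}
```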


coderabbitai bot commented Feb 8, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

📝 Walkthrough

This PR implements a catch-up scheduling system for replaying missed DAG runs during scheduler downtime. It adds catch-up policy types, extends schedule configurations with per-entry catch-up settings, introduces a CatchupEngine for generating and dispatching missed candidates, propagates scheduled-time through CLI commands and runtime layers, and persists per-DAG watermarks for state tracking.

Changes

Cohort / File(s) Summary
Core Enum & Policy Types
internal/core/catchup_policy.go, internal/core/catchup_policy_test.go
New CatchupPolicy enum with Off/Latest/All values and parsing; includes String() serialization and ParseCatchupPolicy() parser.
Trigger Type Extension
internal/core/status.go
Added TriggerTypeCatchUp trigger type and updated String()/ParseTriggerType() to recognize "catchup".
Schedule Structure
internal/core/dag.go, internal/core/spec/dag.go, internal/core/spec/schedule.go, internal/core/spec/types/schedule.go, internal/core/spec/types/schedule_test.go
Extended Schedule with Catchup and CatchupWindow fields; updated marshaling/unmarshaling; added ScheduleEntry type for structured catch-up metadata in YAML; refactored schedule parsing to support both simple and structured entries; updated tests.
Status & Agent Fields
internal/core/exec/runstatus.go, internal/core/exec/env.go, internal/runtime/agent/agent.go
Added ScheduledTime field to DAGRunStatus; introduced DAGU_SCHEDULED_TIME and DAGU_IS_CATCHUP environment variables; wired ScheduledTime through Agent Options and execution context.
Duration Parser
internal/cmn/duration/parse.go, internal/cmn/duration/parse_test.go
New Parse() function supporting day, hour, minute, second units (e.g., "2d12h"); includes validation and error wrapping.
Scheduler Configuration
internal/cmn/config/config.go, internal/cmn/config/definition.go, internal/cmn/config/loader.go, internal/cmn/config/loader_test.go
Added MaxGlobalCatchupRuns, MaxCatchupRunsPerDAG, and CatchupRateLimit fields to Scheduler config; implemented parsing and defaults (100, 20, 100ms respectively).
CLI Command Layer
internal/cmd/enqueue.go, internal/cmd/start.go, internal/cmd/exec.go
Added scheduled-time flag to enqueue and start commands; propagated scheduledTime parameter through tryExecuteDAG, handleSubDAGRun, and executeDAGRun function signatures.
Runtime Options & Transform
internal/runtime/subcmd.go, internal/runtime/transform/status.go
Extended StartOptions and EnqueueOptions with ScheduledTime field; added WithScheduledTime() status transform option; wired into command construction for distributed execution.
DAG State Persistence
internal/service/scheduler/dag_state_store.go, internal/service/scheduler/dag_state_store_test.go
New DAGStateStore for per-DAG watermark persistence under {dataDir}/scheduler/dag-state/; implements Load/Save/SaveAll/LoadAll/Migrate; includes safe filename generation with collision handling; comprehensive tests covering state I/O and legacy migration.
Catch-up Engine
internal/service/scheduler/catchup.go, internal/service/scheduler/catchup_test.go
New CatchupEngine subsystem generating and dispatching missed run candidates; includes candidate generation with policy application, window/cap enforcement, dispatch coordination, duplicate detection, watermark updates, and rate limiting; extensive test suite validating all scenarios.
Scheduler Integration
internal/service/scheduler/scheduler.go, internal/service/scheduler/dagrunjob.go, internal/service/scheduler/entryreader.go, internal/service/scheduler/mocks_test.go
Integrated CatchupEngine and DAGStateStore into Scheduler; added watermark migration and per-DAG state persistence in cronLoop; propagated scheduledTime from DAGRunJob.Start through HandleJob; added Registry() method to EntryReader for DAG snapshot access.
DAG Executor Refactor
internal/service/scheduler/dag_executor.go, internal/service/scheduler/dag_executor_test.go, internal/service/scheduler/queue_processor.go
Updated HandleJob and ExecuteDAG to accept scheduledTime parameter; added formatScheduledTime helper; wired ScheduledTime into Start/Enqueue options for both distributed and local paths; queue processor now parses and propagates scheduledTime from status.
Documentation
rfcs/004-schedule-catchup.md
Comprehensive RFC documenting catch-up scheduling design, configuration semantics, YAML shapes, policies, watermark advancement, observability, and migration guidance; replaces legacy misfire-based approach with unified catchup mechanism.

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler
    participant CatchupEngine
    participant DAGStateStore
    participant DAGExecutor
    participant QueueStore

    Scheduler->>CatchupEngine: Run(dags)
    CatchupEngine->>DAGStateStore: LoadAll(dags)
    DAGStateStore-->>CatchupEngine: per-DAG states
    CatchupEngine->>CatchupEngine: generateCandidates(policy, window)
    CatchupEngine->>CatchupEngine: applyPolicy(Off/Latest/All)
    CatchupEngine->>CatchupEngine: enforceGlobalCap
    CatchupEngine->>CatchupEngine: enforcePerDAGCap
    loop For each candidate
        CatchupEngine->>CatchupEngine: isDuplicate(check recent)
        alt Not duplicate
            CatchupEngine->>DAGExecutor: dispatchCandidate
            DAGExecutor->>QueueStore: Enqueue(dagRun, scheduledTime)
            QueueStore-->>DAGExecutor: success
            DAGExecutor-->>CatchupEngine: dispatched
        else Duplicate
            CatchupEngine-->>CatchupEngine: skip
        end
    end
    CatchupEngine->>DAGStateStore: SaveAll(dags, newWatermark)
    DAGStateStore-->>CatchupEngine: saved
    CatchupEngine-->>Scheduler: CatchupResult{Dispatched, Skipped}

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • feat: shared-nothing worker #1564: Modifies internal/runtime/agent/agent.go to wire agent options and status-related behavior, directly overlapping with this PR's Agent ScheduledTime field additions.
  • feat: Trigger visibility #1612: Propagates trigger-type across enqueue/start/execute paths and DAGRun status, aligning with this PR's scheduledTime and trigger-type propagation throughout the stack.
  • feat: default execution mode #1642: Updates internal/service/scheduler/dag_executor.go HandleJob/ExecuteDAG signature and dispatch logic, directly overlapping with the executor refactoring in this PR.
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Docstring Coverage: ⚠️ Warning. Docstring coverage is 50.94%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Description Check: ✅ Passed (check skipped because CodeRabbit's high-level summary is enabled).
Title Check: ✅ Passed. The title 'feat: scheduler catchup' directly summarizes the main change: introducing a catch-up feature for the scheduler to replay missed DAG runs.





@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rfcs/004-schedule-catchup.md (1)

282-286: ⚠️ Potential issue | 🟠 Major

DAGU_IS_CATCHUP value documented as "true" but implementation uses "1".

See the corresponding issue flagged in internal/runtime/agent/agent.go line 417. The RFC documents the value as "true", which should be the authoritative specification. Ensure the code is updated to match.

internal/service/scheduler/dag_executor.go (1)

94-105: ⚠️ Potential issue | 🟡 Minor

scheduledTime is accepted but not propagated to the distributed execution task.

When ExecuteDAG takes the distributed path (lines 94-105), scheduledTime is not passed to CreateTask. In contrast, the local execution path explicitly passes it via formatScheduledTime(scheduledTime) (line 117).

For the main flows, scheduledTime is preserved elsewhere: for distributed START, it's persisted in the queue entry before dispatch; for RETRY, it's embedded in previousStatus (as JSON). However, this inconsistency creates confusion and means the parameter is unused in the distributed code path. If ExecuteDAG is called directly for distributed execution outside the normal HandleJob/queue flows, the scheduled time is effectively lost.

Consider whether CreateTask should accept a scheduledTime option for consistency, or document why the distributed path intentionally omits it.

🤖 Fix all issues with AI agents
In `@internal/core/dag.go`:
- Around line 657-661: UnmarshalJSON currently parses alias.CatchupWindow with
time.ParseDuration which doesn't accept day units, causing mismatch with
buildSchedulerFromEntries (which uses duration.Parse); update the UnmarshalJSON
logic that sets s.CatchupWindow (when alias.CatchupWindow != "") to use the same
duration.Parse function as the spec builder (and keep the existing error
wrapping message), ensuring alias.CatchupWindow, s.CatchupWindow, and
UnmarshalJSON behave consistently with buildSchedulerFromEntries.

In `@internal/runtime/agent/agent.go`:
- Around line 410-421: The env var value for catch-up is being set to "1" which
contradicts the RFC; change the value to "true" where
extraEnvs[exec.EnvKeyIsCatchup] is assigned in the scheduled-time injection
block (inside the a.scheduledTime conditional that uses runtime.GetDAGContext,
rCtx.EnvScope.WithEntries, eval.EnvSourceDAGEnv and runtime.WithDAGContext) so
the map entry uses "true" when a.triggerType == core.TriggerTypeCatchUp, keeping
all other logic intact.

In `@internal/service/scheduler/catchup.go`:
- Around line 83-91: The first-run branch uses c.clock() twice causing
inconsistent timestamps: replace the second call to c.clock() inside the
hasAnyWatermark == false branch with the previously captured variable catchupTo
so the saved watermarks and the catchup reference use the same timestamp; update
the SaveAll invocation (called via c.dagStateStore.SaveAll) to pass catchupTo
instead of c.clock(), leaving hasAnyWatermark, catchupTo and result.Duration
logic unchanged.
- Around line 113-145: The loop currently breaks on dispatch failure but
unconditionally calls c.dagStateStore.SaveAll(dags, catchupTo), advancing every
DAG watermark; change this so only DAGs that completed catch-up are advanced to
catchupTo: track successful DAGs inside the dispatch loop (e.g., a map keyed by
cand.dag.Name or dag ID updated when you increment result.Dispatched and after
successful per-DAG Save(cand.dag, dagState{LastTick: cand.scheduledTime})), and
after the loop call SaveAll only for that subset (or iterate and call Save(dag,
dagState{LastTick: catchupTo}) for each successful DAG). Ensure failing DAGs are
left at their last saved state (you already Save per-dispatch in
dispatchCandidate flow) and do not include them in the final SaveAll(catchupTo)
advance.
🧹 Nitpick comments (14)
internal/cmd/enqueue.go (1)

68-70: Silently discarding the error from ctx.StringParam is inconsistent with other parameters in this function.

Lines 35–37 and 49–51 both check err from ctx.StringParam. While scheduled-time is optional and the flag is registered (so it should always succeed), swallowing the error silently makes debugging harder if the param name ever gets out of sync. The same pattern appears in start.go line 104.

Suggested fix
-	scheduledTime, _ := ctx.StringParam("scheduled-time")
-
-	return enqueueDAGRun(ctx, dag, runID, triggerType, scheduledTime)
+	scheduledTime, err := ctx.StringParam("scheduled-time")
+	if err != nil {
+		return fmt.Errorf("failed to get scheduled-time: %w", err)
+	}
+
+	return enqueueDAGRun(ctx, dag, runID, triggerType, scheduledTime)
internal/cmd/start.go (2)

104-104: Same error-discarding pattern as enqueue.go.

See the comment on enqueue.go line 68 — same concern applies here. Consider handling the error consistently.


335-335: Function signatures are growing long — consider an options struct.

handleSubDAGRun now takes 9 parameters and executeDAGRun takes 8. Each new cross-cutting concern (triggerType, scheduledTime) adds another parameter to every function in the call chain. An options struct would make future additions non-breaking and improve readability.

Not blocking for this PR since the pattern is pre-existing, but worth considering as a follow-up.

Also applies to: 371-371
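
The options-struct refactor suggested above could look roughly like this. Field and function names here are hypothetical, not the real signatures in internal/cmd/start.go; the point is that cross-cutting parameters like triggerType and scheduledTime move into one struct so the next addition no longer breaks every caller.

```go
package main

import "fmt"

// execOptions bundles the cross-cutting parameters that currently grow
// the executeDAGRun/handleSubDAGRun signatures.
type execOptions struct {
	RunID         string
	TriggerType   string
	ScheduledTime string
	Quiet         bool
}

// executeDAGRun takes one options struct instead of a long parameter list.
func executeDAGRun(dagName string, opts execOptions) error {
	if opts.RunID == "" {
		return fmt.Errorf("run ID is required")
	}
	// ... start the run, propagating opts.ScheduledTime and opts.TriggerType
	return nil
}

func main() {
	// New fields can be added to execOptions without touching this call site.
	err := executeDAGRun("etl", execOptions{
		RunID:         "r1",
		TriggerType:   "catchup",
		ScheduledTime: "2026-02-08T00:00:00Z",
	})
	fmt.Println(err == nil)
}
```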

rfcs/004-schedule-catchup.md (1)

368-392: Add language identifiers to fenced code blocks.

The log output and CLI output code blocks lack language identifiers, which triggers markdownlint MD040 warnings. Use text or log for log output blocks, and text for CLI output blocks.

For example:

-```
+```text
 level=INFO msg="Catch-up started" ...

Also applies to: 419-438, 522-560

internal/cmn/config/loader_test.go (1)

487-489: Consider adding a test case that exercises explicit YAML/env overrides for the catchup fields.

The current YAML test doesn't set maxGlobalCatchupRuns, maxCatchupRunsPerDAG, or catchupRateLimit in the YAML input, so it only tests defaults. A dedicated test case (or extending the YAML block) would confirm that user-provided values override the defaults correctly.

internal/service/scheduler/queue_processor.go (1)

382-387: Silently discarding ParseTime error may hide issues.

If status.ScheduledTime is non-empty but malformed, a zero time.Time will silently propagate to ExecuteDAG. Consider logging at debug level on parse failure so operators can diagnose mismatches.

Suggested change
 	go func() {
-		scheduledTime, _ := stringutil.ParseTime(status.ScheduledTime)
+		scheduledTime, err := stringutil.ParseTime(status.ScheduledTime)
+		if err != nil && status.ScheduledTime != "" {
+			logger.Debug(ctx, "Failed to parse scheduledTime, using zero value", tag.Error(err))
+		}
 		if err := p.dagExecutor.ExecuteDAG(ctx, dag, coordinatorv1.Operation_OPERATION_RETRY, runID, status, status.TriggerType, scheduledTime); err != nil {
internal/core/catchup_policy_test.go (2)

28-31: Consider wrapping table iterations in t.Run for better failure diagnostics.

When a table case fails, the test output won't indicate which row. Using t.Run(tt.want, ...) (or a name field) would pinpoint failures. This is a minor nit for a small table.


50-58: Same t.Run suggestion applies to the parse test loop.

internal/service/scheduler/catchup_test.go (2)

226-271: Global cap test relies on deterministic map iteration order.

dags is a map[string]*core.DAG, and Go maps have non-deterministic iteration order. With MaxGlobalCatchupRuns = 3 and two DAGs each generating 24 candidates, the test asserts exactly 3 total candidates — but the per-DAG distribution depends on which DAG is visited first by generateCandidates. If the implementation iterates the map and accumulates candidates until the global cap is hit, the result count (3) is stable, but if you ever want to assert which DAG's candidates appear, this will be flaky.

The current assertion (assert.Equal(t, 3, len(candidates))) is fine as long as only the total count matters. Just noting this for awareness in case future assertions on candidate contents are added.
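
If deterministic per-DAG assertions are ever wanted, sorting the map keys before iterating is the standard fix. A small sketch (the real generateCandidates signature is assumed):

```go
package main

import (
	"fmt"
	"sort"
)

// orderedDAGNames returns map keys in sorted order, making iteration
// deterministic so a global-cap test could assert *which* DAG's
// candidates survive, not just how many.
func orderedDAGNames(dags map[string]int) []string {
	names := make([]string, 0, len(dags))
	for name := range dags {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	dags := map[string]int{"b-dag": 24, "a-dag": 24}
	fmt.Println(orderedDAGNames(dags)) // [a-dag b-dag], stable across runs
}
```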


31-65: Consider extracting the mock to a shared test helper or using a code generator.

The catchupMockDAGRunStore duplicates mock boilerplate for the exec.DAGRunStore interface. If this interface grows, this mock will need manual updates.

Based on learnings: "Integration fixtures should live in internal/test for use in test files."

internal/service/scheduler/entryreader.go (1)

211-218: Registry() is read-only but acquires an exclusive lock.

The entryReaderImpl.lock is a sync.Mutex, so Registry() blocks all other operations (including other Registry() callers). If this becomes a hot path (e.g., called every catchup tick), consider upgrading to sync.RWMutex so reads can proceed concurrently.
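
A sketch of the suggested RWMutex upgrade: Registry() takes only a read lock, so concurrent readers (e.g. the catch-up tick) do not serialize. Type and field names here are assumptions, not the real entryReaderImpl.

```go
package main

import (
	"fmt"
	"sync"
)

type entryReader struct {
	mu   sync.RWMutex
	dags map[string]string // DAG name -> spec path (simplified)
}

// Registry returns a snapshot under a shared read lock; multiple callers
// can hold RLock concurrently, unlike with a plain sync.Mutex.
func (r *entryReader) Registry() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	// Copy so callers can't mutate shared state after the lock is released.
	snapshot := make(map[string]string, len(r.dags))
	for k, v := range r.dags {
		snapshot[k] = v
	}
	return snapshot
}

// Add takes the exclusive lock for writes.
func (r *entryReader) Add(name, path string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.dags[name] = path
}

func main() {
	r := &entryReader{dags: map[string]string{}}
	r.Add("etl", "/dags/etl.yaml")
	fmt.Println(len(r.Registry())) // 1
}
```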

internal/service/scheduler/scheduler.go (1)

339-347: Per-tick file I/O for watermark persistence — consider the write volume.

SaveAll writes one JSON file per catch-up-enabled DAG on every scheduler tick (once per minute). With many catch-up-enabled DAGs this creates sustained filesystem churn. For most deployments this is fine, but consider batching into a single file or only writing when the set of catch-up DAGs changes, if this becomes a bottleneck.

internal/cmn/config/loader.go (1)

732-734: Missing environment variable bindings for new catchup config fields.

The other scheduler settings (scheduler.port, lockStaleThreshold, etc.) have corresponding DAGU_SCHEDULER_* env var bindings in the envBindings slice (lines 1208-1211). The three new fields (MaxGlobalCatchupRuns, MaxCatchupRunsPerDAG, CatchupRateLimit) lack env var bindings, making them config-file-only. If environment-based configuration is expected to be supported (as it is for sibling fields), add bindings.

Proposed env bindings (to add near line 1211)
 	{key: "scheduler.zombieDetectionInterval", env: "SCHEDULER_ZOMBIE_DETECTION_INTERVAL"},
+	{key: "scheduler.maxGlobalCatchupRuns", env: "SCHEDULER_MAX_GLOBAL_CATCHUP_RUNS"},
+	{key: "scheduler.maxCatchupRunsPerDAG", env: "SCHEDULER_MAX_CATCHUP_RUNS_PER_DAG"},
+	{key: "scheduler.catchupRateLimit", env: "SCHEDULER_CATCHUP_RATE_LIMIT"},
internal/service/scheduler/catchup.go (1)

321-336: Duplicate check scans only the last 50 attempts — could miss duplicates with high catch-up volume.

If a DAG has more than 50 recent runs (e.g., from a previous catch-up or frequent schedule), older runs won't be checked. With the default per-DAG cap of 20, this is unlikely to be hit in practice. Consider making the limit configurable or at least matching it to MaxCatchupRunsPerDAG.

Comment on lines 657 to 661

	if alias.CatchupWindow != "" {
		s.CatchupWindow, err = time.ParseDuration(alias.CatchupWindow)
		if err != nil {
			return fmt.Errorf("invalid catchupWindow %q: %w", alias.CatchupWindow, err)
		}
⚠️ Potential issue | 🟠 Major

Inconsistent duration parsing: time.ParseDuration here vs duration.Parse in spec builder.

UnmarshalJSON uses time.ParseDuration which doesn't support day-unit durations (e.g., "2d12h"), but buildSchedulerFromEntries in internal/core/spec/schedule.go (line 54) uses duration.Parse which does. A user who sets catchupWindow: "2d" in YAML will succeed at build time, but the value will fail to round-trip through JSON serialization/deserialization.

Proposed fix
+	"github.com/dagu-org/dagu/internal/cmn/duration"
...
 	if alias.CatchupWindow != "" {
-		s.CatchupWindow, err = time.ParseDuration(alias.CatchupWindow)
+		s.CatchupWindow, err = duration.Parse(alias.CatchupWindow)
 		if err != nil {
 			return fmt.Errorf("invalid catchupWindow %q: %w", alias.CatchupWindow, err)
 		}
 	}

Comment on lines 410 to 421

	// Inject scheduled time and catch-up environment variables
	if a.scheduledTime != "" {
		rCtx := runtime.GetDAGContext(ctx)
		extraEnvs := map[string]string{
			exec.EnvKeyScheduledTime: a.scheduledTime,
		}
		if a.triggerType == core.TriggerTypeCatchUp {
			extraEnvs[exec.EnvKeyIsCatchup] = "1"
		}
		rCtx.EnvScope = rCtx.EnvScope.WithEntries(extraEnvs, eval.EnvSourceDAGEnv)
		ctx = runtime.WithDAGContext(ctx, rCtx)
	}
⚠️ Potential issue | 🟠 Major

DAGU_IS_CATCHUP value "1" contradicts the RFC spec which says "true".

The RFC (line 285) documents DAGU_IS_CATCHUP as "true", and the walkthrough example (line 544) shows DAGU_IS_CATCHUP=true. However, line 417 sets it to "1". DAG authors reading the RFC would write if [ "$DAGU_IS_CATCHUP" = "true" ], which would fail.

Either update the code to use "true" or update the RFC — recommend aligning the code with the RFC since it's the public contract.

Suggested fix
 		if a.triggerType == core.TriggerTypeCatchUp {
-			extraEnvs[exec.EnvKeyIsCatchup] = "1"
+			extraEnvs[exec.EnvKeyIsCatchup] = "true"
 		}

Comment on lines 83 to 91

	if !hasAnyWatermark {
		// First run — no catch-up needed, seed all DAGs with current time
		logger.Info(ctx, "No per-DAG watermarks found, skipping catch-up")
		if err := c.dagStateStore.SaveAll(dags, c.clock()); err != nil {
			return nil, fmt.Errorf("failed to save initial watermarks: %w", err)
		}
		result.Duration = c.clock().Sub(start)
		return result, nil
	}
⚠️ Potential issue | 🟡 Minor

Minor inconsistency: c.clock() called separately for SaveAll vs. catchupTo.

Line 72 captures catchupTo := c.clock(), but line 86 calls c.clock() again for SaveAll. In the first-run path, the saved watermark could be slightly later than catchupTo used elsewhere. Use catchupTo for consistency.

Proposed fix
-		if err := c.dagStateStore.SaveAll(dags, c.clock()); err != nil {
+		if err := c.dagStateStore.SaveAll(dags, catchupTo); err != nil {

Comment on lines 113 to 145

	// Dispatch candidates
	for _, cand := range candidates {
		if ctx.Err() != nil {
			break
		}

		dispatched, err := c.dispatchCandidate(ctx, cand)
		if err != nil {
			logger.Error(ctx, "Catch-up dispatch failed, stopping catch-up",
				tag.DAG(cand.dag.Name),
				tag.Error(err),
			)
			// Save watermark at the last successful dispatch point
			break
		}

		if dispatched {
			result.Dispatched++
			// Advance this DAG's watermark after each successful dispatch
			if err := c.dagStateStore.Save(cand.dag, dagState{LastTick: cand.scheduledTime}); err != nil {
				logger.Error(ctx, "Failed to save DAG state", tag.Error(err))
			}
		} else {
			result.Skipped++
		}

		time.Sleep(c.config.Scheduler.CatchupRateLimit)
	}

	// Set watermarks to catchupTo after all dispatches
	if err := c.dagStateStore.SaveAll(dags, catchupTo); err != nil {
		logger.Error(ctx, "Failed to save final watermarks", tag.Error(err))
	}
⚠️ Potential issue | 🟠 Major

Watermark advances to catchupTo even after dispatch failure — missed runs will be lost.

When a dispatch fails (line 120), the loop breaks, but line 143 unconditionally calls SaveAll(dags, catchupTo), advancing all DAG watermarks to catchupTo. This means:

  1. The failed DAG's watermark jumps past any un-dispatched candidates.
  2. Any remaining candidates for other DAGs that weren't reached in the loop are also skipped.

On the next restart, catch-up won't retry these skipped runs because the watermark already advanced past them.

Consider tracking which DAGs completed successfully and only advancing their watermarks to catchupTo, while leaving failed/incomplete DAGs at their last successful dispatch point (already saved incrementally at line 132).

Sketch of a fix
+	// Track which DAGs had all candidates dispatched or skipped
+	completedDAGs := make(map[string]*core.DAG)
+	failedDAG := ""
+
 	// Dispatch candidates
 	for _, cand := range candidates {
 		if ctx.Err() != nil {
 			break
 		}
 
 		dispatched, err := c.dispatchCandidate(ctx, cand)
 		if err != nil {
 			logger.Error(ctx, "Catch-up dispatch failed, stopping catch-up",
 				tag.DAG(cand.dag.Name),
 				tag.Error(err),
 			)
-			// Save watermark at the last successful dispatch point
+			failedDAG = cand.dag.Name
 			break
 		}
 
 		if dispatched {
 			result.Dispatched++
 			// Advance this DAG's watermark after each successful dispatch
 			if err := c.dagStateStore.Save(cand.dag, dagState{LastTick: cand.scheduledTime}); err != nil {
 				logger.Error(ctx, "Failed to save DAG state", tag.Error(err))
 			}
 		} else {
 			result.Skipped++
 		}
 
 		time.Sleep(c.config.Scheduler.CatchupRateLimit)
 	}
 
-	// Set watermarks to catchupTo after all dispatches
-	if err := c.dagStateStore.SaveAll(dags, catchupTo); err != nil {
-		logger.Error(ctx, "Failed to save final watermarks", tag.Error(err))
+	// Only advance watermarks for DAGs that weren't interrupted
+	for k, dag := range dags {
+		if k == failedDAG {
+			continue // Leave at last successful dispatch watermark
+		}
+		if err := c.dagStateStore.Save(dag, dagState{LastTick: catchupTo}); err != nil {
+			logger.Error(ctx, "Failed to save final watermark", tag.Error(err))
+		}
 	}

@yottahmd yottahmd closed this Feb 8, 2026

codecov bot commented Feb 8, 2026

Codecov Report

❌ Patch coverage is 66.36905% with 113 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.80%. Comparing base (c62a7e8) to head (2e77d54).

Files with missing lines Patch % Lines
internal/core/spec/types/schedule.go 58.26% 39 Missing and 9 partials ⚠️
internal/persis/filedagstate/store.go 67.74% 12 Missing and 8 partials ⚠️
internal/core/dag.go 42.85% 8 Missing and 4 partials ⚠️
internal/runtime/agent/agent.go 18.18% 8 Missing and 1 partial ⚠️
internal/cmd/start.go 50.00% 5 Missing and 1 partial ⚠️
internal/service/scheduler/entryreader.go 14.28% 6 Missing ⚠️
internal/core/status.go 0.00% 4 Missing ⚠️
internal/cmd/enqueue.go 66.66% 1 Missing and 1 partial ⚠️
internal/core/spec/dag.go 0.00% 1 Missing and 1 partial ⚠️
internal/runtime/subcmd.go 50.00% 1 Missing and 1 partial ⚠️
... and 1 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1647      +/-   ##
==========================================
- Coverage   69.88%   69.80%   -0.09%     
==========================================
  Files         335      338       +3     
  Lines       37440    37733     +293     
==========================================
+ Hits        26166    26340     +174     
- Misses       9204     9293      +89     
- Partials     2070     2100      +30     
Files with missing lines Coverage Δ
internal/cmd/exec.go 64.53% <100.00%> (ø)
internal/cmn/config/config.go 76.28% <ø> (ø)
internal/cmn/config/loader.go 83.35% <100.00%> (+0.27%) ⬆️
internal/cmn/duration/parse.go 100.00% <100.00%> (ø)
internal/core/catchup_policy.go 100.00% <100.00%> (ø)
internal/core/exec/runstatus.go 96.66% <ø> (ø)
internal/core/spec/schedule.go 100.00% <100.00%> (ø)
internal/runtime/manager.go 55.25% <100.00%> (+1.36%) ⬆️
internal/runtime/transform/status.go 93.33% <100.00%> (+0.22%) ⬆️
internal/service/scheduler/dag_executor.go 83.95% <100.00%> (+1.28%) ⬆️
... and 13 more

... and 5 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update c62a7e8...2e77d54.

