
feat: scheduler catch-up #1649

Merged
yottahmd merged 36 commits into main from 004-schedule-catchup
Feb 11, 2026

Conversation


@yottahmd (Collaborator) commented Feb 10, 2026

Summary by CodeRabbit

  • New Features

    • Added per‑DAG catch-up: catchupWindow (Go duration, supports day units) and overlapPolicy ("skip", "all", "latest") to replay missed runs with configurable overlap handling.
    • Scheduler now buffers and replays missed runs and exposes "catchup" trigger type in APIs and UI (new Trigger column and "Catch‑up" label).
  • Bug Fixes

    • Added panic recovery in queue processing to improve runtime robustness.
  • Documentation

    • RFC status updated to "implemented" to reflect the new catch‑up model.

…xecTime

- Add nil-safe defaults for GenRunID and Dispatch in NewTickPlanner,
  matching all other function fields
- Fix data race in recomputeBuffer: snapshot watermark values under the
  lock instead of copying a pointer and reading the shared map after
  releasing it. Apply same defensive fix to initBuffers.
- Fix computePrevExecTime for non-uniform cron schedules (e.g.,
  "0 9,17 * * *") by walking forward from next-7d instead of assuming
  constant interval between firings
- Delete dagrunjob.go (DAGRunJob only had String/PrevExecTime, both
  superseded by computePrevExecTime in tick_planner.go)
- Move TestPrevExecTime to TestComputePrevExecTime in tick_planner_test
  with additional cases for non-uniform and weekly cron schedules
- Update dag_executor.go comments to reference TickPlanner.DispatchRun
- Fix stale "DAGRunJob.Stop" comment in tick_planner.go
- Add shouldRun guard tests: SkipIfSuccessful, AlreadyFinished,
  FailedPreviousRunNotSkipped
- Add TestScheduler/Start e2e test for start-schedule dispatch
- Add DispatchRunStart and StartStop lifecycle tests
- Add SetDispatchFunc to Scheduler for test injection
…bitmask ops, fsync

- Add default case to overlap policy switch treating unknown values as skip
- Add nil map guard after watermark state load
- Make ParseOverlapPolicy case-insensitive and whitespace-tolerant
- Fix fsnotify.Op checks to use bitmask instead of equality
- Add version validation in watermark store with fresh-state fallback
- Add fsync before rename in watermark store for durability
- Expand thread-safety documentation for TickPlanner
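The fsnotify bitmask bullet above is worth illustrating: fsnotify.Op is a bit set, so several operations can arrive in one event, and equality checks silently miss them. A stdlib-only sketch with stand-in flags (not the real fsnotify types):

```go
package main

import "fmt"

// Op mimics fsnotify.Op: a bitmask where several operations can be
// set on a single event (e.g. a write combined with a chmod).
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
	Rename
	Chmod
)

// Has reports whether op contains all flags in target (bitmask test).
func (op Op) Has(target Op) bool { return op&target == target }

func main() {
	ev := Write | Chmod // combined event

	// Equality check: silently misses the write component.
	fmt.Println(ev == Write) // false

	// Bitmask check: correctly detects the write component.
	fmt.Println(ev.Has(Write)) // true
}
```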
…e tests

- Add timezone context comments for start vs stop/restart schedule eval
- Document setEvents pre-Start contract in entryreader
- Add infinite loop guard in ComputeMissedIntervals
- Remove extra blank line in entryreader
- Fix misleading test comment in scheduler_test
- Document dagName field purpose in schedule_buffer
- Add negative and whitespace edge-case tests for ParseDuration
- Export triggerTypeLabels from TriggerTypeIndicator and reuse in DAGStatusOverview
coderabbitai bot commented Feb 10, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

This PR implements DAG catch-up scheduling: adds catchupWindow and overlapPolicy fields, introduces persistent watermarking and per-DAG in-memory buffers, replaces the old job-based scheduler with an event-driven TickPlanner, refactors EntryReader, wires a file-based watermark store, and adds the "catchup" trigger type across API/UI/specs.

Changes

- **Trigger Type Additions** — `api/v1/api.gen.go`, `api/v1/api.yaml`, `internal/core/status.go`, `ui/src/api/v1/schema.ts`: Added "catchup"/TriggerTypeCatchup across the API spec, core status parsing, and UI schema.
- **Schema & Core DAG Fields** — `internal/cmn/schema/dag.schema.json`, `internal/core/dag.go`, `internal/core/spec/dag.go`: Added DAG-level catchupWindow and overlapPolicy schema fields and mapped them into core DAG fields via transformers.
- **Duration & Overlap Parsing** — `internal/core/duration.go`, `internal/core/duration_test.go`, `internal/core/overlap_policy.go`, `internal/core/overlap_policy_test.go`: New ParseDuration supporting day units and ParseOverlapPolicy with values `skip`, `all`, `latest`.
- **Watermark Persistence** — `internal/persis/filewatermark/store.go`, `internal/persis/filewatermark/store_test.go`: New file-backed watermark Store with atomic JSON writes, versioning, load/save semantics, and tests.
- **TickPlanner & Scheduler Core** — `internal/service/scheduler/tick_planner.go`, `tick_planner_test.go`, `scheduler.go`, `scheduler_test.go`: Introduced TickPlanner (event-driven planning, per-DAG buffers, watermarks, dispatch hooks), integrated it into Scheduler, added planner lifecycle and extensive tests.
- **Catchup Computation** — `internal/service/scheduler/catchup.go`, `catchup_test.go`: Added ComputeReplayFrom, ComputeMissedIntervals, MaxMissedRuns, and associated tests for missed-run computation and capping.
- **Schedule Buffer** — `internal/service/scheduler/schedule_buffer.go`, `schedule_buffer_test.go`: Per-DAG FIFO ScheduleBuffer implementation with overflow handling, peek/pop/drop behaviors, and tests.
- **EntryReader Refactor** — `internal/service/scheduler/entryreader.go`, `entryreader_internal_test.go`: Reworked EntryReader to expose DAGs() and emit a DAGChangeEvent stream; removed the Next()/ScheduledJob surface; added watcher-driven change handling and tests.
- **DAGRunJob Removal & Mocks** — `internal/service/scheduler/dagrunjob.go`, `mocks_test.go`: Removed the DAGRunJob type and related orchestration; updated test mocks to use DAGs() and simplified the mock manager.
- **Watermark Interfaces** — `internal/service/scheduler/watermark.go`: Added SchedulerState and DAGWatermark types and a WatermarkStore interface with a no-op implementation.
- **Queue Processor & Safety** — `internal/service/scheduler/queue_processor.go`: Adjusted backoff defaults and added panic recovery in queue-processing goroutines.
- **Cmd Wiring & CLI** — `internal/cmd/context.go`, `internal/cmd/helper.go`, `internal/cmd/start.go`: Wired the file watermark store into startup, adapted EntryReader and scheduler constructor calls, and updated trigger-type help/validation to include catchup.
- **Frontend UI** — `ui/src/features/dag-runs/.../DAGRunTable.tsx`, `ui/src/features/dags/components/common/TriggerTypeIndicator.tsx`, `ui/src/features/dags/components/dag-details/DAGStatusOverview.tsx`: Added a Trigger column/indicator, exported triggerTypeLabels including catchup, and updated DAG run displays.
- **Docs & RFC** — `rfcs/004-schedule-catchup.md`, `internal/test/scheduler.go`: RFC moved to implemented; updated docs and test helpers to reflect the new scheduler/watermark behavior.
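The queue-processor change above adds panic recovery in processing goroutines. A minimal sketch of the pattern; processItem and the handler are hypothetical stand-ins for the actual queue types:

```go
package main

import (
	"fmt"
	"sync"
)

// processItem wraps a per-item handler so a panic in one item is
// converted to an error instead of crashing the whole scheduler process.
func processItem(item string, handle func(string)) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("queue item %q panicked: %v", item, r)
		}
	}()
	handle(item)
	return nil
}

func main() {
	var wg sync.WaitGroup
	items := []string{"ok-1", "boom", "ok-2"}
	errs := make([]error, len(items))
	for i, it := range items {
		wg.Add(1)
		go func(i int, it string) {
			defer wg.Done()
			errs[i] = processItem(it, func(s string) {
				if s == "boom" {
					panic("bad payload") // one poisoned item
				}
			})
		}(i, it)
	}
	wg.Wait()
	// The panicked item surfaces as an error; the others complete normally.
	for i, err := range errs {
		fmt.Println(items[i], err)
	}
}
```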

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant EntryReader as EntryReader (FS watcher)
    participant TickPlanner as TickPlanner
    participant Watermark as WatermarkStore
    participant Scheduler as Scheduler
    participant Buffer as ScheduleBuffer
    participant Dispatcher as DispatchFunc

    EntryReader->>TickPlanner: emit DAGChangeEvent (add/update/delete)
    TickPlanner->>Watermark: Load() / Flush()
    loop Tick (periodic)
        Scheduler->>TickPlanner: Plan(now)
        TickPlanner->>Buffer: Peek/Pop (prioritize catch-up)
        TickPlanner-->>Scheduler: []PlannedRun
        Scheduler->>Dispatcher: dispatchRun(PlannedRun)
        Dispatcher-->>TickPlanner: success/failure
        TickPlanner->>Watermark: Save(updated SchedulerState) (async flusher)
    end
```

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes


🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

- ❌ Docstring Coverage — Warning: docstring coverage is 14.95%, below the required 80.00% threshold. Write docstrings for the functions missing them to satisfy the coverage threshold.
- ✅ Description Check — Passed (check skipped: CodeRabbit's high-level summary is enabled).
- ✅ Title Check — Passed: the title "feat: scheduler catch-up" accurately summarizes the main feature addition of this large changeset.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/service/scheduler/mocks_test.go (1)

12-32: ⚠️ Potential issue | 🟡 Minor

Consider moving the EntryReader mock into internal/test fixtures.

This mock is now reusable across scheduler tests; centralizing it avoids duplication and keeps test helpers consistent.

As per coding guidelines: "Use stretchr/testify/require for assertions and shared fixtures from internal/test instead of duplicating mocks".

rfcs/004-schedule-catchup.md (1)

205-213: ⚠️ Potential issue | 🟠 Major

RFC states watermark advances only on successful dispatch, but implementation advances unconditionally.

Line 207 says: "the watermark does not advance past that time and catch-up stops." However, in scheduler.go, Advance() is called immediately after Plan() (before async dispatch completes), moving the watermark regardless of dispatch outcome. Either the implementation or the RFC should be updated for consistency — see the related comment on scheduler.go lines 349–354.

🤖 Fix all issues with AI agents
In `@internal/core/overlap_policy_test.go`:
- Around line 3-38: Replace the final assertion in TestParseOverlapPolicy to use
require.Equal instead of assert.Equal (change the call in the test body where
ParseOverlapPolicy is compared to tt.want), and remove the now-unused
"github.com/stretchr/testify/assert" import from the import block so only
"github.com/stretchr/testify/require" remains; this keeps assertions consistent
with the test guidelines and avoids an unused import.
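A hedged sketch of what a case-insensitive, whitespace-tolerant ParseOverlapPolicy with a skip fallback might look like; the type and constant names are assumed from the PR summary, not copied from the code:

```go
package main

import (
	"fmt"
	"strings"
)

type OverlapPolicy string

const (
	OverlapPolicySkip   OverlapPolicy = "skip"
	OverlapPolicyAll    OverlapPolicy = "all"
	OverlapPolicyLatest OverlapPolicy = "latest"
)

// ParseOverlapPolicy accepts "skip", "all", or "latest", ignoring case
// and surrounding whitespace. Unknown or empty values degrade to skip,
// the safe default described in the review comments.
func ParseOverlapPolicy(s string) OverlapPolicy {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "all":
		return OverlapPolicyAll
	case "latest":
		return OverlapPolicyLatest
	default:
		return OverlapPolicySkip
	}
}

func main() {
	fmt.Println(ParseOverlapPolicy("  Latest ")) // latest
	fmt.Println(ParseOverlapPolicy("ALL"))       // all
	fmt.Println(ParseOverlapPolicy("bogus"))     // skip
}
```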

In `@internal/service/scheduler/catchup.go`:
- Around line 37-67: ComputeMissedIntervals currently uses time.Time values with
possible monotonic clock readings as map keys causing dedupe to fail; normalize
every timestamp returned by sched.Parsed.Next by calling Round(0) (e.g., set t =
t.Round(0) and next = next.Round(0)) before checking/setting seen and before
appending to result so map lookups and stored times use wall-clock-only
instants; apply this normalization for both the initial t and the loop's next
value in the ComputeMissedIntervals function.

In `@internal/service/scheduler/scheduler.go`:
- Around line 349-354: The planner's watermark is advanced immediately after
s.planner.Plan but before s.dispatchRun goroutines complete, violating the RFC;
change the flow so the watermark only advances after successful dispatch: have
s.dispatchRun return an error (or a completion signal) instead of spawning a
fire-and-forget goroutine, collect results for the planned runs from
s.planner.Plan, wait for each dispatch to complete and only call
s.planner.Advance(tickTime) if all catch-up runs dispatched successfully (or
advance per-run inside dispatchRun on success), updating callers of
s.dispatchRun accordingly.

In `@internal/service/scheduler/tick_planner_test.go`:
- Around line 98-178: Replace the mixed assert.* calls in the TickPlanner tests
with require.* so failures stop the test immediately: in
TestTickPlanner_InitLoadError replace assert.NotNil and assert.Equal on
tp.watermarkState with require.NotNil and require.Equal; in
TestTickPlanner_InitWithMissedRuns replace assert.Equal on buf.Len() with
require.Equal; in TestTickPlanner_Advance replace assert.True on
tp.watermarkDirty.Load() with require.True; and in
TestTickPlanner_FlushWritesSnapshot replace
require.NotNil/assert.Equal/assert.False usage to all use require.NotNil,
require.Equal and require.False respectively (use the existing tp,
tp.watermarkState, buf, tp.watermarkDirty.Load and store.lastSaved() references
to locate the assertions).

In `@internal/service/scheduler/tick_planner.go`:
- Around line 548-566: The Advance function unconditionally advances per-DAG
watermarks from tp.lastPlanResult (in TickPlanner.Advance), which moves catch-up
watermarks even for runs that may not have been dispatched; change the logic so
Advance only updates tp.watermarkState.DAGs for ScheduleTypeStart runs that have
been confirmed dispatched. Concretely: modify the flow around
TickPlanner.lastPlanResult and Advance to either (a) accept/consume a list of
successfully dispatched run IDs or runs (e.g., pass in confirmedRuns or have
Advance read a tp.dispatchedRuns set) and only set
DAGWatermark{LastScheduledTime: run.ScheduledTime} for runs present in that
confirmed-dispatch list, or (b) mark individual runs as dispatched when
DispatchRun returns success and update watermark immediately from that success
path instead of from lastPlanResult. Ensure tp.watermarkDirty.Store(true) and
clearing tp.lastPlanResult happen only after applying updates for confirmed
dispatched runs.
- Around line 336-350: The code pops a skipped buffer item under
OverlapPolicySkip (buf.Pop()) but never advances the DAG watermark, so skipped
intervals are lost and re-detected on restart; fix by advancing the watermark
when handling OverlapPolicySkip — after buf.Pop() compute the popped item's
interval (or use the popped item/iterator returned by buf.Pop()) and call the
planner/watermark advancement path (the same mechanism used by Advance()/which
consumes lastPlanResult) to move the watermark past that interval so subsequent
ticks won’t re-enqueue it; update references to buf.Pop(),
overlapPolicy/OverlapPolicySkip, Advance(), and lastPlanResult accordingly.
- Around line 591-603: The TickPlanner.Start method can be called twice which
causes tp.cancel to be overwritten and the first goroutines
(drainEvents/startFlusher) to become uncancellable; add a re-entry guard (e.g.,
a field atomic.Bool started on TickPlanner) checked-and-set at the top of Start
to prevent double-start, and if already started either return an error or no-op;
ensure the guard is used before creating the context/cancel and before tp.wg.Add
so the original tp.cancel isn't lost and goroutines are not leaked.
- Around line 305-310: The suspension check incorrectly derives dagBaseName from
entry.dag.Location; instead call tp.cfg.IsSuspended with the DAG's explicit
identifier entry.dag.Name. In the loop over tp.entries replace the
filepath/strings-based stem computation and the call tp.cfg.IsSuspended(ctx,
dagBaseName) with tp.cfg.IsSuspended(ctx, entry.dag.Name) so suspension honors
YAML-set dag.Name (reference: tp.entries, entry.dag.Location, entry.dag.Name,
tp.cfg.IsSuspended).

In `@ui/src/features/dag-runs/components/dag-run-list/DAGRunTable.tsx`:
- Around line 273-279: The DAGRunTable component currently renders
dagRun.dagRunId and TriggerTypeIndicator without wrapping classes, causing long
IDs to overflow; update the JSX that contains {dagRun.dagRunId} (the span with
className "font-mono text-muted-foreground") and the corresponding cell at the
other occurrence (the block referenced around the second location) to include
"whitespace-normal break-words" on the containing element (either the wrapping
div or the span) so long trigger/run IDs wrap cleanly and prevent layout
overflow; ensure you apply the same class addition to the other occurrence
mentioned (the similar span at the later lines) so both places handle long text
consistently.
🧹 Nitpick comments (20)
internal/cmd/helper.go (1)

28-33: Hardcoded trigger-type list in error message may drift from actual valid values.

The error message enumerates valid trigger types as a literal string. If a new type is added to core.TriggerType in the future, this message can silently go stale. Consider deriving the list programmatically (e.g., from a slice of known types or a String() method) to keep it in sync.

That said, this is a pre-existing pattern and the addition of "catchup" here is correct for this PR.

internal/core/duration.go (1)

20-23: Silently swallowing strconv.Atoi error could mask overflow on extreme inputs.

If someone passes "99999999999999999999d", Atoi fails, n becomes 0, and the result is "0h" — rejected as "duration must be positive" rather than a more accurate error. This is unlikely in practice but worth a note.

Optional: propagate Atoi errors
```diff
-	s = reDays.ReplaceAllStringFunc(s, func(v string) string {
-		n, _ := strconv.Atoi(strings.TrimSuffix(v, "d"))
-		return strconv.Itoa(n*24) + "h"
-	})
+	var convErr error
+	s = reDays.ReplaceAllStringFunc(s, func(v string) string {
+		n, err := strconv.Atoi(strings.TrimSuffix(v, "d"))
+		if err != nil {
+			convErr = fmt.Errorf("invalid day value %q: %w", v, err)
+			return v // leave as-is; will fail in ParseDuration
+		}
+		return strconv.Itoa(n*24) + "h"
+	})
+	if convErr != nil {
+		return 0, convErr
+	}
```
internal/core/duration_test.go (1)

14-27: Consider adding a standard Go duration case (no day unit) for completeness.

All valid cases use either d or h. A case like {name: "standard minutes", input: "30m", want: 30 * time.Minute} would confirm the pass-through to time.ParseDuration works unmodified.

internal/service/scheduler/schedule_buffer_test.go (1)

12-31: Consider adding a test for OverlapPolicySkip send-rejection semantics.

TestScheduleBuffer_SendAndPeek uses OverlapPolicySkip but only sends one item, so it never exercises the skip-duplicate behavior. A dedicated test verifying that Send rejects (or deduplicates) when an item is already buffered under OverlapPolicySkip would strengthen coverage of the core policy logic.

internal/test/scheduler.go (1)

36-44: Pre-existing: hasDAGsDir check is always true when any option is passed.

Every HelperOption is a non-nil function value, so the if opt != nil check on line 41 is always true whenever opts is non-empty. This means the default testdata path (lines 47-50) is skipped even when a caller passes unrelated options (e.g., WithConfigMutator). This is pre-existing, but worth noting.

Suggested fix

Track the DAGs dir explicitly via a sentinel or by inspecting option results:

```diff
-	var hasDAGsDir bool
-	for _, opt := range opts {
-		schedulerOpts = append(schedulerOpts, opt)
-		// Check if DAGsDir option is already provided
-		// This is a simple check, in production code you might want a more robust solution
-		if opt != nil {
-			hasDAGsDir = true
-		}
-	}
+	schedulerOpts = append(schedulerOpts, opts...)
+	// Apply opts to a scratch helper to detect if DAGsDir was set
+	hasDAGsDir := false
+	for _, opt := range opts {
+		// TODO: implement a reliable DAGsDir detection mechanism
+		_ = opt
+	}
```

A simpler approach: always append the testdata default first and let a caller-supplied WithDAGsDir override it (since options are applied in order).

internal/persis/filewatermark/store_test.go (1)

113-152: TestStore_SaveAtomicity name is slightly misleading — it verifies sequential overwrite, not crash safety.

This test confirms that a second save overwrites the first and that no .tmp file is left behind, which is useful. True atomicity (e.g., crash mid-write leaves valid state) would require fault injection. The current name could mislead readers into thinking crash safety is proven. Consider renaming to TestStore_SaveOverwrite or TestStore_SaveCleansTmpFile.

internal/service/scheduler/catchup_test.go (2)

23-72: Good table-driven coverage of ComputeReplayFrom.

Consider adding a case where lastTick or lastScheduledTime is after now to document expected behavior for clock skew or future-dated watermarks.


157-170: Cap test doesn't assert that the most recent runs are retained.

The comment on line 167 says "Should keep the most recent runs (closest to replayTo)", but the only assertion is that the last element is <= replayTo. To truly verify FIFO-tail behavior, assert that got[0] is close to replayTo (i.e., far from replayFrom).

Suggested additional assertion
```diff
 	// Should keep the most recent runs (closest to replayTo)
 	if len(got) > 0 {
 		assert.True(t, got[len(got)-1].Before(replayTo) || got[len(got)-1].Equal(replayTo))
+		// The first retained run should be far from replayFrom (we kept the tail)
+		assert.True(t, got[0].After(replayFrom.Add(24*time.Hour)),
+			"expected most recent runs to be retained, but got[0]=%v is too close to replayFrom=%v", got[0], replayFrom)
 	}
```
internal/persis/filewatermark/store.go (1)

71-99: Make the rename durable with a directory fsync.

os.Rename is atomic, but without syncing the directory, the new entry can be lost on crash/power loss. Consider syncing the base dir after rename.

💾 Suggested durability tweak
```diff
 if err := os.Rename(tmpFile, path); err != nil {
 	// Clean up temp file on rename failure
 	_ = os.Remove(tmpFile)
 	logger.Error(ctx, "Failed to rename watermark state file",
 		tag.Error(err),
 		tag.File(path),
 	)
 	return fmt.Errorf("failed to rename state file: %w", err)
 }
+
+// Ensure directory entry is durable
+if err := syncDir(s.baseDir); err != nil {
+	return fmt.Errorf("failed to sync watermark directory: %w", err)
+}
+func syncDir(dir string) error {
+	d, err := os.Open(dir)
+	if err != nil {
+		return err
+	}
+	defer d.Close()
+	return d.Sync()
+}
```
internal/service/scheduler/tick_planner_test.go (1)

17-53: Centralize the watermark store mock in internal/test fixtures.

This mock will likely be reused across scheduler tests; moving it into shared fixtures avoids duplication.

As per coding guidelines: "Use stretchr/testify/require for assertions and shared fixtures from internal/test instead of duplicating mocks".

internal/service/scheduler/scheduler_test.go (1)

58-93: Missing blank line before the "Start" sub-test.

The "Restart" sub-test closing brace at line 57 is immediately followed by the next t.Run at line 58 with no separator, reducing readability compared to the other sub-tests in the file.

```diff
 	})
+
 	t.Run("Start", func(t *testing.T) {
```
internal/service/scheduler/scheduler.go (4)

264-266: Planner init failure is logged but execution continues — verify this is intentional.

If planner.Init fails (e.g., corrupt watermark file), the error is logged but Start() proceeds. Per the RFC's "Graceful Degradation" policy (missing watermark = no catch-up), this seems intentional. However, Init returning an error currently returns nil from Init internally anyway (line 223 in tick_planner.go always returns nil). If future changes make Init return meaningful errors, this silent swallow could mask data-loss scenarios.

Consider at minimum logging at Warn level instead of Error to signal that this is a degraded-but-acceptable state, not a failure that needs operator intervention.


86-124: Type assertion to concrete entryReaderImpl couples the constructor to the implementation.

Line 91 uses er.(*entryReaderImpl) to extract IsSuspended and wire the event channel. This bypasses the EntryReader interface and silently disables suspension checks and event propagation when a different implementation is provided (e.g., in integration tests using a mock).

Consider extending the EntryReader interface (or introducing a narrower interface) to expose IsSuspended and event wiring, rather than relying on a type assertion against the concrete type.


436-449: Fire-and-forget goroutines in dispatchRun are not tracked.

Each call spawns a goroutine that isn't joined during Stop(). On shutdown, in-flight dispatches may be orphaned or race with resource cleanup. The context from cronLoop will eventually cancel, but the goroutine may outlive the wg.Wait() in Stop.

Consider adding these goroutines to a sync.WaitGroup that Stop joins, or dispatching synchronously for catch-up runs where ordering and watermark correctness matter.


148-175: Test-hook setters mutate planner config without synchronization.

The Set* methods (e.g., SetRestartFunc, SetDispatchFunc) directly write to s.planner.cfg fields. The doc comments correctly state "must be called before Start()", but there's no runtime guard. If called after Start(), this would race with Plan() and drainEvents reading from cfg.

A lightweight safeguard would be to panic if s.running.Load() is true, making the contract enforceable at runtime.

rfcs/004-schedule-catchup.md (1)

162-167: Add a language identifier to fenced code blocks.

Several fenced code blocks in the document (lines 162, 271, 276, 283, 429) lack a language specifier, which triggers markdownlint MD040 warnings. Use text or plaintext for pseudocode/log snippets.

````diff
-```
+```text
 replayFrom = max(
````
internal/service/scheduler/entryreader.go (1)

122-197: Correctness of name-change detection is sound, but sendEvent for the "added" case after a name change uses the wrong condition.

When a DAG file is updated and the DAG name changes, the code correctly emits a DAGChangeDeleted for the old name (line 159–163), then falls through to line 165. The condition existed && oldDAGName == "" is false (because oldDAGName != ""), so the else branch fires and emits DAGChangeAdded — which is correct behavior (the new name is indeed a new entry for the planner).

However, the logic is non-obvious. Consider restructuring into explicit branches for clarity:

♻️ Suggested restructure for readability
```diff
-		if existed && oldDAGName == "" {
-			er.sendEvent(ctx, DAGChangeEvent{
-				Type:    DAGChangeUpdated,
-				DAG:     dag,
-				DAGName: dag.Name,
-			})
-		} else {
-			er.sendEvent(ctx, DAGChangeEvent{
-				Type:    DAGChangeAdded,
-				DAG:     dag,
-				DAGName: dag.Name,
-			})
-		}
+		changeType := DAGChangeAdded
+		if existed && oldDAGName == "" {
+			changeType = DAGChangeUpdated
+		}
+		er.sendEvent(ctx, DAGChangeEvent{
+			Type:    changeType,
+			DAG:     dag,
+			DAGName: dag.Name,
+		})
```
internal/service/scheduler/tick_planner.go (3)

295-302: Plan() holds entryMu during I/O calls — documented but worth noting the latency risk.

The struct comment (line 90–93) explains this is intentional for snapshot consistency. However, Plan() calls IsSuspended, IsRunning, GetLatestStatus, and GenRunID — all potentially involving disk/network I/O — while holding entryMu. This blocks drainEvents for the entire planning cycle.

For a small number of DAGs this is fine, but at scale (hundreds of DAGs), a single slow GetLatestStatus call could delay event processing and cause missed FS events to queue up. Consider computing the suspension/running state outside the lock in a future iteration if scale becomes a concern.


508-527: computePrevExecTime 7-day lookback window works but relies on a heuristic fallback for infrequent schedules.

For cron expressions with intervals > 7 days (e.g., monthly 0 0 1 * *), the walk loop finds no previous fire time and falls back to the interval heuristic (line 523–524). The heuristic next - (nextNext - next) is correct for uniform intervals but could be inaccurate for non-uniform crons (e.g., 0 0 1,15 * * where intervals alternate between ~14 and ~16 days). This only affects skipIfSuccessful guard accuracy — a minor edge case.


732-756: DispatchRun logs errors but does not propagate them — intentional fire-and-forget but limits observability.

Consider emitting a metric or structured log field (e.g., slog.Bool("success", false)) that can be used for alerting, since the caller (scheduler.dispatchRun) also doesn't inspect the result.

@yottahmd
Collaborator Author

@coderabbitai do full review


coderabbitai bot commented Feb 11, 2026

@yottahmd, I'll perform a full review of all the changes in this PR.

✅ Actions performed

Full review triggered.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/scheduler/dag_executor.go (1)

69-69: ⚠️ Potential issue | 🟡 Minor

Stale comment still references DAGRunJob.Start.

The architectural documentation on lines 29 and 31 was updated to reference TickPlanner.DispatchRun, but this comment still says "from DAGRunJob.Start". Update it for consistency.

📝 Proposed fix
```diff
-// HandleJob is the entry point for new scheduled jobs (from DAGRunJob.Start).
+// HandleJob is the entry point for new scheduled jobs (from TickPlanner.DispatchRun).
```
🤖 Fix all issues with AI agents
In `@internal/core/duration.go`:
- Around line 20-23: The current reDays.ReplaceAllStringFunc call swallows
strconv.Atoi errors causing silent overflow; replace this one-liner with a loop
that finds day matches via reDays (e.g., reDays.FindAllString or
FindAllStringIndex), parse the numeric part using strconv.ParseInt (or
ParseUint) with a 64-bit size, check and return any parse error instead of
ignoring it, convert the validated value to hours (multiply by 24) and build the
resulting string, and ensure the enclosing function (the one calling this
replacement) returns that error to the caller; reference the reDays regexp and
the strconv.Atoi usage in your change.

In `@internal/service/frontend/api/v1/queues.go`:
- Line 108: QueuedCount currently uses q.queuedCount (raw queueStore.Len())
which includes in-flight/running items while ListQueueItems filters them out;
update the assignment for QueuedCount to represent truly-waiting/pending items
by subtracting the running count (e.g., compute runningCount from the same
source ListQueueItems uses and set QueuedCount = q.queuedCount - runningCount,
ensuring it never goes negative) so the summary and ListQueueItems remain
consistent; alternatively, if you prefer to show total enqueued including
in-flight, rename the field (e.g., TotalEnqueued) and/or remove the filter in
ListQueueItems—pick one approach and apply it consistently to the QueuedCount
assignment and ListQueueItems behavior.

In `@internal/service/scheduler/catchup_test.go`:
- Around line 140-152: Replace the non-fatal length check with a fatal one to
avoid panics: change assert.Len(t, got, tt.wantCount) to require.Len(t, got,
tt.wantCount) before accessing got[0] and got[len(got)-1]; ensure the test
imports github.com/stretchr/testify/require (or adds it to the existing testify
imports). Keep the subsequent equality and chronological assertions as-is (or
convert them to require.* if you want the test to stop on first failure).

In `@internal/service/scheduler/scheduler.go`:
- Around line 258-260: The call to s.planner.Init(ctx, s.entryReader.DAGs())
currently logs errors with logger.Error but allows scheduler startup to
continue. Decide whether watermark load failure should be fatal. If it should,
change the handler in the scheduler initialization to return the error
(propagate it up) instead of just logging, ensuring the caller cannot start an
inconsistent scheduler. If graceful degradation is intended, add a clear
comment above the s.planner.Init call explaining why proceeding without planner
state is safe and what the fallback behavior is. Reference s.planner.Init,
s.entryReader.DAGs, logger.Error, and the scheduler start-up path when applying
the change.

In `@internal/service/scheduler/tick_planner_test.go`:
- Around line 1123-1135: The test calls mustParseSchedule(t, ...) inside a
spawned goroutine, which is unsafe; pre-construct the DAGs and schedules on the
main test goroutine and have the goroutine only send them. Specifically, before
launching the anonymous goroutine that uses wg and eventCh, build a slice
(e.g., events := make([]DAGChangeEvent, 50)), filling each entry with
DAGChangeEvent{Type: DAGChangeAdded, DAG: &core.DAG{Name: fmt.Sprintf("dag-%d",
i), Schedule: []core.Schedule{mustParseSchedule(t, "0 * * * *")}}, DAGName:
fmt.Sprintf("dag-%d", i)} on the main goroutine; then start the goroutine to
range over that prebuilt slice, send each event to eventCh, and call wg.Done
when finished.
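The prebuild-then-send pattern can be sketched like this, with DAGChangeEvent reduced to a plain struct; the real test uses core.DAG and mustParseSchedule, which is the fallible construction that must stay on the test goroutine:

```go
package main

import (
	"fmt"
	"sync"
)

// DAGChangeEvent is a simplified stand-in for the scheduler's event type.
type DAGChangeEvent struct {
	Name     string
	Schedule string
}

// run builds all events up front on the calling goroutine, then uses a
// spawned goroutine only to send the finished values, mirroring the fix
// suggested for the test.
func run() int {
	events := make([]DAGChangeEvent, 50)
	for i := range events {
		// In the real test this is where mustParseSchedule(t, "0 * * * *")
		// runs, safely on the main test goroutine.
		events[i] = DAGChangeEvent{Name: fmt.Sprintf("dag-%d", i), Schedule: "0 * * * *"}
	}

	eventCh := make(chan DAGChangeEvent)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, ev := range events {
			eventCh <- ev // only sends; no t-dependent calls here
		}
		close(eventCh)
	}()

	count := 0
	for range eventCh {
		count++
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println("received", run(), "events")
}
```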

In `@internal/service/scheduler/tick_planner.go`:
- Around line 510-532: computePrevExecTime currently seeds the forward-walk from
next.Add(-7*24*time.Hour) which misses schedules sparser than weekly; change the
seed to a much larger lookback (e.g., seed := next.Add(-366 * 24 * time.Hour) or
one year) so monthly/annual cron entries are found; keep the existing forward
loop using schedule.Parsed.Next(t) and retain the interval-heuristic fallback,
and add/update a comment explaining the extended lookback to avoid missing
sparse schedules (referencing computePrevExecTime, seed, schedule.Parsed).
🧹 Nitpick comments (6)
internal/service/scheduler/queue_processor.go (1)

308-312: Item-level panic recovery is missing queue context in the log.

The outer per-queue recovery (Line 220) logs tag.Queue(queueName), but this per-item recovery only logs the panic value. Since queueName is available in scope (line 245's caller passes it via ProcessQueueItems), adding it here would make debugging easier. Also consider logging the item identifier if available.

🔧 Suggested improvement
 			defer func() {
 				if r := recover(); r != nil {
-					logger.Error(ctx, "Queue item processing panicked", tag.Error(panicToError(r)))
+					logger.Error(ctx, "Queue item processing panicked",
+						tag.Queue(queueName),
+						tag.Error(panicToError(r)),
+					)
 				}
 			}()

Note: ctx at this point already has tag.Queue attached (via queueCtx at Line 226), so the queue name may already appear in logs depending on your structured logging setup. If queueCtx propagates the field, this is a non-issue — please verify.

internal/core/duration_test.go (1)

7-7: Use require.Equal instead of assert.Equal for consistency with coding guidelines.

The overlap_policy_test.go was already updated to use require.Equal exclusively. This file should follow the same pattern, and the assert import can be dropped.

♻️ Proposed fix
 import (
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 			require.NoError(t, err)
-			assert.Equal(t, tt.want, got)
+			require.Equal(t, tt.want, got)

As per coding guidelines, **/*_test.go: "Use stretchr/testify/require for assertions."

Also applies to: 38-38

rfcs/004-schedule-catchup.md (1)

163-169: Add language specifiers to fenced code blocks.

Several fenced code blocks (lines 163, 272, 277, 284, 430) lack language specifiers, flagged by markdownlint (MD040). For example, the replay boundary pseudocode block at line 163 could use ```text or ```pseudocode.

internal/persis/filewatermark/store.go (1)

71-102: Consider syncing the parent directory after rename for full crash durability.

The temp-file + rename pattern ensures atomicity, but on some Linux filesystems the directory entry update from os.Rename may not be durable without an fsync on the parent directory. Given this is scheduler state (recoverable on next restart), this is a minor concern — but worth noting if you want belt-and-suspenders durability.

internal/service/scheduler/scheduler_test.go (1)

55-56: Sleep-based synchronization is inherently flaky.

time.Sleep(time.Second + 100ms) followed by assertions on atomic counters can fail under CI load. Consider polling with a deadline (similar to the pattern used in TestScheduler_GracefulShutdown Lines 236-244) instead of a fixed sleep for more deterministic results.

This applies to all subtests (Restart, Start, StopSchedule) that use the same pattern.

Example polling pattern
-		time.Sleep(time.Second + time.Millisecond*100)
-		require.GreaterOrEqual(t, restartCount.Load(), int32(1))
+		deadline := time.After(5 * time.Second)
+		for restartCount.Load() < 1 {
+			select {
+			case <-deadline:
+				t.Fatal("restart was not called within deadline")
+			default:
+				time.Sleep(10 * time.Millisecond)
+			}
+		}
internal/service/scheduler/scheduler.go (1)

86-91: Type assertion on EntryReader is fragile and couples construction to the concrete type.

If er is a mock, decorator, or any non-*entryReaderImpl type, IsSuspended silently becomes nil (defaulting to "never suspended") and no DAG change events are wired. In production, this means any future refactor that wraps the entry reader will silently break suspension checks and live-reload.

Consider adding IsSuspended and the event channel as explicit parameters to the constructor or to the EntryReader interface, rather than relying on a type assertion.
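One possible shape for the explicit-dependency constructor; SchedulerDeps and its field names are hypothetical, and the default is now an explicit choice rather than a silent nil from a failed type assertion:

```go
package main

import "fmt"

// DAGChangeEvent is a simplified stand-in for the scheduler's event type.
type DAGChangeEvent struct{ DAGName string }

// SchedulerDeps makes the suspension check and change-event channel
// explicit constructor inputs, so mocks and decorators wire them the
// same way the production entry reader does.
type SchedulerDeps struct {
	IsSuspended func(name string) bool
	Events      <-chan DAGChangeEvent
}

type Scheduler struct{ deps SchedulerDeps }

func NewScheduler(deps SchedulerDeps) *Scheduler {
	if deps.IsSuspended == nil {
		// Deliberate, documented default instead of an accidental one.
		deps.IsSuspended = func(string) bool { return false }
	}
	return &Scheduler{deps: deps}
}

func main() {
	ch := make(chan DAGChangeEvent, 1)
	s := NewScheduler(SchedulerDeps{
		IsSuspended: func(name string) bool { return name == "paused-dag" },
		Events:      ch,
	})
	fmt.Println(s.deps.IsSuspended("paused-dag"))
	fmt.Println(s.deps.IsSuspended("other-dag"))
}
```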

@yottahmd yottahmd merged commit 86d0e9d into main Feb 11, 2026
6 checks passed
@yottahmd yottahmd deleted the 004-schedule-catchup branch February 11, 2026 10:42

codecov bot commented Feb 11, 2026

Codecov Report

❌ Patch coverage is 77.47626% with 166 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.11%. Comparing base (c62a7e8) to head (b4cc041).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/service/scheduler/tick_planner.go | 78.22% | 57 Missing and 34 partials ⚠️ |
| internal/persis/filewatermark/store.go | 57.81% | 17 Missing and 10 partials ⚠️ |
| internal/service/scheduler/scheduler.go | 70.96% | 15 Missing and 3 partials ⚠️ |
| internal/service/scheduler/entryreader.go | 88.73% | 6 Missing and 2 partials ⚠️ |
| internal/service/scheduler/queue_processor.go | 36.36% | 5 Missing and 2 partials ⚠️ |
| internal/core/duration.go | 73.68% | 3 Missing and 2 partials ⚠️ |
| internal/core/status.go | 0.00% | 4 Missing ⚠️ |
| internal/service/scheduler/catchup.go | 85.71% | 2 Missing and 2 partials ⚠️ |
| internal/cmd/helper.go | 0.00% | 1 Missing ⚠️ |
| internal/core/spec/dag.go | 83.33% | 1 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1649      +/-   ##
==========================================
+ Coverage   69.88%   70.11%   +0.22%     
==========================================
  Files         335      341       +6     
  Lines       37440    37992     +552     
==========================================
+ Hits        26166    26639     +473     
- Misses       9204     9237      +33     
- Partials     2070     2116      +46     
| Files with missing lines | Coverage Δ |
|---|---|
| internal/cmd/context.go | 70.62% <100.00%> (+0.09%) ⬆️ |
| internal/cmd/start.go | 37.17% <ø> (ø) |
| internal/core/dag.go | 91.20% <ø> (ø) |
| internal/core/overlap_policy.go | 100.00% <100.00%> (ø) |
| internal/service/scheduler/dag_executor.go | 82.43% <ø> (-0.24%) ⬇️ |
| internal/service/scheduler/schedule.go | 80.00% <ø> (+20.00%) ⬆️ |
| internal/service/scheduler/schedule_buffer.go | 100.00% <100.00%> (ø) |
| internal/service/scheduler/watermark.go | 100.00% <100.00%> (ø) |
| internal/cmd/helper.go | 64.70% <0.00%> (ø) |
| internal/core/spec/dag.go | 85.34% <83.33%> (-0.02%) ⬇️ |

... and 8 more

... and 10 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update df60962...b4cc041. Read the comment docs.
