# fix(scheduler): run scheduled jobs in deterministic order #65

ferntheplant wants to merge 1 commit into `get-convex:main`
## Conversation
### 📝 Walkthrough

The change introduces a deterministic, queue-based scheduling mechanism for scheduled jobs with configurable drain limits, improved error handling via try/finally blocks, and updated public API signatures to support optional iteration parameters.
### Sequence Diagram

```mermaid
sequenceDiagram
    participant Test as Test Framework
    participant Queue as Job Queue
    participant Scheduler as Scheduler/Timer
    participant Job as Job Executor
    participant Backend as Backend State
    Test->>Queue: schedule(job)
    activate Queue
    Queue->>Queue: Push to scheduledJobQueue<br/>(with scheduledTime, order)
    deactivate Queue
    Test->>Scheduler: advanceTimers()
    activate Scheduler
    Scheduler->>Scheduler: scheduleDrain()<br/>(compute delay to next due job)
    deactivate Scheduler
    Scheduler->>Job: Timer fires
    activate Job
    Job->>Job: drainScheduledJobs()
    Job->>Job: Set drainInProgress = true
    loop Process all due jobs in order
        Job->>Job: runOneScheduledJob()
        Job->>Job: pending → inProgress
        Job->>Backend: Update job state
        Job->>Job: Execute job function
        Job->>Job: inProgress → success/failed
        Job->>Backend: Update result
    end
    Job->>Job: Set drainInProgress = false
    deactivate Job
    Test->>Test: Verify job results
```
I was running into flaky tests when using the test backend with the Workflow and Workpool components. I spent a while bashing my head against a wall trying to figure out why, until I asked an LLM to look at the convex-test and Workpool source code. The following is mostly AI generated, but I can confirm it works on my large test suite, including complex nested workflows, enqueued workpool actions, and other scheduled functions. I've included the notes it generated to provide more context.
## Convex-test + Workpool/Workflow Debugging Summary
Summary of issues encountered when testing long workflows that use workpool with the convex-test backend, and the changes made to convex-test to fix them.
### Context

- convex-test simulates Convex and implements the same syscall interface the real backend uses; your backend code runs in the same Node/Vitest process.
- In workpool's loop, `main(generation, segment)` runs, increments the generation, does work, then schedules `updateRunStatus(newGeneration, segment)` with `runAfter(0, ...)`. When `updateRunStatus` runs, it expects `state.generation === generation`; otherwise it throws `generation mismatch: X !== Y`.
- Workpool schedules these internal follow-ups with `runAfter(0, ...)`.

### Issue 1: Generation mismatch and workpool spin
#### Symptoms

- `Error: generation mismatch: 12 !== 4` when running scheduled function `loop:updateRunStatus`.
- `[complete] … work is done, but its work is gone` logs, and workpool reporting `running: 1` indefinitely until `finishAllScheduledFunctions` hit its iteration limit.

#### Root cause
In the original convex-test implementation, every `runAfter(0, ...)` became its own `setTimeout(callback, 0)`. All such callbacks (from workflow, workpool main, workpool updateRunStatus, kick, etc.) went into the same timer queue and ran in event-loop order, not in the order workpool expects. So:

1. `main(4)` runs, commits, and schedules `updateRunStatus(5)` with `setTimeout(..., 0)`.
2. Other work (e.g. `complete()` → `kick()` → `main(5)` with `runAfter(0)`) also schedules with `setTimeout(0)`.
3. `main(5)`'s callback runs first, and the generation advances to 6, 7, … When `updateRunStatus(5)` finally runs, it sees `state.generation === 12` → generation mismatch. The loop then gets into an inconsistent state (e.g. run status still says "running" but the work doc is gone), leading to the "work is gone" log and the spin.

Workpool alone controls the generation number; the bug was ordering: other workpool callbacks (e.g. `main` from kick) were running before the `updateRunStatus` that belonged to the previous `main`.

### Fix: Queue-based scheduler with deterministic order
The fix replaces per-job `setTimeout` with a single queue of scheduled jobs (`scheduledJobQueue`) and a single drain driven by one timer:

- `1.0/schedule`: push a `ScheduledJobEntry` (scheduledTime, insertionOrder, componentPath, functionPath, args, jobId, name) onto the queue and call `scheduleDrain()`.
- `scheduleDrain()`: if the queue is non-empty, set one `setTimeout(drainScheduledJobs, delay)` where `delay = max(0, nextDue - now)`.
- `drainScheduledJobs()`: in a loop, take all jobs with `scheduledTime <= now`, sort by (scheduledTime, insertionOrder), remove them from the queue, and run each with `runOneScheduledJob`. Then call `scheduleDrain()` again for any remaining jobs.

Effect: "run now" jobs run in insertion order. So when workpool's `main(4)` schedules `updateRunStatus(5)` with `runAfter(0)`, that job is next in line and runs before any later `runAfter(0)` (e.g. `main(5)` from kick), eliminating the generation mismatch.

### Issue 2: "Unexpected scheduled function state when starting it: inProgress"
#### Symptoms

convexTest invariant error: `Unexpected scheduled function state when starting it: inProgress`.
#### Root cause

Two drains could run at the same time:

1. Drain 1 starts `runOneScheduledJob(A)` (sets A to inProgress, then `await withAuth().fun(A)`).
2. While Drain 1 is awaiting, a timer set by `scheduleDrain()` (e.g. from a job scheduling more work) fires and Drain 2 starts, running job B.
3. Drain 1's due list was computed earlier and still includes B. Drain 1 then runs B again → the job is already inProgress → invariant error.

So the same job could be executed by two concurrent drains.
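The race above can be modeled in a few lines (illustrative only; the names and the `demo` harness are hypothetical, not the convex-test code): each drain snapshots its due list before running anything, so a job started by one drain is re-encountered by the other.

```typescript
// Minimal model of the double-drain race: both drains compute their due
// list up front, then yield while "running" a job, letting the other drain
// see a job that is already inProgress.
type Job = { id: number; state: "pending" | "inProgress" | "done" };

async function unsafeDrain(queue: Job[], label: string, log: string[]) {
  const due = [...queue]; // stale snapshot: computed before any job runs
  for (const job of due) {
    if (job.state !== "pending") {
      // This is the spot where convex-test threw
      // "Unexpected scheduled function state when starting it: inProgress".
      log.push(`${label} saw job ${job.id} already ${job.state}`);
      continue;
    }
    job.state = "inProgress";
    await Promise.resolve(); // job body yields; the other drain interleaves here
    job.state = "done";
  }
}

async function demo(): Promise<string[]> {
  const queue: Job[] = [
    { id: 1, state: "pending" },
    { id: 2, state: "pending" },
  ];
  const log: string[] = [];
  // Two drains over the same queue, exactly the situation the fix prevents.
  await Promise.all([
    unsafeDrain(queue, "drain1", log),
    unsafeDrain(queue, "drain2", log),
  ]);
  return log;
}
```

With the `drainInProgress` lock described below, the second drain would return immediately instead of re-running a snapshot.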
### Fix: Single drain at a time + defensive skip

- A `drainInProgress` flag lives on the Convex global. At the start of `drainScheduledJobs()`, if `drainInProgress` is true, return immediately; otherwise set it to true for the duration of the drain and clear it in a `finally` before calling `scheduleDrain()`. Only one drain runs at a time; a timer that fires while a drain is in progress does nothing, and the current drain calls `scheduleDrain()` when it finishes.
- In `runOneScheduledJob`, if the job is already `inProgress` when we are about to start it (which shouldn't happen with the lock), treat it as a duplicate and return without running or throwing, so the same job never runs twice.

### Code changes in convex-test (summary)
**Types and global state**

- New type `ScheduledJobEntry` (scheduledTime, insertionOrder, componentPath, functionPath, parsedArgs, jobId, name).
- New globals: `scheduledJobQueue`, `nextDrainTimerId`, `scheduledJobInsertionCounter`, `drainInProgress`.

**Functions**

- `scheduleDrain()`: sets a single `setTimeout(drainScheduledJobs, delay)`.
- `drainScheduledJobs()`: if `drainInProgress`, return. Set `drainInProgress = true`, then in a loop: collect due jobs (scheduledTime ≤ now), sort by (scheduledTime, insertionOrder), remove from queue, run each with `runOneScheduledJob`. In `finally`, set `drainInProgress = false` and call `scheduleDrain()`.
- `runOneScheduledJob(job)`: runs a single job, skipping it defensively if it is already `inProgress`.
- `1.0/schedule` handler: pushes onto `scheduledJobQueue` (with incremented `scheduledJobInsertionCounter`), then calls `scheduleDrain()`; no per-job `setTimeout`.
- `convexTest()`: initializes `scheduledJobQueue: []`, `nextDrainTimerId: null`, `scheduledJobInsertionCounter: 0`, `drainInProgress: false`.
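The pieces above fit together roughly as follows. This is a sketch with plain async callbacks standing in for `runOneScheduledJob`; the actual convex-test code keeps this state on the Convex global and carries component/function paths, args, and job ids.

```typescript
// Sketch of the queue-based scheduler: one timer, one drain, deterministic order.
type ScheduledJobEntry = {
  scheduledTime: number;   // epoch ms when the job becomes due
  insertionOrder: number;  // tiebreaker: FIFO among jobs due at the same time
  run: () => Promise<void>; // stand-in for runOneScheduledJob
};

const scheduledJobQueue: ScheduledJobEntry[] = [];
let scheduledJobInsertionCounter = 0;
let nextDrainTimerId: ReturnType<typeof setTimeout> | null = null;
let drainInProgress = false;

// Stand-in for the 1.0/schedule handler: enqueue, then arm the single timer.
export function schedule(delayMs: number, run: () => Promise<void>) {
  scheduledJobQueue.push({
    scheduledTime: Date.now() + delayMs,
    insertionOrder: scheduledJobInsertionCounter++,
    run,
  });
  scheduleDrain();
}

function scheduleDrain() {
  if (scheduledJobQueue.length === 0) return;
  const nextDue = Math.min(...scheduledJobQueue.map((j) => j.scheduledTime));
  const delay = Math.max(0, nextDue - Date.now());
  if (nextDrainTimerId !== null) clearTimeout(nextDrainTimerId);
  nextDrainTimerId = setTimeout(drainScheduledJobs, delay);
}

async function drainScheduledJobs() {
  if (drainInProgress) return; // only one drain at a time (the Issue 2 fix)
  drainInProgress = true;
  try {
    for (;;) {
      const now = Date.now();
      // Take every due job, in (scheduledTime, insertionOrder) order.
      const due = scheduledJobQueue
        .filter((j) => j.scheduledTime <= now)
        .sort(
          (a, b) =>
            a.scheduledTime - b.scheduledTime ||
            a.insertionOrder - b.insertionOrder,
        );
      if (due.length === 0) break;
      for (const job of due) {
        const i = scheduledJobQueue.indexOf(job);
        if (i !== -1) scheduledJobQueue.splice(i, 1);
        await job.run(); // may schedule more jobs; picked up next loop pass
      }
    }
  } finally {
    drainInProgress = false;
    scheduleDrain(); // re-arm for anything scheduled while draining
  }
}
```

The key property is that two `runAfter(0, ...)` jobs always run in the order they were scheduled, because the insertion counter breaks ties among jobs due at the same time.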
"workflow","transactWorkflow","workflow/workpool", etc.) is supported; each component path gets its own DatabaseFake and module cache, and reference resolution uses the path from the API so tests work as expected.Why
finishAllScheduledFunctionsmay need a highermaxIterationsFor a workflow with ~7 steps you might expect ~14 scheduled function runs (e.g. one per step + one per step complete). In practice,
`finishAllScheduledFunctions(maxIterations)` can hit the limit with the default 100 and require 200 (or more) even for "small" workflows.

How it works: each iteration does (1) `advanceTimers()` (e.g. `vi.runAllTimers()`), then (2) `waitForInProgressScheduledFunctions()` until no jobs are in progress. So one iteration = one timer advance + wait for that batch of work to finish.
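That loop can be sketched as follows; `isDone` and `waitForInProgress` are hypothetical stand-ins for the harness's internal state checks (the real public signature is `finishAllScheduledFunctions(advanceTimers, maxIterations?)`).

```typescript
// Sketch of the finishAllScheduledFunctions iteration loop described above.
async function finishAllScheduledFunctionsSketch(
  advanceTimers: () => void,              // e.g. vi.runAllTimers
  isDone: () => boolean,                  // no pending or in-progress jobs left
  waitForInProgress: () => Promise<void>, // wait for the current batch
  maxIterations = 500,
): Promise<number> {
  for (let i = 1; i <= maxIterations; i++) {
    advanceTimers();           // (1) fire the next drain timer(s)
    await waitForInProgress(); // (2) wait for that batch of jobs to finish
    if (isDone()) return i;    // iterations actually used
  }
  throw new Error(
    `scheduled functions still running after ${maxIterations} iterations`,
  );
}
```

One iteration therefore corresponds to one "wave" of due jobs, which is why the count tracks drains rather than individual scheduled functions.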
One iteration ≠ one scheduled function. With the queue-based scheduler there is one timer per “next due time”. Advancing timers runs one drain, which can run several jobs (e.g.
mainthenupdateRunStatus). So one iteration can run 1–N jobs. The number of iterations is roughly the number of drains (timer fires), not the number of scheduled function executions.Workpool adds many scheduled calls. Besides workflow step run + step complete, workpool’s loop runs main + updateRunStatus per “tick”, and there can be many ticks per step (pending start, completion, cancellation, recovery checks, etc.). So 7 workflow steps can trigger many more than 14 scheduled runs (e.g. dozens of workpool loop ticks).
- `runAt(future)` creates more "waves". When jobs use `runAt(segmentTime)` or recovery intervals, each distinct time gets its own timer, so you get one iteration per such time. Many segments/recovery times ⇒ many iterations.

So needing `maxIterations` around 200 for a 7-step workflow is expected: the real number of "advance + wait" cycles is driven by workpool loop ticks and distinct scheduled times, not just "steps × 2". Bumping to 200 (or a bit more) for workflow+workpool tests is reasonable; if you still hit the limit, check for unexpectedly many loop ticks or recursive scheduling.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
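For reference, a workflow+workpool test using the bumped limit might look like the following sketch; `api.example.*`, the schema import, and the function names are hypothetical placeholders for your own project, while `convexTest` and `finishAllScheduledFunctions(advanceTimers, maxIterations)` follow the convex-test API with this PR's new parameter.

```ts
import { convexTest } from "convex-test";
import { expect, test, vi } from "vitest";
import schema from "./schema"; // hypothetical project schema
import { api } from "./_generated/api"; // hypothetical generated API

test("long workflow over workpool settles", async () => {
  vi.useFakeTimers();
  const t = convexTest(schema);
  await t.mutation(api.example.kickOffWorkflow, {}); // hypothetical entry point
  // Workpool loop ticks and distinct runAt times can need far more than the
  // old default of 100 advance+wait cycles; pass the new maxIterations arg.
  await t.finishAllScheduledFunctions(vi.runAllTimers, 200);
  vi.useRealTimers();
  const result = await t.query(api.example.workflowResult, {}); // hypothetical
  expect(result).toBe("done");
});
```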
## Summary by CodeRabbit

**New Features**

- Added a `maxIterations` parameter to `finishAllScheduledFunctions` (default increased to 500).

**Bug Fixes**