fix(runs): serialize run rollback to prevent state overwrites#2598
Open
mvanhorn wants to merge 1 commit intobytedance:mainfrom
Open
fix(runs): serialize run rollback to prevent state overwrites#2598mvanhorn wants to merge 1 commit intobytedance:mainfrom
mvanhorn wants to merge 1 commit intobytedance:mainfrom
Conversation
Closes bytedance#2505. A run could be marked `interrupted` before its worker had finished cleanup, allowing a newer run for the same thread to start and write fresh checkpoints. The old worker would then complete its rollback, restoring an older snapshot over the new state. Two changes close the race: 1. Two new RunStatus values, `cancelling` and `rolling_back`, sit between `running` and the terminal states. `cancel()` now transitions to `cancelling` (not `interrupted`); the worker is responsible for `rolling_back -> interrupted` (or `-> error` on rollback failure). 2. `create_or_reject` for `interrupt`/`rollback` strategies now awaits the cancelled workers' tasks before creating the new run. `has_inflight` and the inflight check both include the new states, so a third concurrent request also blocks until cleanup completes. Three new tests cover the new behavior: - `has_inflight` includes `cancelling` and `rolling_back` - `create_or_reject` awaits cancelled workers before creating (the regression test for the race) - `create_or_reject` does not stomp `abort_action` on already- cancelling runs The existing `test_cancel` was updated to assert the intermediate `cancelling` state; the worker tests for actual rollback behavior remain green.
Contributor
There was a problem hiding this comment.
Pull request overview
Implements the rollback/interrupt serialization design for #2505 by introducing intermediate run states and ensuring new runs for the same thread don’t start until cancelled workers have finished cleanup/rollback, preventing checkpoint state overwrites.
Changes:
- Added
cancellingandrolling_backrun statuses and treated them as in-flight. - Updated
RunManager.create_or_reject()to cancel inflight runs and await their worker tasks before inserting a new run (for interrupt/rollback strategies). - Updated rollback paths in
run_agent()to markrolling_backbefore performing rollback and to set terminal status afterward; added regression tests.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| backend/packages/harness/deerflow/runtime/runs/schemas.py | Adds cancelling / rolling_back to the RunStatus enum. |
| backend/packages/harness/deerflow/runtime/runs/manager.py | Defines inflight statuses, updates cancellation semantics, and serializes new run creation behind cancelled worker completion. |
| backend/packages/harness/deerflow/runtime/runs/worker.py | Marks rolling_back before rollback work and sets final status after rollback completes/fails. |
| backend/tests/test_run_manager.py | Updates cancel test expectations and adds regression tests covering inflight detection and create-or-reject serialization behavior. |
Comment on lines
+187
to
+190
| # Mark rolling_back BEFORE the rollback work so create_or_reject | ||
| # for the same thread sees this run as still inflight and | ||
| # blocks until rollback completes (#2505). | ||
| await run_manager.set_status(run_id, RunStatus.rolling_back) |
Comment on lines
+163
to
+167
| # Hold the lock only for the inflight check + cancel-signal phase. We | ||
| # then release it to await the worker tasks (so other operations on | ||
| # other threads aren't blocked), and reacquire it to insert the new | ||
| # record. The new record is created with a fresh run_id and is keyed | ||
| # by run_id, so the brief unlocked window cannot collide. |
Comment on lines
+183
to
190
| if r.status in (RunStatus.pending, RunStatus.running): | ||
| r.abort_action = multitask_strategy | ||
| r.abort_event.set() | ||
| if r.task is not None and not r.task.done(): | ||
| r.task.cancel() | ||
| r.status = RunStatus.cancelling | ||
| r.updated_at = now | ||
| if r.task is not None and not r.task.done(): |
Comment on lines
128
to
135
| if record.status not in (RunStatus.pending, RunStatus.running): | ||
| return False | ||
| record.abort_action = action | ||
| record.abort_event.set() | ||
| if record.task is not None and not record.task.done(): | ||
| record.task.cancel() | ||
| record.status = RunStatus.interrupted | ||
| record.status = RunStatus.cancelling | ||
| record.updated_at = _now_iso() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #2505 (P1)
A run could be marked
interruptedbefore its worker had actually finished cleanup. A newer run for the same thread could start and write fresh checkpoints, only for the old worker's rollback to restore an older snapshot over the new state.The closed RFC PR #2504 documented the fix design; this PR is the implementation.
Change
Two pieces close the race.
1. New intermediate states
backend/packages/harness/deerflow/runtime/runs/schemas.py:cancel()now transitions tocancelling(not directly tointerrupted). The worker is responsible for the subsequent progression:2.
create_or_rejectawaits cleanupFor
interrupt/rollbackmultitask strategies, the manager now waits for the cancelled workers' tasks to finish before inserting the new run record. This is the serialization point that closes the race - an old worker can no longer write rollback state on top of a new run because the new run hasn't been created yet.The wait happens with the manager lock released, so other threads' operations are not blocked. Re-entry is safe because the new run gets a fresh
run_idkeying.has_inflightand the internal inflight check now includecancellingandrolling_back, so a third concurrent request for the same thread also blocks until cleanup completes.Tests
backend/tests/test_run_manager.pyadds three regression tests under# --- #2505: rollback-serialization regression tests ---:test_has_inflight_includes_cancelling_and_rolling_back- the new states count as inflight.test_create_or_reject_awaits_cancelled_workers_before_creating- the race-condition guard. Spawns a worker task that performs simulated rollback cleanup, callscreate_or_reject, and asserts the worker finished BEFORE the new run was created (assert order:[\"worker_done\", \"new_created\"]).test_create_or_reject_skips_already_cancelling_runs- re-cancelling acancellingrun does not stomp the originalabort_action.The existing
test_cancelwas updated to assert the intermediatecancellingstate; the docstring now explains that the final state is set by the worker per the new state machine.Adversarial verification
I confirmed the new race test catches the regression by temporarily commenting out the
await asyncio.gather(*tasks_to_await, ...)line. The race test fails with:Restoring the line returns all 14 manager tests to green, plus the 7 existing worker-rollback tests.
Verification
cd backend && PYTHONPATH=. uv run pytest tests/test_run_manager.py tests/test_run_worker_rollback.py -v- 21 passedcd backend && uvx ruff check packages/harness/deerflow/runtime/runs/ tests/test_run_manager.py- clean🤖 Built with assistance from Claude Code.