Commit bf2446b

jiaoew1991 and claude committed
fix(test): test_offset_commit_atomicity must not kill the source worker
The test was calling kill_random_worker() without a stage filter, so ~1/3 of runs picked the source worker. Source has parallelism=(1,1) and no offset-checkpoint logic, so once the source actor dies the pipeline cannot resume production. The test then ends with ~600 source records never produced, which after the modulo-3 filter shows up as ~199 records missing (matches the CI failure exactly).

The chaos tests in test_chaos_stress.py already restrict kills to stage_id="transform", with a one-line comment explaining why; this commit applies the same restriction here.

This is the *test* bug. The earlier publish-commit race (PR #84 storage.rs fixes) was a real engine bug and stays fixed. The test also exercises that bug path, but on top of it, this misuse of kill_random_worker made the test look still broken even after the engine-side fix landed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
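The "~600 source records" to "~199 missing" arithmetic in the message can be checked with a small sketch. It assumes, purely for illustration, that the modulo-3 filter keeps records whose integer id is divisible by 3 and that the unproduced records form a contiguous id range; the function name is hypothetical and not part of the repository.

```python
def surviving_after_mod3(start: int, count: int) -> int:
    """Count ids in [start, start + count) that pass a keep-if-id % 3 == 0 filter.

    Assumption: the real filter may use a different residue or key; only the
    one-in-three ratio matters for this estimate.
    """
    return sum(1 for record_id in range(start, start + count) if record_id % 3 == 0)


# Any window of exactly 600 consecutive ids loses 200 post-filter records.
lost_for_600 = surviving_after_mod3(0, 600)  # 200

# A window slightly under 600 (597 = 3 * 199) loses 199, in line with the
# "~199 records missing" reported by CI.
lost_for_597 = surviving_after_mod3(0, 597)  # 199
```

Under these assumptions, a shortfall of about 199 filtered records is what a loss of just under 600 source records would produce.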
1 parent aea1d19 commit bf2446b

1 file changed

Lines changed: 9 additions & 2 deletions

engine/tests/test_stability_worker_recovery.py

@@ -517,11 +517,18 @@ async def test_offset_commit_atomicity(self, ray_cluster):
             await runner.initialize()
             run_task = asyncio.create_task(runner.run())
 
-            # Kill one worker after initial progress
+            # Kill one worker after initial progress.
+            # Restrict to the transform stage: killing the source worker is
+            # unrecoverable in this pipeline (source has parallelism=(1,1)
+            # and no offset checkpoint), so a kill there bypasses the
+            # offset-commit-atomicity property the test is meant to
+            # validate and just shows up as ~600 source records never
+            # produced. The chaos tests in test_chaos_stress.py already
+            # apply this same restriction for the same reason.
             await wait_for_progress(
                 runner, min_processed=200, timeout=30, collector_name=self.collector_name
             )
-            await kill_random_worker(runner)
+            await kill_random_worker(runner, stage_id="transform")
 
             await asyncio.wait_for(run_task, timeout=60)
         finally:
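The effect of the stage_id filter can be sketched as follows. This is not the repository's kill_random_worker; the Worker type, pick_victim helper, and three-stage layout are hypothetical stand-ins that only illustrate why an unfiltered random choice can land on the source worker while a filtered one cannot.

```python
import random
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Worker:
    # Hypothetical stand-in for whatever handle the real test helper uses.
    worker_id: str
    stage_id: str


def pick_victim(
    workers: Sequence[Worker],
    stage_id: Optional[str] = None,
    rng: Optional[random.Random] = None,
) -> Worker:
    """Choose a worker to kill, optionally restricted to one stage."""
    rng = rng or random.Random()
    candidates = [w for w in workers if stage_id is None or w.stage_id == stage_id]
    if not candidates:
        raise ValueError(f"no workers found for stage_id={stage_id!r}")
    return rng.choice(candidates)


# Assumed layout: one worker per stage, so an unfiltered pick hits the
# source roughly one time in three.
workers = [
    Worker("w0", "source"),
    Worker("w1", "transform"),
    Worker("w2", "sink"),
]

rng = random.Random(0)
unfiltered = {pick_victim(workers, rng=rng).stage_id for _ in range(300)}
filtered = {pick_victim(workers, stage_id="transform", rng=rng).stage_id for _ in range(300)}
```

With the filter in place, the candidate list never contains the source worker, so the test can only exercise the recoverable kill path.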