Skip to content

Commit 5f37218

Browse files
desertaxleclaude
andcommitted
Fix race condition in subscribe() for fast-completing tasks
The watch command was failing in CI when tasks completed quickly because: - subscribe() would sync() and find task already completed - Progress data would be deleted on completion - Watch would never see progress events Solution: subscribe() now emits initial progress event along with initial state event, ensuring subscribers capture progress state before deletion. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 5aea763 commit 5f37218

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

src/docket/execution.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,25 @@ async def subscribe(self) -> AsyncGenerator[dict[str, Any], None]:
757757

758758
yield initial_state
759759

760+
# Also emit initial progress data if available
761+
# This ensures subscribers see progress state even if task completes quickly
762+
# and progress data is deleted before pub/sub connection is established
763+
if self.progress.current is not None or self.progress.total != 100:
764+
progress_event: dict[str, Any] = {
765+
"type": "progress",
766+
"key": self.key,
767+
"current": self.progress.current
768+
if self.progress.current is not None
769+
else 0,
770+
"total": self.progress.total,
771+
}
772+
if self.progress.message:
773+
progress_event["message"] = self.progress.message
774+
if self.progress.updated_at:
775+
progress_event["updated_at"] = self.progress.updated_at.isoformat()
776+
777+
yield progress_event
778+
760779
# Then subscribe to real-time updates
761780
state_channel = f"{self.docket.name}:state:{self.key}"
762781
progress_channel = f"{self.docket.name}:progress:{self.key}"

tests/cli/test_watch.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,13 @@ async def task_with_progress(progress: ExecutionProgress = Progress()):
122122

123123
worker_task = asyncio.create_task(worker.run_until_finished())
124124

125-
# Give worker time to start the task
126-
await asyncio.sleep(0.05)
127-
128125
result = await run_cli(
129126
"watch",
130127
"progress-task",
131128
"--url",
132129
docket.url,
133130
"--docket",
134131
docket.name,
135-
timeout=3.0,
136132
)
137133

138134
await worker_task
@@ -263,7 +259,7 @@ async def task_that_waits_then_progresses(progress: ExecutionProgress = Progress
263259
docket.register(task_that_waits_then_progresses)
264260

265261
# Schedule task for slightly in future so watch can connect first
266-
when = datetime.now(timezone.utc) + timedelta(milliseconds=200)
262+
when = datetime.now(timezone.utc) + timedelta(seconds=1)
267263
await docket.add(task_that_waits_then_progresses, when=when, key="timing-test")()
268264

269265
worker_task = asyncio.create_task(worker.run_until_finished())

0 commit comments

Comments
 (0)