Skip to content

Commit 433df9b

Browse files
chrisguidryclaude
andauthored
Fix test_task_announcements flake with immediate heartbeat (#216)
Workers now send their first heartbeat immediately on startup instead of waiting for the first interval. This fixes a race condition where `task_workers()` could be called before any heartbeat was sent on heavily loaded CI machines. Changes: - Reorder `_heartbeat()` to send heartbeat before sleeping - Update `test_worker_publishes_depth_gauges` to allow multiple heartbeats - Clean up temp worker heartbeat data in `key_leak_checker` fixture Flake: https://github.com/chrisguidry/docket/actions/runs/20075820763/job/57589737809 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2507be2 commit 433df9b

File tree

3 files changed

+10
-3
lines changed

3 files changed

+10
-3
lines changed

src/docket/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,6 @@ def task_workers_set(self, task_name: str) -> str:
915915

916916
async def _heartbeat(self) -> None:
917917
while True:
918-
await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())
919918
try:
920919
now = datetime.now(timezone.utc).timestamp()
921920
maximum_age = (
@@ -974,6 +973,8 @@ async def _heartbeat(self) -> None:
974973
extra=self._log_context(),
975974
)
976975

976+
await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())
977+
977978
async def _can_start_task(self, redis: Redis, execution: Execution) -> bool:
978979
"""Check if a task can start based on concurrency limits."""
979980
# Check if task has a concurrency limit dependency

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,12 @@ async def key_leak_checker(
234234
scheduling_resolution=timedelta(milliseconds=5),
235235
) as temp_worker:
236236
await temp_worker.run_until_finished()
237+
# Clean up heartbeat data to avoid polluting tests that check worker counts
238+
async with docket.redis() as r:
239+
await r.zrem(docket.workers_set, temp_worker.name)
240+
for task_name in docket.tasks:
241+
await r.zrem(docket.task_workers_set(task_name), temp_worker.name)
242+
await r.delete(docket.worker_tasks_set(temp_worker.name))
237243

238244
await checker.capture_baseline()
239245

tests/test_instrumentation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -766,8 +766,8 @@ async def test_worker_publishes_depth_gauges(
766766
async with Worker(docket):
767767
await asyncio.sleep(0.2) # enough for a heartbeat to be published
768768

769-
QUEUE_DEPTH.assert_called_once_with(2, docket_labels)
770-
SCHEDULE_DEPTH.assert_called_once_with(3, docket_labels)
769+
QUEUE_DEPTH.assert_called_with(2, docket_labels)
770+
SCHEDULE_DEPTH.assert_called_with(3, docket_labels)
771771

772772

773773
@pytest.fixture

0 commit comments

Comments
 (0)