Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/docket/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,6 @@ def task_workers_set(self, task_name: str) -> str:

async def _heartbeat(self) -> None:
while True:
await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())
try:
now = datetime.now(timezone.utc).timestamp()
maximum_age = (
Expand Down Expand Up @@ -974,6 +973,8 @@ async def _heartbeat(self) -> None:
extra=self._log_context(),
)

await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())

async def _can_start_task(self, redis: Redis, execution: Execution) -> bool:
"""Check if a task can start based on concurrency limits."""
# Check if task has a concurrency limit dependency
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ async def key_leak_checker(
scheduling_resolution=timedelta(milliseconds=5),
) as temp_worker:
await temp_worker.run_until_finished()
# Clean up heartbeat data to avoid polluting tests that check worker counts
async with docket.redis() as r:
await r.zrem(docket.workers_set, temp_worker.name)
for task_name in docket.tasks:
await r.zrem(docket.task_workers_set(task_name), temp_worker.name)
await r.delete(docket.worker_tasks_set(temp_worker.name))

await checker.capture_baseline()

Expand Down
4 changes: 2 additions & 2 deletions tests/test_instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,8 @@ async def test_worker_publishes_depth_gauges(
async with Worker(docket):
await asyncio.sleep(0.2) # enough for a heartbeat to be published

QUEUE_DEPTH.assert_called_once_with(2, docket_labels)
SCHEDULE_DEPTH.assert_called_once_with(3, docket_labels)
QUEUE_DEPTH.assert_called_with(2, docket_labels)
SCHEDULE_DEPTH.assert_called_with(3, docket_labels)


@pytest.fixture
Expand Down