Skip to content

Commit af039e5

Browse files
chrisguidryclaude
andcommitted
Log errors from failing Perpetual tasks instead of swallowing them
When a Perpetual task fails, `on_complete()` returns `True` which tells the Worker "I handled it" — skipping the normal error log. The reschedule works fine, but the exception is completely invisible. Now we log at ERROR with the full traceback before the reschedule info line. Also fixes the logger names in the private dependency modules (`_perpetual`, `_concurrency`, `_retry`) to use `docket.dependencies` instead of leaking internal names like `docket.dependencies._perpetual` into log output. Closes #360 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7f14054 commit af039e5

File tree

4 files changed

+62
-4
lines changed

4 files changed

+62
-4
lines changed

src/docket/dependencies/_concurrency.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
current_worker,
1717
)
1818

19-
logger = logging.getLogger(__name__)
19+
logger = logging.getLogger("docket.dependencies")
2020

2121
if TYPE_CHECKING: # pragma: no cover
2222
from redis.asyncio import Redis

src/docket/dependencies/_perpetual.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from ..instrumentation import TASKS_PERPETUATED, TASKS_SUPERSEDED
2222

23-
logger = logging.getLogger(__name__)
23+
logger = logging.getLogger("docket.dependencies")
2424

2525

2626
class Perpetual(CompletionHandler["Perpetual"]):
@@ -131,6 +131,15 @@ async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
131131
)
132132

133133
TASKS_PERPETUATED.add(1, {**worker.labels(), **execution.general_labels()})
134+
135+
if outcome.exception:
136+
logger.error(
137+
"↩ [%s] %s",
138+
format_duration(outcome.duration.total_seconds()),
139+
execution.call_repr(),
140+
exc_info=outcome.exception,
141+
)
142+
134143
logger.info(
135144
"↫ [%s] %s",
136145
format_duration(outcome.duration.total_seconds()),

src/docket/dependencies/_retry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from ..instrumentation import TASKS_RETRIED
2121

22-
logger = logging.getLogger(__name__)
22+
logger = logging.getLogger("docket.dependencies")
2323

2424

2525
class ForcedRetry(Exception):

tests/fundamentals/test_perpetual.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""Tests for Perpetual dependency (automatically rescheduling tasks)."""
22

3+
import logging
34
from datetime import datetime, timedelta, timezone
45

5-
from docket import Docket, ExecutionState, Perpetual, Worker
6+
import pytest
7+
8+
from docket import Depends, Docket, ExecutionState, Perpetual, Worker
69

710

811
async def test_perpetual_tasks(docket: Docket, worker: Worker):
@@ -106,6 +109,52 @@ async def perpetual_task(
106109
assert calls == 3
107110

108111

112+
async def test_perpetual_tasks_log_errors(
113+
docket: Docket, worker: Worker, caplog: pytest.LogCaptureFixture
114+
):
115+
"""Failing Perpetual tasks log the exception instead of swallowing it."""
116+
117+
async def perpetual_task(
118+
perpetual: Perpetual = Perpetual(every=timedelta(milliseconds=50)),
119+
):
120+
raise ValueError("something broke")
121+
122+
execution = await docket.add(perpetual_task)()
123+
124+
with caplog.at_level(logging.ERROR):
125+
await worker.run_at_most({execution.key: 1})
126+
127+
error_records = [r for r in caplog.records if r.levelno == logging.ERROR]
128+
assert any(
129+
"something broke" in r.getMessage()
130+
or (r.exc_info and "something broke" in str(r.exc_info[1]))
131+
for r in error_records
132+
)
133+
134+
135+
async def test_perpetual_tasks_log_dependency_errors(
136+
docket: Docket, worker: Worker, caplog: pytest.LogCaptureFixture
137+
):
138+
"""Failing dependencies in Perpetual tasks log the error instead of swallowing it."""
139+
140+
async def broken_dep() -> str:
141+
raise RuntimeError("dependency failed")
142+
143+
async def perpetual_task(
144+
value: str = Depends(broken_dep),
145+
perpetual: Perpetual = Perpetual(every=timedelta(milliseconds=50)),
146+
):
147+
pass
148+
149+
execution = await docket.add(perpetual_task)()
150+
151+
with caplog.at_level(logging.ERROR):
152+
await worker.run_at_most({execution.key: 1})
153+
154+
error_records = [r for r in caplog.records if r.levelno == logging.ERROR]
155+
assert any("dependency failed" in (r.exc_text or "") for r in error_records)
156+
157+
109158
async def test_perpetual_tasks_can_be_automatically_scheduled(
110159
docket: Docket, worker: Worker
111160
):

0 commit comments

Comments
 (0)