Skip to content

Commit 4ab39d2

Browse files
chrisguidryclaude
andauthored
Log errors from failing Perpetual tasks (#361)
When a Perpetual task fails, `on_complete()` returns `True` which tells the Worker "I handled it" — skipping the normal error log. The reschedule works fine, but the exception is completely invisible. Now we log at ERROR with the full traceback before the reschedule info line. Also fixes the logger names in the private dependency modules (`_perpetual`, `_concurrency`, `_retry`) to use `docket.dependencies` instead of leaking internal names like `docket.dependencies._perpetual` into log output. Closes #360 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7f14054 commit 4ab39d2

File tree

5 files changed

+97
-4
lines changed

5 files changed

+97
-4
lines changed

src/docket/dependencies/_concurrency.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
current_worker,
1717
)
1818

19-
logger = logging.getLogger(__name__)
19+
logger = logging.getLogger("docket.dependencies")
2020

2121
if TYPE_CHECKING: # pragma: no cover
2222
from redis.asyncio import Redis

src/docket/dependencies/_perpetual.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from ..instrumentation import TASKS_PERPETUATED, TASKS_SUPERSEDED
2222

23-
logger = logging.getLogger(__name__)
23+
logger = logging.getLogger("docket.dependencies")
2424

2525

2626
class Perpetual(CompletionHandler["Perpetual"]):
@@ -131,6 +131,15 @@ async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
131131
)
132132

133133
TASKS_PERPETUATED.add(1, {**worker.labels(), **execution.general_labels()})
134+
135+
if outcome.exception:
136+
logger.error(
137+
"↩ [%s] %s",
138+
format_duration(outcome.duration.total_seconds()),
139+
execution.call_repr(),
140+
exc_info=outcome.exception,
141+
)
142+
134143
logger.info(
135144
"↫ [%s] %s",
136145
format_duration(outcome.duration.total_seconds()),

src/docket/dependencies/_retry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from ..instrumentation import TASKS_RETRIED
2121

22-
logger = logging.getLogger(__name__)
22+
logger = logging.getLogger("docket.dependencies")
2323

2424

2525
class ForcedRetry(Exception):
@@ -89,6 +89,15 @@ async def handle_failure(self, execution: Execution, outcome: TaskOutcome) -> bo
8989

9090
worker = current_worker.get()
9191
TASKS_RETRIED.add(1, {**worker.labels(), **execution.general_labels()})
92+
93+
if outcome.exception:
94+
logger.error(
95+
"↩ [%s] %s",
96+
format_duration(outcome.duration.total_seconds()),
97+
execution.call_repr(),
98+
exc_info=outcome.exception,
99+
)
100+
92101
logger.info(
93102
"↫ [%s] %s",
94103
format_duration(outcome.duration.total_seconds()),

tests/fundamentals/test_perpetual.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""Tests for Perpetual dependency (automatically rescheduling tasks)."""
22

3+
import logging
34
from datetime import datetime, timedelta, timezone
45

5-
from docket import Docket, ExecutionState, Perpetual, Worker
6+
import pytest
7+
8+
from docket import Depends, Docket, ExecutionState, Perpetual, Worker
69

710

811
async def test_perpetual_tasks(docket: Docket, worker: Worker):
@@ -106,6 +109,52 @@ async def perpetual_task(
106109
assert calls == 3
107110

108111

112+
async def test_perpetual_tasks_log_errors(
113+
docket: Docket, worker: Worker, caplog: pytest.LogCaptureFixture
114+
):
115+
"""Failing Perpetual tasks log the exception instead of swallowing it."""
116+
117+
async def perpetual_task(
118+
perpetual: Perpetual = Perpetual(every=timedelta(milliseconds=50)),
119+
):
120+
raise ValueError("something broke")
121+
122+
execution = await docket.add(perpetual_task)()
123+
124+
with caplog.at_level(logging.ERROR):
125+
await worker.run_at_most({execution.key: 1})
126+
127+
error_records = [r for r in caplog.records if r.levelno == logging.ERROR]
128+
assert any(
129+
"something broke" in r.getMessage()
130+
or (r.exc_info and "something broke" in str(r.exc_info[1]))
131+
for r in error_records
132+
)
133+
134+
135+
async def test_perpetual_tasks_log_dependency_errors(
136+
docket: Docket, worker: Worker, caplog: pytest.LogCaptureFixture
137+
):
138+
"""Failing dependencies in Perpetual tasks log the error instead of swallowing it."""
139+
140+
async def broken_dep() -> str:
141+
raise RuntimeError("dependency failed")
142+
143+
async def perpetual_task(
144+
value: str = Depends(broken_dep),
145+
perpetual: Perpetual = Perpetual(every=timedelta(milliseconds=50)),
146+
):
147+
pass
148+
149+
execution = await docket.add(perpetual_task)()
150+
151+
with caplog.at_level(logging.ERROR):
152+
await worker.run_at_most({execution.key: 1})
153+
154+
error_records = [r for r in caplog.records if r.levelno == logging.ERROR]
155+
assert any("dependency failed" in (r.exc_text or "") for r in error_records)
156+
157+
109158
async def test_perpetual_tasks_can_be_automatically_scheduled(
110159
docket: Docket, worker: Worker
111160
):

tests/fundamentals/test_retries.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for error handling and retry strategies."""
22

3+
import logging
34
from datetime import datetime, timedelta
45
from typing import Callable
56
from unittest.mock import AsyncMock
@@ -28,6 +29,31 @@ async def test_errors_are_logged(
2829
assert "Faily McFailerson" in caplog.text
2930

3031

32+
async def test_retry_errors_are_logged(
33+
docket: Docket, worker: Worker, caplog: pytest.LogCaptureFixture
34+
):
35+
"""Each retry attempt logs the exception at ERROR level."""
36+
calls = 0
37+
38+
async def the_task(retry: Retry = Retry(attempts=3)):
39+
nonlocal calls
40+
calls += 1
41+
raise ValueError("attempt failed")
42+
43+
await docket.add(the_task)()
44+
45+
with caplog.at_level(logging.ERROR):
46+
await worker.run_until_finished()
47+
48+
assert calls == 3
49+
error_records = [r for r in caplog.records if r.levelno == logging.ERROR]
50+
assert any(
51+
"attempt failed" in r.getMessage()
52+
or (r.exc_info and "attempt failed" in str(r.exc_info[1]))
53+
for r in error_records
54+
)
55+
56+
3157
async def test_supports_simple_linear_retries(
3258
docket: Docket, worker: Worker, now: Callable[[], datetime]
3359
):

0 commit comments

Comments
 (0)