Skip to content

Commit 6d83b40

Browse files
Fix OpenTelemetry span status for failed tasks (#136) (#137)
## Summary

- Fix OpenTelemetry spans not being marked as failing when tasks fail and are retried
- Set span status to ERROR when tasks fail, regardless of retries
- Set span status to OK when tasks succeed
- Use record_exception() for proper exception tracking
- Capture span directly from context manager instead of get_current_span()

Closes #136

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Chris Pickett <chris.pickett@gmail.com>
1 parent ca58e02 commit 6d83b40

File tree

2 files changed

+100
-3
lines changed

2 files changed

+100
-3
lines changed

src/docket/worker.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616

1717
from opentelemetry import trace
18-
from opentelemetry.trace import Tracer
18+
from opentelemetry.trace import Status, StatusCode, Tracer
1919
from redis.asyncio import Redis
2020
from redis.exceptions import ConnectionError, LockError
2121

@@ -531,7 +531,7 @@ async def _execute(self, execution: Execution) -> None:
531531
"code.function.name": execution.function.__name__,
532532
},
533533
links=execution.incoming_span_links(),
534-
):
534+
) as span:
535535
try:
536536
async with resolved_dependencies(self, execution) as dependencies:
537537
# Preemptively reschedule the perpetual task for the future, or clear
@@ -576,6 +576,8 @@ async def _execute(self, execution: Execution) -> None:
576576
duration = log_context["duration"] = time.time() - start
577577
TASKS_SUCCEEDED.add(1, counter_labels)
578578

579+
span.set_status(Status(StatusCode.OK))
580+
579581
rescheduled = await self._perpetuate_if_requested(
580582
execution, dependencies, timedelta(seconds=duration)
581583
)
@@ -584,10 +586,13 @@ async def _execute(self, execution: Execution) -> None:
584586
logger.info(
585587
"%s [%s] %s", arrow, ms(duration), call, extra=log_context
586588
)
587-
except Exception:
589+
except Exception as e:
588590
duration = log_context["duration"] = time.time() - start
589591
TASKS_FAILED.add(1, counter_labels)
590592

593+
span.record_exception(e)
594+
span.set_status(Status(StatusCode.ERROR, str(e)))
595+
591596
retried = await self._retry_if_requested(execution, dependencies)
592597
if not retried:
593598
retried = await self._perpetuate_if_requested(

tests/test_instrumentation.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from opentelemetry.metrics import Counter, Histogram, UpDownCounter
1111
from opentelemetry.metrics import _Gauge as Gauge
1212
from opentelemetry.sdk.trace import Span, TracerProvider
13+
from opentelemetry.trace import StatusCode
1314

1415
from docket import Docket, Worker
1516
from docket.dependencies import Retry
@@ -98,6 +99,97 @@ async def the_task():
9899
assert link.context.span_id == originating_span.context.span_id
99100

100101

102+
async def test_failed_task_span_has_error_status(docket: Docket, worker: Worker):
    """A task that raises should leave its span with an ERROR status.

    The task records the span that is current while it runs; once the worker
    has drained the queue, that span's status must be ERROR and its
    description must carry the exception message.
    """
    recorded_spans: list[Span] = []

    async def the_failing_task():
        current = trace.get_current_span()
        assert isinstance(current, Span)
        recorded_spans.append(current)
        raise ValueError("Task failed")

    await docket.add(the_failing_task)()
    await worker.run_until_finished()

    # Exactly one execution is expected, so exactly one span was recorded.
    assert len(recorded_spans) == 1
    failed_span = recorded_spans[0]

    assert isinstance(failed_span, Span)
    status = failed_span.status
    assert status is not None
    assert status.status_code == StatusCode.ERROR
    assert status.description is not None
    assert "Task failed" in status.description
123+
124+
125+
async def test_retried_task_spans_have_error_status(docket: Docket, worker: Worker):
    """Failed attempts of a retried task end in ERROR; the successful one ends in OK.

    The task fails on its first two attempts and succeeds on the third. Each
    attempt records the span that is current while it runs, so the statuses
    can be inspected per attempt after the worker finishes.
    """
    captured: list[Span] = []
    attempt_count = 0

    async def the_retrying_task(retry: Retry = Retry(attempts=3)):
        nonlocal attempt_count
        attempt_count += 1
        span = trace.get_current_span()
        assert isinstance(span, Span)
        captured.append(span)

        if attempt_count < 3:
            raise ValueError(f"Attempt {attempt_count} failed")
        # Third attempt succeeds

    await docket.add(the_retrying_task)()
    await worker.run_until_finished()

    assert len(captured) == 3

    # First two attempts should have ERROR status, described by their own exception
    for i, span in enumerate(captured[:2]):
        assert isinstance(span, Span)
        assert span.status is not None
        assert span.status.status_code == StatusCode.ERROR
        assert span.status.description is not None
        assert f"Attempt {i + 1} failed" in span.status.description

    # Third attempt must be explicitly OK: the worker sets Status(StatusCode.OK)
    # on success, so an unset status here would mean that call was lost.
    success_span = captured[2]
    assert isinstance(success_span, Span)
    assert success_span.status is not None
    assert success_span.status.status_code == StatusCode.OK
161+
162+
163+
async def test_infinitely_retrying_task_spans_have_error_status(
    docket: Docket, worker: Worker
):
    """Every attempt of an always-failing task with unlimited retries ends in ERROR.

    The task is declared with ``Retry(attempts=None)`` and raises on every
    run, so the worker is capped at three executions of its key and each of
    the three recorded spans is checked.
    """
    recorded_spans: list[Span] = []
    attempt_count = 0

    async def the_infinite_retry_task(retry: Retry = Retry(attempts=None)):
        nonlocal attempt_count
        attempt_count += 1
        current = trace.get_current_span()
        assert isinstance(current, Span)
        recorded_spans.append(current)
        raise ValueError(f"Attempt {attempt_count} failed")

    execution = await docket.add(the_infinite_retry_task)()

    # Bound the run to three executions of this specific task's key.
    await worker.run_at_most({execution.key: 3})

    # Each bounded execution should have produced one ERROR span.
    assert len(recorded_spans) == 3
    for i, attempt_span in enumerate(recorded_spans):
        assert isinstance(attempt_span, Span)
        status = attempt_span.status
        assert status is not None
        assert status.status_code == StatusCode.ERROR
        assert status.description is not None
        assert f"Attempt {i + 1} failed" in status.description
191+
192+
101193
async def test_message_getter_returns_none_for_missing_key():
102194
"""Should return None when a key is not present in the message."""
103195

0 commit comments

Comments
 (0)