Skip to content

Commit bba3211

Browse files
chrisguidry and claude committed
Add TASKS_SUPERSEDED metric and move claim() before all metrics/timing
The supersession check now happens right after the strikelist check, before execution counts, timing, or any lifecycle metrics. This means superseded tasks don't affect any other counters at all.

A new TASKS_SUPERSEDED counter tracks how often this happens, with a docket.where label to distinguish the two check points:

- "worker": head check in claim() before the task runs
- "on_complete": tail check in Perpetual.on_complete() after it runs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b76dd8d commit bba3211

File tree

4 files changed

+119
-8
lines changed

4 files changed

+119
-8
lines changed

src/docket/dependencies/_perpetual.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
if TYPE_CHECKING: # pragma: no cover
1212
from ..execution import Execution
1313

14-
from ..instrumentation import TASKS_PERPETUATED
14+
from ..instrumentation import TASKS_PERPETUATED, TASKS_SUPERSEDED
1515

1616
logger = logging.getLogger(__name__)
1717

@@ -95,6 +95,15 @@ async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
9595
return False
9696

9797
if await execution.is_superseded():
98+
worker = self.worker.get()
99+
TASKS_SUPERSEDED.add(
100+
1,
101+
{
102+
**worker.labels(),
103+
**execution.general_labels(),
104+
"docket.where": "on_complete",
105+
},
106+
)
98107
logger.info(
99108
"↬ [%s] %s (superseded)",
100109
format_duration(outcome.duration.total_seconds()),

src/docket/instrumentation.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
unit="1",
5151
)
5252

53+
TASKS_SUPERSEDED = meter.create_counter(
54+
"docket_tasks_superseded",
55+
description="How many tasks were superseded by a newer schedule before execution",
56+
unit="1",
57+
)
58+
5359
TASKS_COMPLETED = meter.create_counter(
5460
"docket_tasks_completed",
5561
description="How many tasks that have completed in any state",

src/docket/worker.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
TASKS_STARTED,
7878
TASKS_STRICKEN,
7979
TASKS_SUCCEEDED,
80+
TASKS_SUPERSEDED,
8081
healthcheck_server,
8182
metrics_server,
8283
)
@@ -799,6 +800,12 @@ async def _execute(self, execution: Execution) -> None:
799800
TASKS_STRICKEN.add(1, counter_labels | {"docket.where": "worker"})
800801
return
801802

803+
# Atomically check supersession and claim task in a single round-trip
804+
if not await execution.claim(self.name):
805+
logger.info("↬ %s (superseded)", call, extra=log_context)
806+
TASKS_SUPERSEDED.add(1, counter_labels | {"docket.where": "worker"})
807+
return
808+
802809
if execution.key in self._execution_counts:
803810
self._execution_counts[execution.key] += 1
804811

@@ -818,11 +825,6 @@ async def _execute(self, execution: Execution) -> None:
818825
"%s [%s] %s", arrow, format_duration(punctuality), call, extra=log_context
819826
)
820827

821-
# Atomically check supersession and claim task in a single round-trip
822-
if not await execution.claim(self.name):
823-
logger.info("↬ %s (superseded)", call, extra=log_context)
824-
return
825-
826828
dependencies: dict[str, Dependency] = {}
827829

828830
with tracer.start_as_current_span(

tests/instrumentation/test_counters.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
import asyncio
44
import sys
5-
from datetime import timedelta
5+
from datetime import datetime, timedelta, timezone
66
from unittest.mock import AsyncMock, Mock
77

88
import pytest
99

1010
if sys.version_info < (3, 11): # pragma: no cover
1111
from exceptiongroup import ExceptionGroup
12-
from opentelemetry.metrics import Counter
12+
from opentelemetry.metrics import Counter, UpDownCounter
1313

1414
from docket import Docket, Worker
1515
from docket.dependencies import Perpetual, Retry
@@ -361,3 +361,97 @@ async def test_task():
361361
await worker2.run_until_finished()
362362

363363
assert TASKS_REDELIVERED.call_count >= 1
364+
365+
366+
@pytest.fixture
367+
def TASKS_RUNNING(monkeypatch: pytest.MonkeyPatch) -> Mock:
368+
"""Mock for the TASKS_RUNNING up-down counter."""
369+
mock_obj = Mock(spec=UpDownCounter.add)
370+
monkeypatch.setattr("docket.instrumentation.TASKS_RUNNING.add", mock_obj)
371+
return mock_obj
372+
373+
374+
@pytest.fixture
375+
def TASKS_SUPERSEDED(monkeypatch: pytest.MonkeyPatch) -> Mock:
376+
"""Mock for the TASKS_SUPERSEDED counter."""
377+
mock_obj = Mock(spec=Counter.add)
378+
monkeypatch.setattr("docket.instrumentation.TASKS_SUPERSEDED.add", mock_obj)
379+
return mock_obj
380+
381+
382+
async def test_superseded_task_increments_superseded_counter(
383+
docket: Docket,
384+
worker: Worker,
385+
TASKS_STARTED: Mock,
386+
TASKS_COMPLETED: Mock,
387+
TASKS_RUNNING: Mock,
388+
TASKS_SUPERSEDED: Mock,
389+
):
390+
"""Superseded tasks increment TASKS_SUPERSEDED but not lifecycle metrics.
391+
392+
When claim() detects that a task has been superseded by a newer generation,
393+
the worker records TASKS_SUPERSEDED with docket.where=worker, but doesn't
394+
touch TASKS_STARTED, TASKS_RUNNING, or TASKS_COMPLETED.
395+
"""
396+
397+
async def superseded_task():
398+
pass # pragma: no cover
399+
400+
await docket.add(superseded_task, key="metrics-superseded")()
401+
402+
# Bump the generation so the worker sees the message as superseded
403+
async with docket.redis() as redis:
404+
await redis.hincrby( # type: ignore[misc]
405+
docket.key("runs:metrics-superseded"), "generation", 1
406+
)
407+
408+
await worker.run_until_finished()
409+
410+
TASKS_SUPERSEDED.assert_called_once_with(
411+
1,
412+
{
413+
"docket.name": docket.name,
414+
"docket.worker": worker.name,
415+
"docket.task": "superseded_task",
416+
"docket.where": "worker",
417+
},
418+
)
419+
TASKS_STARTED.assert_not_called()
420+
TASKS_COMPLETED.assert_not_called()
421+
TASKS_RUNNING.assert_not_called()
422+
423+
424+
async def test_replaced_task_only_counts_replacement(
425+
docket: Docket,
426+
worker: Worker,
427+
TASKS_STARTED: Mock,
428+
TASKS_COMPLETED: Mock,
429+
TASKS_RUNNING: Mock,
430+
TASKS_SUCCEEDED: Mock,
431+
TASKS_SUPERSEDED: Mock,
432+
):
433+
"""When a task is replaced before execution, only the replacement runs.
434+
435+
In the normal case, replace() successfully deletes the old stream message
436+
via XDEL, so the worker only sees the replacement. No supersession occurs
437+
because the stale message is already gone.
438+
"""
439+
440+
async def replaceable_task():
441+
pass
442+
443+
await docket.add(replaceable_task, key="metrics-replace")()
444+
await docket.replace(
445+
replaceable_task, datetime.now(timezone.utc), "metrics-replace"
446+
)()
447+
448+
await worker.run_until_finished()
449+
450+
TASKS_SUPERSEDED.assert_not_called()
451+
TASKS_STARTED.assert_called_once()
452+
TASKS_COMPLETED.assert_called_once()
453+
TASKS_SUCCEEDED.assert_called_once()
454+
# TASKS_RUNNING: +1 then -1
455+
assert TASKS_RUNNING.call_count == 2
456+
increments = [c.args[0] for c in TASKS_RUNNING.call_args_list]
457+
assert sum(increments) == 0, "running gauge should be balanced"

0 commit comments

Comments
 (0)