Skip to content

Commit 061a12f

Browse files
chrisguidry and claude committed
Add TASKS_REDELIVERED counter to track redelivered tasks
This implements issue #152 by adding a counter that tracks how many started tasks were redelivered from other workers. The implementation:

- Adds a transient `redelivered` field to the Execution class (defaults to False)
- Detects redeliveries in the worker using the Redis XAUTOCLAIM marker
- Increments the TASKS_REDELIVERED counter when execution.redelivered is True

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 8eb2b9b commit 061a12f

File tree

4 files changed

+73
-3
lines changed

4 files changed

+73
-3
lines changed

src/docket/execution.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ def __init__(
5151
key: str,
5252
attempt: int,
5353
trace_context: opentelemetry.context.Context | None = None,
54+
redelivered: bool = False,
5455
) -> None:
5556
self.function = function
5657
self.args = args
@@ -59,6 +60,7 @@ def __init__(
5960
self.key = key
6061
self.attempt = attempt
6162
self.trace_context = trace_context
63+
self.redelivered = redelivered
6264

6365
def as_message(self) -> Message:
6466
return {
@@ -80,6 +82,7 @@ def from_message(cls, function: TaskFunction, message: Message) -> Self:
8082
key=message[b"key"].decode(),
8183
attempt=int(message[b"attempt"].decode()),
8284
trace_context=propagate.extract(message, getter=message_getter),
85+
redelivered=False, # Default to False, will be set to True in worker if it's a redelivery
8386
)
8487

8588
def general_labels(self) -> Mapping[str, str]:

src/docket/instrumentation.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,12 @@
4040
unit="1",
4141
)
4242

43+
TASKS_REDELIVERED = meter.create_counter(
44+
"docket_tasks_redelivered",
45+
description="How many tasks started that were redelivered from another worker",
46+
unit="1",
47+
)
48+
4349
TASKS_STRICKEN = meter.create_counter(
4450
"docket_tasks_stricken",
4551
description="How many tasks have been stricken from executing",

src/docket/worker.py

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
TASKS_COMPLETED,
4848
TASKS_FAILED,
4949
TASKS_PERPETUATED,
50+
TASKS_REDELIVERED,
5051
TASKS_RETRIED,
5152
TASKS_RUNNING,
5253
TASKS_STARTED,
@@ -286,7 +287,11 @@ async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
286287
count=available_slots,
287288
)
288289

289-
def start_task(message_id: RedisMessageID, message: RedisMessage) -> bool:
290+
def start_task(
291+
message_id: RedisMessageID,
292+
message: RedisMessage,
293+
is_redelivery: bool = False,
294+
) -> bool:
290295
function_name = message[b"function"].decode()
291296
if not (function := self.docket.tasks.get(function_name)):
292297
logger.warning(
@@ -297,6 +302,7 @@ def start_task(message_id: RedisMessageID, message: RedisMessage) -> bool:
297302
return False
298303

299304
execution = Execution.from_message(function, message)
305+
execution.redelivered = is_redelivery
300306

301307
task = asyncio.create_task(self._execute(execution), name=execution.key)
302308
active_tasks[task] = message_id
@@ -342,12 +348,15 @@ async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
342348
continue
343349

344350
for source in [get_redeliveries, get_new_deliveries]:
345-
for _, messages in await source(redis):
351+
for stream_key, messages in await source(redis):
352+
is_redelivery = stream_key == b"__redelivery__"
346353
for message_id, message in messages:
347354
if not message: # pragma: no cover
348355
continue
349356

350-
task_started = start_task(message_id, message)
357+
task_started = start_task(
358+
message_id, message, is_redelivery
359+
)
351360
if not task_started:
352361
# Other errors - delete and ack
353362
await self._delete_known_task(redis, message)
@@ -521,6 +530,8 @@ async def _execute(self, execution: Execution) -> None:
521530
duration = 0.0
522531

523532
TASKS_STARTED.add(1, counter_labels)
533+
if execution.redelivered:
534+
TASKS_REDELIVERED.add(1, counter_labels)
524535
TASKS_RUNNING.add(1, counter_labels)
525536
TASK_PUNCTUALITY.record(punctuality, counter_labels)
526537

tests/test_instrumentation.py

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -376,6 +376,14 @@ def TASKS_RETRIED(monkeypatch: pytest.MonkeyPatch) -> Mock:
376376
return mock
377377

378378

379+
@pytest.fixture
380+
def TASKS_REDELIVERED(monkeypatch: pytest.MonkeyPatch) -> Mock:
381+
"""Mock for the TASKS_REDELIVERED counter."""
382+
mock = Mock(spec=Counter.add)
383+
monkeypatch.setattr("docket.instrumentation.TASKS_REDELIVERED.add", mock)
384+
return mock
385+
386+
379387
async def test_worker_execution_increments_task_counters(
380388
docket: Docket,
381389
worker: Worker,
@@ -386,6 +394,7 @@ async def test_worker_execution_increments_task_counters(
386394
TASKS_SUCCEEDED: Mock,
387395
TASKS_FAILED: Mock,
388396
TASKS_RETRIED: Mock,
397+
TASKS_REDELIVERED: Mock,
389398
):
390399
"""Should increment the appropriate task counters when a worker executes a task."""
391400
await docket.add(the_task)()
@@ -397,6 +406,7 @@ async def test_worker_execution_increments_task_counters(
397406
TASKS_SUCCEEDED.assert_called_once_with(1, worker_labels)
398407
TASKS_FAILED.assert_not_called()
399408
TASKS_RETRIED.assert_not_called()
409+
TASKS_REDELIVERED.assert_not_called()
400410

401411

402412
async def test_failed_task_increments_failure_counter(
@@ -409,6 +419,7 @@ async def test_failed_task_increments_failure_counter(
409419
TASKS_SUCCEEDED: Mock,
410420
TASKS_FAILED: Mock,
411421
TASKS_RETRIED: Mock,
422+
TASKS_REDELIVERED: Mock,
412423
):
413424
"""Should increment the TASKS_FAILED counter when a task fails."""
414425
the_task.side_effect = ValueError("Womp")
@@ -422,6 +433,7 @@ async def test_failed_task_increments_failure_counter(
422433
TASKS_FAILED.assert_called_once_with(1, worker_labels)
423434
TASKS_SUCCEEDED.assert_not_called()
424435
TASKS_RETRIED.assert_not_called()
436+
TASKS_REDELIVERED.assert_not_called()
425437

426438

427439
async def test_retried_task_increments_retry_counter(
@@ -433,6 +445,7 @@ async def test_retried_task_increments_retry_counter(
433445
TASKS_SUCCEEDED: Mock,
434446
TASKS_FAILED: Mock,
435447
TASKS_RETRIED: Mock,
448+
TASKS_REDELIVERED: Mock,
436449
):
437450
"""Should increment the TASKS_RETRIED counter when a task is retried."""
438451

@@ -448,6 +461,7 @@ async def the_task(retry: Retry = Retry(attempts=2)):
448461
assert TASKS_FAILED.call_count == 2
449462
assert TASKS_RETRIED.call_count == 1
450463
TASKS_SUCCEEDED.assert_not_called()
464+
TASKS_REDELIVERED.assert_not_called()
451465

452466

453467
async def test_exhausted_retried_task_increments_retry_counter(
@@ -459,6 +473,7 @@ async def test_exhausted_retried_task_increments_retry_counter(
459473
TASKS_SUCCEEDED: Mock,
460474
TASKS_FAILED: Mock,
461475
TASKS_RETRIED: Mock,
476+
TASKS_REDELIVERED: Mock,
462477
):
463478
"""Should increment the appropriate counters when retries are exhausted."""
464479

@@ -474,6 +489,41 @@ async def the_task(retry: Retry = Retry(attempts=1)):
474489
TASKS_FAILED.assert_called_once_with(1, worker_labels)
475490
TASKS_RETRIED.assert_not_called()
476491
TASKS_SUCCEEDED.assert_not_called()
492+
TASKS_REDELIVERED.assert_not_called()
493+
494+
495+
async def test_redelivered_tasks_increment_redelivered_counter(
496+
docket: Docket,
497+
worker_labels: dict[str, str],
498+
TASKS_STARTED: Mock,
499+
TASKS_COMPLETED: Mock,
500+
TASKS_SUCCEEDED: Mock,
501+
TASKS_FAILED: Mock,
502+
TASKS_RETRIED: Mock,
503+
TASKS_REDELIVERED: Mock,
504+
):
505+
"""Should increment the TASKS_REDELIVERED counter for redelivered tasks."""
506+
507+
async def test_task():
508+
await asyncio.sleep(0.01)
509+
510+
await docket.add(test_task)()
511+
512+
worker = Worker(docket, redelivery_timeout=timedelta(milliseconds=50))
513+
514+
async with worker:
515+
worker._execute = AsyncMock(side_effect=Exception("Simulated worker failure")) # type: ignore[assignment]
516+
517+
with pytest.raises(Exception, match="Simulated worker failure"):
518+
await worker.run_until_finished()
519+
520+
await asyncio.sleep(0.075)
521+
522+
worker2 = Worker(docket, redelivery_timeout=timedelta(milliseconds=100))
523+
async with worker2:
524+
await worker2.run_until_finished()
525+
526+
assert TASKS_REDELIVERED.call_count >= 1
477527

478528

479529
@pytest.fixture

0 commit comments

Comments
 (0)