Skip to content
2 changes: 1 addition & 1 deletion loq.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ max_lines = 750
# Source files that still need exceptions above 750
[[rules]]
path = "src/docket/worker.py"
max_lines = 1141
max_lines = 1150

[[rules]]
path = "src/docket/cli/__init__.py"
Expand Down
4 changes: 4 additions & 0 deletions src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from .annotations import Logged
from .dependencies import (
ConcurrencyLimit,
Cooldown,
Cron,
Debounce,
CurrentDocket,
CurrentExecution,
CurrentWorker,
Expand All @@ -37,7 +39,9 @@
"__version__",
"Agenda",
"ConcurrencyLimit",
"Cooldown",
"Cron",
"Debounce",
"CurrentDocket",
"CurrentExecution",
"CurrentWorker",
Expand Down
6 changes: 6 additions & 0 deletions src/docket/dependencies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
format_duration,
)
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
from ._cooldown import Cooldown, CooldownBlocked
from ._debounce import Debounce, DebounceBlocked
from ._cron import Cron
from ._contextual import (
CurrentDocket,
Expand Down Expand Up @@ -83,6 +85,10 @@
"AdmissionBlocked",
"ConcurrencyBlocked",
"ConcurrencyLimit",
"Cooldown",
"CooldownBlocked",
"Debounce",
"DebounceBlocked",
"Cron",
"Perpetual",
"Progress",
Expand Down
7 changes: 7 additions & 0 deletions src/docket/dependencies/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@ class AdmissionBlocked(Exception):

This is the base exception for admission control mechanisms like
concurrency limits, rate limits, or health gates.

When ``reschedule`` is True (default), the worker re-queues the task
with a short delay. When False, the task is silently acknowledged
and dropped (appropriate for debounce/cooldown where re-trying would
just hit the same window).
"""

reschedule: bool = True

def __init__(self, execution: Execution, reason: str = "admission control"):
self.execution = execution
self.reason = reason
Expand Down
94 changes: 94 additions & 0 deletions src/docket/dependencies/_cooldown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Cooldown (trailing-edge) admission control dependency."""

from __future__ import annotations

from datetime import timedelta
from types import TracebackType
from typing import TYPE_CHECKING, Any

from ._base import AdmissionBlocked, Dependency, current_docket, current_execution

if TYPE_CHECKING: # pragma: no cover
from ..execution import Execution


class CooldownBlocked(AdmissionBlocked):
"""Raised when a task is blocked by cooldown."""

reschedule = False

def __init__(self, execution: Execution, cooldown_key: str, window: timedelta):
self.cooldown_key = cooldown_key
self.window = window
reason = f"cooldown ({window}) on {cooldown_key}"
super().__init__(execution, reason=reason)


class Cooldown(Dependency["Cooldown"]):
"""Trailing-edge cooldown: blocks execution if one recently succeeded.

Checks for a Redis key on entry. If present, the task is blocked.
The key is only set on *successful* exit, so failed tasks don't
trigger the cooldown — they can be retried immediately.

Works both as a default parameter and as ``Annotated`` metadata::

# Per-task: don't start if one succeeded in the last 60s
async def send_digest(
cooldown: Cooldown = Cooldown(timedelta(seconds=60)),
) -> None: ...

# Per-parameter: don't start for this customer if one succeeded in the last 60s
async def send_notification(
customer_id: Annotated[int, Cooldown(timedelta(seconds=60))],
) -> None: ...
"""

single: bool = True

def __init__(self, window: timedelta, *, scope: str | None = None) -> None:
self.window = window
self.scope = scope
self._argument_name: str | None = None
self._argument_value: Any = None

def bind_to_parameter(self, name: str, value: Any) -> Cooldown:
bound = Cooldown(self.window, scope=self.scope)
bound._argument_name = name
bound._argument_value = value
return bound

def _cooldown_key(self, function_name: str) -> str:
scope = self.scope or current_docket.get().name
if self._argument_name is not None:
return f"{scope}:cooldown:{self._argument_name}:{self._argument_value}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use bound argument values for per-parameter cooldown keys

The per-parameter Cooldown key also depends on _argument_value, which is captured from pre-bound values and can be None for positional calls, so different positional inputs collapse onto the same Redis key (for example, task(1) and task(2) both map to ...:customer_id:None). That makes cooldown block unrelated argument values and silently drops valid executions.

Useful? React with 👍 / 👎.

return f"{scope}:cooldown:{function_name}"

async def __aenter__(self) -> Cooldown:
execution = current_execution.get()
docket = current_docket.get()

self._key = self._cooldown_key(execution.function_name)

async with docket.redis() as redis:
exists = await redis.exists(self._key)

if exists:
raise CooldownBlocked(execution, self._key, self.window)

return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
if exc_type is not None:
return

docket = current_docket.get()
window_ms = int(self.window.total_seconds() * 1000)

async with docket.redis() as redis:
await redis.set(self._key, 1, px=window_ms)
88 changes: 88 additions & 0 deletions src/docket/dependencies/_debounce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Debounce (leading-edge) admission control dependency."""

from __future__ import annotations

from datetime import timedelta
from types import TracebackType
from typing import TYPE_CHECKING, Any

from ._base import AdmissionBlocked, Dependency, current_docket, current_execution

if TYPE_CHECKING: # pragma: no cover
from ..execution import Execution


class DebounceBlocked(AdmissionBlocked):
"""Raised when a task is blocked by debounce."""

reschedule = False

def __init__(self, execution: Execution, debounce_key: str, window: timedelta):
self.debounce_key = debounce_key
self.window = window
reason = f"debounce ({window}) on {debounce_key}"
super().__init__(execution, reason=reason)


class Debounce(Dependency["Debounce"]):
"""Leading-edge debounce: blocks execution if one was recently started.

Sets a Redis key on entry with a TTL equal to the window. If the key
already exists, the task is blocked via ``AdmissionBlocked``.

Works both as a default parameter and as ``Annotated`` metadata::

# Per-task: don't start if one started in the last 30s
async def process_webhooks(
debounce: Debounce = Debounce(timedelta(seconds=30)),
) -> None: ...

# Per-parameter: don't start for this customer if one started in the last 30s
async def process_customer(
customer_id: Annotated[int, Debounce(timedelta(seconds=30))],
) -> None: ...
"""

single: bool = True

def __init__(self, window: timedelta, *, scope: str | None = None) -> None:
self.window = window
self.scope = scope
self._argument_name: str | None = None
self._argument_value: Any = None

def bind_to_parameter(self, name: str, value: Any) -> Debounce:
bound = Debounce(self.window, scope=self.scope)
bound._argument_name = name
bound._argument_value = value
return bound

async def __aenter__(self) -> Debounce:
execution = current_execution.get()
docket = current_docket.get()

scope = self.scope or docket.name
if self._argument_name is not None:
debounce_key = (
f"{scope}:debounce:{self._argument_name}:{self._argument_value}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use bound argument values for per-parameter debounce keys

When Debounce is used via Annotated, the key is built from _argument_value, but that value is captured before positional args are rebound to parameter names; in resolved_dependencies this comes from execution.kwargs.get(...), so calls like await docket.add(task)(1) and await docket.add(task)(2) both produce a ...:customer_id:None key and one task is incorrectly dropped. This breaks per-value debouncing for positional invocation and causes false positives in normal task usage.

Useful? React with 👍 / 👎.

)
else:
debounce_key = f"{scope}:debounce:{execution.function_name}"

window_ms = int(self.window.total_seconds() * 1000)

async with docket.redis() as redis:
acquired = await redis.set(debounce_key, 1, nx=True, px=window_ms)

if not acquired:
raise DebounceBlocked(execution, debounce_key, self.window)

return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
pass
2 changes: 1 addition & 1 deletion src/docket/dependencies/_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_single_dependency_parameter_of_type(
for _, dependencies in get_annotation_dependencies(function).items():
for dependency in dependencies:
if isinstance(dependency, dependency_type):
return dependency # type: ignore[return-value]
return dependency
return None


Expand Down
27 changes: 18 additions & 9 deletions src/docket/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,15 +528,24 @@ async def process_completed_tasks() -> None:
await task
await ack_message(redis, message_id)
except AdmissionBlocked as e:
logger.debug(
"🔒 Task %s blocked by admission control, rescheduling",
e.execution.key,
extra=log_context,
)
e.execution.when = (
datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY
)
await e.execution.schedule(reschedule_message=message_id)
if e.reschedule:
logger.debug(
"⏳ Task %s blocked by admission control, rescheduling",
e.execution.key,
extra=log_context,
)
e.execution.when = (
datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY
)
await e.execution.schedule(reschedule_message=message_id)
else:
logger.debug(
"⏭ Task %s blocked by admission control, dropping",
e.execution.key,
extra=log_context,
)
await e.execution.mark_as_cancelled()
await ack_message(redis, message_id)

async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
logger.debug("Acknowledging message", extra=log_context)
Expand Down
Loading