Skip to content

Commit 3d3312f

Browse files
chrisguidryclaude
andcommitted
Simplify admission control exceptions and relax Cooldown single restriction
Drop CooldownBlocked and DebounceBlocked in favor of raising AdmissionBlocked directly with reschedule/retry_delay as constructor arguments. Less ceremony, same behavior. Cooldown no longer enforces single=True — multiple per-parameter cooldowns on the same task are independent and work fine. Debounce keeps single=True because its reschedule mechanism means multiple debounces would loop forever. Also trims implementation details (Redis key patterns, Lua scripts) from the user-facing docs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4138d29 commit 3d3312f

File tree

7 files changed

+78
-82
lines changed

7 files changed

+78
-82
lines changed

docs/task-behaviors.md

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ async def monitor_concurrency_usage() -> None:
423423

424424
## Cooldown
425425

426-
Cooldown executes the first submission immediately, then drops duplicates within a window. On entry it atomically sets a Redis key with a TTL (`SET key 1 NX PX window_ms`). If the key already exists the task is silently dropped. The key expires naturally after the window.
426+
Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's silently dropped.
427427

428428
### Per-Task Cooldown
429429

@@ -510,14 +510,6 @@ await docket.add(sync_customer)(customer_id=1001) # resets 1001's timer
510510
await docket.add(sync_customer)(customer_id=2002) # independent window
511511
```
512512

513-
### How It Works
514-
515-
Debounce uses two Redis keys per scope — a **winner** key (which task gets to proceed) and a **last_seen** timestamp — managed by an atomic Lua script:
516-
517-
1. **No winner exists** — the task becomes the winner and gets rescheduled for the full settle window.
518-
2. **Winner returns from reschedule** — if enough time has passed since the last submission, it proceeds. Otherwise it reschedules for the remaining time.
519-
3. **Non-winner arrives** — updates the last_seen timestamp (resetting the settle timer) and is immediately dropped.
520-
521513
### Debounce vs. Cooldown
522514

523515
| | Cooldown | Debounce |
@@ -526,6 +518,34 @@ Debounce uses two Redis keys per scope — a **winner** key (which task gets to
526518
| **Window anchored to** | First execution | Last submission |
527519
| **Good for** | Deduplicating rapid-fire events | Batching bursts into one action |
528520

521+
### Multiple Cooldowns
522+
523+
You can annotate multiple parameters with `Cooldown` on the same task. Each gets its own independent window scoped to that parameter's value. A task must pass *all* of its cooldown checks to start — if any one blocks, the task is dropped:
524+
525+
```python
526+
from typing import Annotated
527+
528+
async def sync_data(
529+
customer_id: Annotated[int, Cooldown(timedelta(seconds=30))],
530+
region: Annotated[str, Cooldown(timedelta(seconds=60))],
531+
) -> None:
532+
await refresh_data(customer_id, region)
533+
534+
# Runs immediately — both windows are clear
535+
await docket.add(sync_data)(customer_id=1, region="us")
536+
537+
# Blocked — customer_id=1 is still in cooldown
538+
await docket.add(sync_data)(customer_id=1, region="eu")
539+
540+
# Blocked — region="us" is still in cooldown
541+
await docket.add(sync_data)(customer_id=2, region="us")
542+
543+
# Runs — both customer_id=2 and region="eu" are clear
544+
await docket.add(sync_data)(customer_id=2, region="eu")
545+
```
546+
547+
Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window.
548+
529549
### Combining with Other Controls
530550

531551
Debounce, cooldown, and concurrency limits can all coexist on the same task:

src/docket/dependencies/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
format_duration,
2020
)
2121
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
22-
from ._cooldown import Cooldown, CooldownBlocked
23-
from ._debounce import Debounce, DebounceBlocked
22+
from ._cooldown import Cooldown
23+
from ._debounce import Debounce
2424
from ._cron import Cron
2525
from ._contextual import (
2626
CurrentDocket,
@@ -86,9 +86,7 @@
8686
"ConcurrencyBlocked",
8787
"ConcurrencyLimit",
8888
"Cooldown",
89-
"CooldownBlocked",
9089
"Debounce",
91-
"DebounceBlocked",
9290
"Cron",
9391
"Perpetual",
9492
"Progress",

src/docket/dependencies/_base.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,18 @@ class AdmissionBlocked(Exception):
6464
``retry_delay`` overrides the default reschedule delay when set.
6565
"""
6666

67-
reschedule: bool = True
68-
retry_delay: timedelta | None = None
69-
70-
def __init__(self, execution: Execution, reason: str = "admission control"):
67+
def __init__(
68+
self,
69+
execution: Execution,
70+
reason: str = "admission control",
71+
*,
72+
reschedule: bool = True,
73+
retry_delay: timedelta | None = None,
74+
):
7175
self.execution = execution
7276
self.reason = reason
77+
self.reschedule = reschedule
78+
self.retry_delay = retry_delay
7379
super().__init__(f"Task {execution.key} blocked by {reason}")
7480

7581

src/docket/dependencies/_cooldown.py

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,10 @@
88

99
from datetime import timedelta
1010
from types import TracebackType
11-
from typing import TYPE_CHECKING, Any
11+
from typing import Any
1212

1313
from ._base import AdmissionBlocked, Dependency, current_docket, current_execution
1414

15-
if TYPE_CHECKING: # pragma: no cover
16-
from ..execution import Execution
17-
18-
19-
class CooldownBlocked(AdmissionBlocked):
20-
"""Raised when a task is blocked by cooldown."""
21-
22-
reschedule = False
23-
24-
def __init__(self, execution: Execution, cooldown_key: str, window: timedelta):
25-
self.cooldown_key = cooldown_key
26-
self.window = window
27-
reason = f"cooldown ({window}) on {cooldown_key}"
28-
super().__init__(execution, reason=reason)
29-
3015

3116
class Cooldown(Dependency["Cooldown"]):
3217
"""Execute first, drop duplicates within window.
@@ -47,8 +32,6 @@ async def process_customer(
4732
) -> None: ...
4833
"""
4934

50-
single: bool = True
51-
5235
def __init__(self, window: timedelta, *, scope: str | None = None) -> None:
5336
self.window = window
5437
self.scope = scope
@@ -79,7 +62,11 @@ async def __aenter__(self) -> Cooldown:
7962
acquired = await redis.set(cooldown_key, 1, nx=True, px=window_ms)
8063

8164
if not acquired:
82-
raise CooldownBlocked(execution, cooldown_key, self.window)
65+
raise AdmissionBlocked(
66+
execution,
67+
reason=f"cooldown ({self.window}) on {cooldown_key}",
68+
reschedule=False,
69+
)
8370

8471
return self
8572

src/docket/dependencies/_debounce.py

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010
import time
1111
from datetime import timedelta
1212
from types import TracebackType
13-
from typing import TYPE_CHECKING, Any
13+
from typing import Any
1414

1515
from ._base import AdmissionBlocked, Dependency, current_docket, current_execution
1616

17-
if TYPE_CHECKING: # pragma: no cover
18-
from ..execution import Execution
19-
2017
# Lua script for atomic debounce logic.
2118
#
2219
# KEYS[1] = winner key (holds the execution key of the chosen task)
@@ -76,26 +73,6 @@
7673
_ACTION_DROP = 3
7774

7875

79-
class DebounceBlocked(AdmissionBlocked):
80-
"""Raised when a task is blocked by debounce."""
81-
82-
def __init__(
83-
self,
84-
execution: Execution,
85-
debounce_key: str,
86-
settle: timedelta,
87-
*,
88-
reschedule: bool,
89-
retry_delay: timedelta | None = None,
90-
):
91-
self.debounce_key = debounce_key
92-
self.settle = settle
93-
self.reschedule = reschedule # type: ignore[assignment]
94-
self.retry_delay = retry_delay # type: ignore[assignment]
95-
reason = f"debounce ({settle}) on {debounce_key}"
96-
super().__init__(execution, reason=reason)
97-
98-
9976
class Debounce(Dependency["Debounce"]):
10077
"""Wait for submissions to settle, then fire once.
10178
@@ -160,22 +137,17 @@ async def __aenter__(self) -> Debounce:
160137
if action == _ACTION_PROCEED:
161138
return self
162139

140+
reason = f"debounce ({self.settle}) on {base_key}"
141+
163142
if action == _ACTION_RESCHEDULE:
164-
raise DebounceBlocked(
143+
raise AdmissionBlocked(
165144
execution,
166-
base_key,
167-
self.settle,
168-
reschedule=True,
145+
reason=reason,
169146
retry_delay=timedelta(milliseconds=remaining_ms),
170147
)
171148

172149
# DROP
173-
raise DebounceBlocked(
174-
execution,
175-
base_key,
176-
self.settle,
177-
reschedule=False,
178-
)
150+
raise AdmissionBlocked(execution, reason=reason, reschedule=False)
179151

180152
async def __aexit__(
181153
self,

tests/test_cooldown.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
from datetime import timedelta
77
from typing import Annotated
88

9-
import pytest
10-
119
from docket import ConcurrencyLimit, Docket, Worker
1210
from docket.dependencies import Cooldown
1311

@@ -71,16 +69,31 @@ async def cooled_task(
7169
assert results.count(1) == 1
7270

7371

74-
async def test_cooldown_single_rejects_two(docket: Docket):
75-
"""single=True rejects two Cooldown on the same task."""
76-
with pytest.raises(ValueError, match="Only one Cooldown"):
72+
async def test_multiple_cooldowns_on_different_parameters(
73+
docket: Docket, worker: Worker
74+
):
75+
"""Multiple Cooldown annotations on different parameters are independent."""
76+
results: list[tuple[int, str]] = []
77+
78+
async def task(
79+
customer_id: Annotated[int, Cooldown(timedelta(seconds=5))],
80+
region: Annotated[str, Cooldown(timedelta(seconds=5))],
81+
):
82+
results.append((customer_id, region))
83+
84+
await docket.add(task)(customer_id=1, region="us")
85+
await docket.add(task)(
86+
customer_id=1, region="eu"
87+
) # same customer, different region
88+
await docket.add(task)(
89+
customer_id=2, region="us"
90+
) # different customer, same region
7791

78-
async def task(
79-
a: Annotated[int, Cooldown(timedelta(seconds=1))],
80-
b: Annotated[str, Cooldown(timedelta(seconds=2))],
81-
): ... # pragma: no cover
92+
worker.concurrency = 10
93+
await worker.run_until_finished()
8294

83-
await docket.add(task)(a=1, b="x")
95+
# First call runs. Second is blocked by customer_id=1. Third is blocked by region="us".
96+
assert results == [(1, "us")]
8497

8598

8699
async def test_cooldown_coexists_with_concurrency_limit(docket: Docket, worker: Worker):

tests/test_debounce.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ async def debounced_task(
6666
assert results.count(1) == 1
6767

6868

69-
async def test_debounce_single_rejects_two(docket: Docket):
70-
"""single=True rejects two Debounce on the same task."""
69+
async def test_multiple_debounces_rejected(docket: Docket):
70+
"""Only one Debounce is allowed per task."""
7171
with pytest.raises(ValueError, match="Only one Debounce"):
7272

7373
async def task(

0 commit comments

Comments
 (0)