Skip to content

Commit d2fa250

Browse files
chrisguidry and claude
authored
RateLimit admission control dependency (#356)
Adds `RateLimit(limit, per=timedelta)` — caps how many times a task (or a per-parameter scope) can execute within a sliding window. Uses a single Redis sorted set per scope with a Lua script that atomically prunes old entries, counts remaining, and either records the execution or computes `retry_after` from the oldest entry. By default, excess tasks are rescheduled to exactly when a slot opens. `drop=True` quietly drops them instead (like Cooldown). Works both as a default parameter and as `Annotated` metadata for per-parameter scoping — same patterns as Cooldown and Debounce. Closes #161. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 58a9bf5 commit d2fa250

File tree

5 files changed

+469
-0
lines changed

5 files changed

+469
-0
lines changed

docs/task-behaviors.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,66 @@ await docket.add(sync_data)(customer_id=2, region="eu")
546546

547547
Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window.
548548

549+
## Rate Limiting
550+
551+
Rate limiting caps how many times a task can execute within a sliding time window. Unlike cooldown (which drops duplicates) or debounce (which waits for quiet), rate limiting counts executions and blocks any execution that would exceed the limit.
552+
553+
By default, excess tasks are rescheduled to exactly when a slot opens. With `drop=True`, they're quietly dropped instead.
554+
555+
### Per-Task Rate Limit
556+
557+
```python
558+
from datetime import timedelta
559+
from docket import RateLimit
560+
561+
async def sync_data(
562+
rate: RateLimit = RateLimit(10, per=timedelta(minutes=1)),
563+
) -> None:
564+
await perform_sync()
565+
566+
# The first 10 calls within a minute execute immediately.
567+
# The 11th is rescheduled to when the oldest slot frees up.
568+
```
569+
570+
### Per-Parameter Rate Limit
571+
572+
Annotate a parameter with `RateLimit` to apply independent limits per value:
573+
574+
```python
575+
from typing import Annotated
576+
577+
async def process_customer(
578+
customer_id: Annotated[int, RateLimit(5, per=timedelta(minutes=1))],
579+
) -> None:
580+
await refresh_customer_data(customer_id)
581+
582+
# Each customer_id gets its own independent sliding window.
583+
# Customer 1001 can hit 5/min while customer 2002 independently hits 5/min.
584+
```
585+
586+
### Dropping Excess Tasks
587+
588+
When rescheduling isn't appropriate, use `drop=True` to silently discard excess tasks:
589+
590+
```python
591+
async def fire_webhook(
592+
endpoint: Annotated[str, RateLimit(100, per=timedelta(hours=1), drop=True)],
593+
) -> None:
594+
await send_webhook(endpoint)
595+
596+
# After 100 webhook calls to the same endpoint in an hour,
597+
# additional calls are dropped with an INFO log.
598+
```
599+
600+
### Rate Limit vs. Cooldown vs. Debounce
601+
602+
| | RateLimit | Cooldown | Debounce |
603+
|---|---|---|---|
604+
| **Behavior** | Allow N per window | Execute first, drop rest | Wait for quiet, then execute |
605+
| **Window anchored to** | Sliding (each execution) | First execution | Last submission |
606+
| **Over-limit default** | Reschedule | Drop | Drop (losers) / Reschedule (winner) |
607+
| **Good for** | Enforcing throughput caps | Deduplicating rapid-fire | Batching bursts into one action |
608+
549609
### Combining with Other Controls
550610

551611
Debounce, cooldown, and concurrency limits can all coexist on the same task:

src/docket/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Cooldown,
1616
Cron,
1717
Debounce,
18+
RateLimit,
1819
CurrentDocket,
1920
CurrentExecution,
2021
CurrentWorker,
@@ -42,6 +43,7 @@
4243
"Cooldown",
4344
"Cron",
4445
"Debounce",
46+
"RateLimit",
4547
"CurrentDocket",
4648
"CurrentExecution",
4749
"CurrentWorker",

src/docket/dependencies/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
2222
from ._cooldown import Cooldown
2323
from ._debounce import Debounce
24+
from ._ratelimit import RateLimit
2425
from ._cron import Cron
2526
from ._contextual import (
2627
CurrentDocket,
@@ -87,6 +88,7 @@
8788
"ConcurrencyLimit",
8889
"Cooldown",
8990
"Debounce",
91+
"RateLimit",
9092
"Cron",
9193
"Perpetual",
9294
"Progress",
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""Rate limit admission control dependency.
2+
3+
Caps how many times a task (or a per-parameter scope) can execute within a
4+
sliding window. Uses a Redis sorted set as a sliding window log: members are
5+
``{execution_key}:{now_ms}`` strings (unique per attempt), scores are
6+
millisecond timestamps.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import time
12+
from datetime import timedelta
13+
from types import TracebackType
14+
from typing import Any
15+
16+
from ._base import AdmissionBlocked, Dependency, current_docket, current_execution
17+
18+
# Lua script for atomic sliding-window rate limit check.
#
# The whole check-and-record sequence must be atomic: two workers racing on
# the same scope could otherwise both see `count < limit` and both record,
# overshooting the cap. Running it as a single script makes Redis serialize
# the prune + count + record steps.
#
# KEYS[1] = sorted set key (one per scope)
# ARGV[1] = member (execution key + timestamp, unique per attempt)
#           NOTE(review): uniqueness relies on the millisecond timestamp in
#           the member — two attempts of the same execution key within the
#           same millisecond would collide and ZADD would update rather than
#           add; presumably acceptable at these rates — confirm.
# ARGV[2] = current time in milliseconds
# ARGV[3] = window size in milliseconds
# ARGV[4] = max allowed count (limit)
# ARGV[5] = key TTL in milliseconds (window * 2, safety net so abandoned
#           scopes don't leak keys forever)
#
# Returns: {action, retry_after_ms}
# action: 1=PROCEED, 2=BLOCKED
# retry_after_ms: ms until the oldest entry expires (only for BLOCKED)
_RATELIMIT_LUA = """
local key = KEYS[1]
local member = ARGV[1]
local now_ms = tonumber(ARGV[2])
local window_ms = tonumber(ARGV[3])
local limit = tonumber(ARGV[4])
local ttl_ms = tonumber(ARGV[5])

-- Prune entries older than the window
local cutoff = now_ms - window_ms
redis.call('ZREMRANGEBYSCORE', key, '-inf', cutoff)

-- Count remaining entries
local count = redis.call('ZCARD', key)

if count < limit then
    -- Under limit: record this execution and set safety TTL
    redis.call('ZADD', key, now_ms, member)
    redis.call('PEXPIRE', key, ttl_ms)
    return {1, 0}
end

-- Over limit: compute when the oldest entry will expire
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local oldest_score = tonumber(oldest[2])
local retry_after = oldest_score + window_ms - now_ms
if retry_after < 1 then
    retry_after = 1
end
return {2, retry_after}
"""

# Action codes returned as the first element of the script's reply.
_ACTION_PROCEED = 1
_ACTION_BLOCKED = 2
64+
65+
66+
class RateLimit(Dependency["RateLimit"]):
    """Cap executions within a sliding time window.

    Backed by a Redis sorted set used as a sliding-window log: every admitted
    execution adds one member, and entries older than the window are pruned
    atomically (via a Lua script) before each admission decision.

    When the window is already full:

    - ``drop=False`` (default): the task is rescheduled for the moment the
      oldest entry ages out of the window.
    - ``drop=True``: the task is quietly dropped.

    Usable both as a default parameter value and as ``Annotated`` metadata::

        # Per-task: max 10 per minute, excess rescheduled
        async def sync_data(
            rate: RateLimit = RateLimit(10, per=timedelta(minutes=1)),
        ) -> None: ...

        # Per-parameter: max 5 per minute per customer, excess dropped
        async def process_customer(
            customer_id: Annotated[int, RateLimit(5, per=timedelta(minutes=1), drop=True)],
        ) -> None: ...
    """

    def __init__(
        self,
        limit: int,
        *,
        per: timedelta,
        drop: bool = False,
        scope: str | None = None,
    ) -> None:
        # Public configuration.
        self.limit = limit
        self.per = per
        self.drop = drop
        self.scope = scope
        # Per-parameter binding state (set by bind_to_parameter).
        self._argument_name: str | None = None
        self._argument_value: Any = None
        # Set only after a successful admission, so __aexit__ can undo it.
        self._ratelimit_key: str | None = None
        self._member: str | None = None

    def bind_to_parameter(self, name: str, value: Any) -> RateLimit:
        """Return a copy of this limit scoped to one parameter's value."""
        clone = RateLimit(self.limit, per=self.per, drop=self.drop, scope=self.scope)
        clone._argument_name = name
        clone._argument_value = value
        return clone

    async def __aenter__(self) -> RateLimit:
        """Admit or block this execution against the sliding window.

        Raises AdmissionBlocked when the window is full — with a retry delay
        (reschedule) by default, or with ``reschedule=False`` when configured
        to drop.
        """
        execution = current_execution.get()
        docket = current_docket.get()

        # One sorted set per scope: either per parameter value, or per task.
        scope = self.scope or docket.name
        if self._argument_name is None:
            key = f"{scope}:ratelimit:{execution.function_name}"
        else:
            key = f"{scope}:ratelimit:{self._argument_name}:{self._argument_value}"

        window_ms = int(self.per.total_seconds() * 1000)
        now_ms = int(time.time() * 1000)
        # Millisecond timestamp keeps members unique across attempts.
        member = f"{execution.key}:{now_ms}"

        async with docket.redis() as redis:
            script = redis.register_script(_RATELIMIT_LUA)
            reply: list[int] = await script(
                keys=[key],
                # TTL of window * 2 is a safety net against leaked keys.
                args=[member, now_ms, window_ms, self.limit, window_ms * 2],
            )

        action, retry_after_ms = reply[0], reply[1]

        if action == _ACTION_PROCEED:
            # Remember what we recorded so __aexit__ can roll it back if a
            # later admission-control dependency blocks this execution.
            self._ratelimit_key = key
            self._member = member
            return self

        reason = f"rate limit ({self.limit}/{self.per}) on {key}"

        if self.drop:
            raise AdmissionBlocked(execution, reason=reason, reschedule=False)

        # Reschedule to exactly when the oldest window entry expires.
        raise AdmissionBlocked(
            execution,
            reason=reason,
            retry_delay=timedelta(milliseconds=retry_after_ms),
        )

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Roll back this attempt's window entry if admission failed later.

        _member is only set after we recorded an entry; an AdmissionBlocked
        raised here means some *other* dependency blocked the execution, so
        this non-execution should not consume a slot.
        """
        if exc_type is None or self._member is None:
            return
        if not issubclass(exc_type, AdmissionBlocked):
            return
        assert self._ratelimit_key is not None
        docket = current_docket.get()
        async with docket.redis() as redis:
            await redis.zrem(self._ratelimit_key, self._member)

0 commit comments

Comments
 (0)