
Commit 58a9bf5

Authored by chrisguidryclaude and Claude Opus 4.6
Debounce and Cooldown admission control dependencies (#355)
## Summary

Reworked the Debounce and Cooldown semantics from #322 after realizing what we originally called "Debounce" was really a cooldown, and we were missing true debounce (wait for things to settle, then fire).

**Cooldown** (formerly Debounce) — execute first, drop duplicates within a window. Sets a Redis key on entry with TTL (`SET NX PX`). Blocked tasks are silently dropped.

**Debounce** (new) — wait for submissions to settle, then fire once. Uses a Lua script with two Redis keys (winner + last_seen). The first submission becomes the "winner" and gets rescheduled for the settle window. Subsequent submissions reset the timer and are dropped. Once the settle window passes with no new activity, the winner proceeds.

Also adds a `retry_delay` field to `AdmissionBlocked` so Debounce can tell the worker exactly how long to wait before rescheduling (the remaining settle time), rather than using the fixed default delay.

The old success-anchored Cooldown (check on entry, set on successful exit) is removed — you can get the same effect with Cooldown + Retry.

Closes #322, closes #161.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
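The two semantics are easiest to compare side by side. The following is a minimal in-memory sketch of the behavior described above, not docket's actual implementation (which uses Redis `SET NX PX` and a Lua script); the class and method names are illustrative only:

```python
class CooldownGate:
    """Execute-first semantics: the first submission opens a window
    (analogous to Redis SET NX PX); duplicates inside it are dropped."""

    def __init__(self) -> None:
        self._expires: dict[str, float] = {}

    def try_admit(self, key: str, window: float, now: float) -> bool:
        expiry = self._expires.get(key)
        if expiry is not None and now < expiry:
            return False  # still cooling down: drop the duplicate
        self._expires[key] = now + window  # start a fresh window
        return True


class DebounceGate:
    """Trailing-edge semantics: the first submission becomes the
    'winner'; later ones reset the settle timer and are dropped."""

    def __init__(self) -> None:
        self._winner: dict[str, str] = {}
        self._last_seen: dict[str, float] = {}

    def submit(self, key: str, task_id: str, now: float) -> str:
        self._last_seen[key] = now  # every submission resets the timer
        if key not in self._winner:
            self._winner[key] = task_id
            return "rescheduled"  # winner waits out the settle window
        return "dropped"

    def remaining(self, key: str, settle: float, now: float) -> float:
        """Settle time still left — the kind of value retry_delay carries."""
        return max(0.0, self._last_seen[key] + settle - now)
```

Note how the cooldown window is anchored to the first admission, while the debounce window is anchored to the most recent submission.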
1 parent 7180b79 commit 58a9bf5

12 files changed (+703, -94 lines)

docs/getting-started.md

Lines changed: 1 addition & 1 deletion
```
@@ -244,7 +244,7 @@ You now know the core concepts: creating dockets, scheduling work with idempoten

 Ready for more? Check out:

-- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control
+- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, concurrency control, debounce, and cooldown
 - **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources
 - **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests
 - **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains
```

docs/task-behaviors.md

Lines changed: 164 additions & 81 deletions
```
@@ -305,18 +305,18 @@ For more details on progress monitoring patterns and real-time observation, see

 ## Concurrency Control

-Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.
+Docket provides fine-grained concurrency control that limits how many tasks can run at the same time, based on specific argument values. This is useful for protecting shared resources, preventing overwhelming external services, and managing database connections.

-### Basic Concurrency Limits
+### Per-Argument Concurrency

-Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments:
+Annotate a parameter with `ConcurrencyLimit` to limit concurrency based on its value. Each distinct value gets its own independent limit:

 ```python
+from typing import Annotated
 from docket import ConcurrencyLimit

 async def process_customer_data(
-    customer_id: int,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
+    customer_id: Annotated[int, ConcurrencyLimit(1)],
 ) -> None:
     # Only one task per customer_id can run at a time
     await update_customer_profile(customer_id)
@@ -325,11 +325,22 @@ async def process_customer_data(
 # These will run sequentially for the same customer
 await docket.add(process_customer_data)(customer_id=1001)
 await docket.add(process_customer_data)(customer_id=1001)
-await docket.add(process_customer_data)(customer_id=1001)

 # But different customers can run concurrently
 await docket.add(process_customer_data)(customer_id=2001)  # Runs in parallel
-await docket.add(process_customer_data)(customer_id=3001)  # Runs in parallel
+```
+
+### Per-Task Concurrency
+
+Use a default parameter to limit the total number of concurrent executions of a task, regardless of arguments:
+
+```python
+async def expensive_computation(
+    input_data: str,
+    concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3),
+) -> None:
+    # At most 3 of these tasks can run at once across all arguments
+    await run_computation(input_data)
 ```

 ### Database Connection Pooling
@@ -338,9 +349,8 @@ Limit concurrent database operations to prevent overwhelming your database:

 ```python
 async def backup_database_table(
-    db_name: str,
+    db_name: Annotated[str, ConcurrencyLimit(2)],
     table_name: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
 ) -> None:
     # Maximum 2 backup operations per database at once
     await create_table_backup(db_name, table_name)
@@ -360,8 +370,7 @@ Protect external APIs from being overwhelmed:
 ```python
 async def sync_user_with_external_service(
     user_id: int,
-    service_name: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
+    service_name: Annotated[str, ConcurrencyLimit(5)],
 ) -> None:
     # Limit to 5 concurrent API calls per external service
     api_client = get_api_client(service_name)
@@ -374,84 +383,22 @@ await docket.add(sync_user_with_external_service)(456, "salesforce") # Will que
 await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel
 ```

-### File Processing Limits
-
-Control concurrent file operations to manage disk I/O:
-
-```python
-async def process_media_file(
-    file_path: str,
-    operation_type: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
-) -> None:
-    # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
-    if operation_type == "video_transcode":
-        await transcode_video(file_path)
-    elif operation_type == "image_resize":
-        await resize_image(file_path)
-    elif operation_type == "audio_compress":
-        await compress_audio(file_path)
-
-# Different operation types can run concurrently, but each type is limited
-await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
-await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
-await docket.add(process_media_file)("/images/photo1.jpg", "image_resize")  # Runs in parallel
-```
-
 ### Custom Scopes

 Use custom scopes to create independent concurrency limits:

 ```python
 async def process_tenant_data(
-    tenant_id: str,
+    tenant_id: Annotated[str, ConcurrencyLimit(2, scope="tenant_operations")],
     operation: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit(
-        "tenant_id",
-        max_concurrent=2,
-        scope="tenant_operations"
-    )
 ) -> None:
     # Each tenant can have up to 2 concurrent operations
     await perform_tenant_operation(tenant_id, operation)
-
-async def process_global_data(
-    data_type: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit(
-        "data_type",
-        max_concurrent=1,
-        scope="global_operations"  # Separate from tenant operations
-    )
-) -> None:
-    # Global operations have their own concurrency limits
-    await process_global_data_type(data_type)
 ```

-### Multi-Level Concurrency
-
-Combine multiple concurrency controls for complex scenarios:
-
-```python
-async def process_user_export(
-    user_id: int,
-    export_type: str,
-    region: str,
-    user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
-    type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
-    region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
-) -> None:
-    # This task respects ALL concurrency limits:
-    # - Only 1 export per user at a time
-    # - Only 3 exports of each type globally
-    # - Only 10 exports per region
-    await generate_user_export(user_id, export_type, region)
-```
-
-**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start.
-
 ### Monitoring Concurrency

-Concurrency limits are enforced using Redis sets, so you can monitor them:
+Concurrency limits are enforced using Redis sorted sets, so you can monitor them:

 ```python
 async def monitor_concurrency_usage() -> None:
@@ -467,14 +414,150 @@ async def monitor_concurrency_usage() -> None:
         print(f"{key}: {count} active tasks")
 ```

-### Tips
+!!! note "Legacy default-parameter style"
+    Prior to 0.18, `ConcurrencyLimit` required passing the argument name as a
+    string: `ConcurrencyLimit("customer_id", max_concurrent=1)`. This style
+    still works but `Annotated` is preferred — it avoids the string-name
+    duplication and is consistent with Debounce, Cooldown, and other
+    dependencies.
+
+## Cooldown
+
+Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's quietly dropped with an INFO-level log.
+
+### Per-Task Cooldown
+
+```python
+from datetime import timedelta
+from docket import Cooldown
+
+async def process_webhooks(
+    cooldown: Cooldown = Cooldown(timedelta(seconds=30)),
+) -> None:
+    events = await fetch_pending_webhook_events()
+    await process_events(events)
+
+# First call starts immediately and sets a 30-second window
+await docket.add(process_webhooks)()
+
+# This one arrives 5 seconds later — quietly dropped
+await docket.add(process_webhooks)()
+```
+
+### Per-Parameter Cooldown
+
+Annotate a parameter with `Cooldown` to apply independent windows per value:
+
+```python
+from typing import Annotated
+
+async def sync_customer(
+    customer_id: Annotated[int, Cooldown(timedelta(seconds=30))],
+) -> None:
+    await refresh_customer_data(customer_id)
+
+# First sync for customer 1001 starts immediately
+await docket.add(sync_customer)(customer_id=1001)
+
+# Duplicate for 1001 within 30s — dropped
+await docket.add(sync_customer)(customer_id=1001)
+
+# Different customer — runs immediately
+await docket.add(sync_customer)(customer_id=2002)
+```
+
+## Debounce

-1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).
+Debounce waits for submissions to settle before firing. When rapid-fire events arrive, only one task runs — after a quiet period equal to the settle window. This is the classic "trailing-edge" debounce: keep resetting the timer on each new event, then fire once things calm down.
+
+### Per-Task Debounce
+
+```python
+from datetime import timedelta
+from docket import Debounce
+
+async def process_webhooks(
+    debounce: Debounce = Debounce(timedelta(seconds=5)),
+) -> None:
+    events = await fetch_pending_webhook_events()
+    await process_events(events)

-2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints.
+# First submission becomes the "winner" and gets rescheduled
+await docket.add(process_webhooks)()

-3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.
+# More events arrive — they reset the settle timer but are dropped
+await docket.add(process_webhooks)()
+await docket.add(process_webhooks)()
+
+# After 5 seconds of quiet, the winner proceeds
+```
+
+### Per-Parameter Debounce
+
+Annotate a parameter with `Debounce` to get independent settle windows per value:
+
+```python
+from typing import Annotated
+
+async def sync_customer(
+    customer_id: Annotated[int, Debounce(timedelta(seconds=5))],
+) -> None:
+    await refresh_customer_data(customer_id)
+
+# Each customer_id gets its own independent settle window
+await docket.add(sync_customer)(customer_id=1001)
+await docket.add(sync_customer)(customer_id=1001)  # resets 1001's timer
+await docket.add(sync_customer)(customer_id=2002)  # independent window
+```

-4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.
+### Debounce vs. Cooldown
+
+| | Cooldown | Debounce |
+|---|---|---|
+| **Behavior** | Execute first, drop duplicates | Wait for quiet, then execute |
+| **Window anchored to** | First execution | Last submission |
+| **Good for** | Deduplicating rapid-fire events | Batching bursts into one action |
+
+### Multiple Cooldowns
+
+You can annotate multiple parameters with `Cooldown` on the same task. Each gets its own independent window scoped to that parameter's value. A task must pass *all* of its cooldown checks to start — if any one blocks, the task is dropped:
+
+```python
+from typing import Annotated
+
+async def sync_data(
+    customer_id: Annotated[int, Cooldown(timedelta(seconds=30))],
+    region: Annotated[str, Cooldown(timedelta(seconds=60))],
+) -> None:
+    await refresh_data(customer_id, region)
+
+# Runs immediately — both windows are clear
+await docket.add(sync_data)(customer_id=1, region="us")
+
+# Blocked — customer_id=1 is still in cooldown
+await docket.add(sync_data)(customer_id=1, region="eu")
+
+# Blocked — region="us" is still in cooldown
+await docket.add(sync_data)(customer_id=2, region="us")
+
+# Runs — both customer_id=2 and region="eu" are clear
+await docket.add(sync_data)(customer_id=2, region="eu")
+```
+
+Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window.
+
+### Combining with Other Controls
+
+Debounce, cooldown, and concurrency limits can all coexist on the same task:
+
+```python
+from typing import Annotated
+
+async def process_order(
+    order_id: Annotated[int, ConcurrencyLimit(1)],
+    cooldown: Cooldown = Cooldown(timedelta(seconds=60)),
+) -> None:
+    await finalize_order(order_id)
+```

-5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.
+Each admission control is checked independently. A task must satisfy all of them to start.
```

loq.toml

Lines changed: 1 addition & 1 deletion
```
@@ -10,7 +10,7 @@ max_lines = 750
 # Source files that still need exceptions above 750
 [[rules]]
 path = "src/docket/worker.py"
-max_lines = 1141
+max_lines = 1150

 [[rules]]
 path = "src/docket/cli/__init__.py"
```

src/docket/__init__.py

Lines changed: 4 additions & 0 deletions
```
@@ -12,7 +12,9 @@
 from .annotations import Logged
 from .dependencies import (
     ConcurrencyLimit,
+    Cooldown,
     Cron,
+    Debounce,
     CurrentDocket,
     CurrentExecution,
     CurrentWorker,
@@ -37,7 +39,9 @@
     "__version__",
     "Agenda",
     "ConcurrencyLimit",
+    "Cooldown",
     "Cron",
+    "Debounce",
     "CurrentDocket",
     "CurrentExecution",
     "CurrentWorker",
```

src/docket/dependencies/__init__.py

Lines changed: 4 additions & 0 deletions
```
@@ -19,6 +19,8 @@
     format_duration,
 )
 from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
+from ._cooldown import Cooldown
+from ._debounce import Debounce
 from ._cron import Cron
 from ._contextual import (
     CurrentDocket,
@@ -83,6 +85,8 @@
     "AdmissionBlocked",
     "ConcurrencyBlocked",
     "ConcurrencyLimit",
+    "Cooldown",
+    "Debounce",
     "Cron",
     "Perpetual",
     "Progress",
```

src/docket/dependencies/_base.py

Lines changed: 17 additions & 1 deletion
```
@@ -55,11 +55,27 @@ class AdmissionBlocked(Exception):

     This is the base exception for admission control mechanisms like
     concurrency limits, rate limits, or health gates.
+
+    When ``reschedule`` is True (default), the worker re-queues the task
+    with a short delay. When False, the task is quietly acknowledged
+    and dropped with an INFO-level log (appropriate for debounce/cooldown
+    where re-trying would just hit the same window).
+
+    ``retry_delay`` overrides the default reschedule delay when set.
     """

-    def __init__(self, execution: Execution, reason: str = "admission control"):
+    def __init__(
+        self,
+        execution: Execution,
+        reason: str = "admission control",
+        *,
+        reschedule: bool = True,
+        retry_delay: timedelta | None = None,
+    ):
         self.execution = execution
         self.reason = reason
+        self.reschedule = reschedule
+        self.retry_delay = retry_delay
         super().__init__(f"Task {execution.key} blocked by {reason}")
```
