
Commit cd38e0e

chrisguidryclaude committed
Document Debounce, Cooldown, and update ConcurrencyLimit to Annotated style
Adds docs for the new Debounce and Cooldown admission controls, and rewrites the ConcurrencyLimit section to use the Annotated style as the primary approach (with a backward-compat note for the old string-name style). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1b08852 commit cd38e0e

File tree

2 files changed: +144, -82 lines

docs/getting-started.md

Lines changed: 1 addition & 1 deletion
@@ -244,7 +244,7 @@ You now know the core concepts: creating dockets, scheduling work with idempoten
 
 Ready for more? Check out:
 
-- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control
+- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, concurrency control, debounce, and cooldown
 - **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources
 - **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests
 - **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains

docs/task-behaviors.md

Lines changed: 143 additions & 81 deletions
@@ -305,18 +305,18 @@ For more details on progress monitoring patterns and real-time observation, see
 
 ## Concurrency Control
 
-Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.
+Docket provides fine-grained concurrency control that limits how many tasks can run at the same time, based on specific argument values. This is useful for protecting shared resources, preventing overwhelming external services, and managing database connections.
 
-### Basic Concurrency Limits
+### Per-Argument Concurrency
 
-Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments:
+Use `Annotated` to limit concurrency based on a specific argument value. Each distinct value gets its own independent limit:
 
 ```python
+from typing import Annotated
 from docket import ConcurrencyLimit
 
 async def process_customer_data(
-    customer_id: int,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
+    customer_id: Annotated[int, ConcurrencyLimit(1)],
 ) -> None:
     # Only one task per customer_id can run at a time
     await update_customer_profile(customer_id)
@@ -325,11 +325,22 @@ async def process_customer_data(
 # These will run sequentially for the same customer
 await docket.add(process_customer_data)(customer_id=1001)
 await docket.add(process_customer_data)(customer_id=1001)
-await docket.add(process_customer_data)(customer_id=1001)
 
 # But different customers can run concurrently
 await docket.add(process_customer_data)(customer_id=2001)  # Runs in parallel
-await docket.add(process_customer_data)(customer_id=3001)  # Runs in parallel
+```
+
+### Per-Task Concurrency
+
+Use a default parameter to limit the total number of concurrent executions of a task, regardless of arguments:
+
+```python
+async def expensive_computation(
+    input_data: str,
+    concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3),
+) -> None:
+    # At most 3 of these tasks can run at once across all arguments
+    await run_computation(input_data)
 ```
 
 ### Database Connection Pooling
@@ -338,9 +349,8 @@ Limit concurrent database operations to prevent overwhelming your database:
 
 ```python
 async def backup_database_table(
-    db_name: str,
+    db_name: Annotated[str, ConcurrencyLimit(2)],
     table_name: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
 ) -> None:
     # Maximum 2 backup operations per database at once
     await create_table_backup(db_name, table_name)
@@ -360,8 +370,7 @@ Protect external APIs from being overwhelmed:
 ```python
 async def sync_user_with_external_service(
     user_id: int,
-    service_name: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
+    service_name: Annotated[str, ConcurrencyLimit(5)],
 ) -> None:
     # Limit to 5 concurrent API calls per external service
     api_client = get_api_client(service_name)
@@ -374,84 +383,22 @@ await docket.add(sync_user_with_external_service)(456, "salesforce") # Will que
 await docket.add(sync_user_with_external_service)(789, "hubspot")  # Different service, runs in parallel
 ```
 
-### File Processing Limits
-
-Control concurrent file operations to manage disk I/O:
-
-```python
-async def process_media_file(
-    file_path: str,
-    operation_type: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
-) -> None:
-    # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
-    if operation_type == "video_transcode":
-        await transcode_video(file_path)
-    elif operation_type == "image_resize":
-        await resize_image(file_path)
-    elif operation_type == "audio_compress":
-        await compress_audio(file_path)
-
-# Different operation types can run concurrently, but each type is limited
-await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
-await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
-await docket.add(process_media_file)("/images/photo1.jpg", "image_resize")  # Runs in parallel
-```
-
 ### Custom Scopes
 
 Use custom scopes to create independent concurrency limits:
 
 ```python
 async def process_tenant_data(
-    tenant_id: str,
+    tenant_id: Annotated[str, ConcurrencyLimit(2, scope="tenant_operations")],
     operation: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit(
-        "tenant_id",
-        max_concurrent=2,
-        scope="tenant_operations"
-    )
 ) -> None:
     # Each tenant can have up to 2 concurrent operations
     await perform_tenant_operation(tenant_id, operation)
-
-async def process_global_data(
-    data_type: str,
-    concurrency: ConcurrencyLimit = ConcurrencyLimit(
-        "data_type",
-        max_concurrent=1,
-        scope="global_operations"  # Separate from tenant operations
-    )
-) -> None:
-    # Global operations have their own concurrency limits
-    await process_global_data_type(data_type)
 ```
 
-### Multi-Level Concurrency
-
-Combine multiple concurrency controls for complex scenarios:
-
-```python
-async def process_user_export(
-    user_id: int,
-    export_type: str,
-    region: str,
-    user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
-    type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
-    region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
-) -> None:
-    # This task respects ALL concurrency limits:
-    # - Only 1 export per user at a time
-    # - Only 3 exports of each type globally
-    # - Only 10 exports per region
-    await generate_user_export(user_id, export_type, region)
-```
-
-**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start.
-
 ### Monitoring Concurrency
 
-Concurrency limits are enforced using Redis sets, so you can monitor them:
+Concurrency limits are enforced using Redis sorted sets, so you can monitor them:
 
 ```python
 async def monitor_concurrency_usage() -> None:
@@ -467,14 +414,129 @@ async def monitor_concurrency_usage() -> None:
         print(f"{key}: {count} active tasks")
 ```
 
-### Tips
+!!! note "Legacy default-parameter style"
+    Prior to 0.18, `ConcurrencyLimit` required passing the argument name as a
+    string: `ConcurrencyLimit("customer_id", max_concurrent=1)`. This style
+    still works but `Annotated` is preferred — it avoids the string-name
+    duplication and is consistent with Debounce, Cooldown, and other
+    dependencies.
+
+## Debounce
+
+Debounce is a leading-edge admission control: if a task was recently started, duplicate submissions within the window are silently dropped. This is useful for deduplicating rapid-fire events like webhooks, where the same event may arrive multiple times in quick succession.
+
+### Per-Task Debounce
+
+Apply debounce to the whole task so only one execution can start within the window:
+
+```python
+from datetime import timedelta
+from docket import Debounce
+
+async def process_webhooks(
+    debounce: Debounce = Debounce(timedelta(seconds=30)),
+) -> None:
+    events = await fetch_pending_webhook_events()
+    await process_events(events)
+
+# First call starts immediately and sets a 30-second window
+await docket.add(process_webhooks)()
+
+# This one arrives 5 seconds later — silently dropped
+await docket.add(process_webhooks)()
+```
+
+### Per-Parameter Debounce
+
+Use `Annotated` to debounce based on a specific argument value. Different values get independent windows:
+
+```python
+from typing import Annotated
+
+async def sync_customer(
+    customer_id: Annotated[int, Debounce(timedelta(seconds=30))],
+) -> None:
+    await refresh_customer_data(customer_id)
+
+# First sync for customer 1001 starts immediately
+await docket.add(sync_customer)(customer_id=1001)
+
+# Duplicate for 1001 within 30s — dropped
+await docket.add(sync_customer)(customer_id=1001)
+
+# Different customer — runs immediately
+await docket.add(sync_customer)(customer_id=2002)
+```
 
-1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).
+### How It Works
 
-2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints.
+On entry, debounce atomically sets a Redis key with a TTL equal to the window (`SET key 1 NX PX window_ms`). If the key already exists, the task is dropped without rescheduling. The key expires naturally after the window, so no cleanup is needed.
 
-3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.
+## Cooldown
+
+Cooldown is a trailing-edge admission control: if a task recently _succeeded_, new submissions are dropped. Unlike debounce, the cooldown window only starts after a successful execution — failed tasks don't trigger it, so they can be retried immediately.
+
+### Per-Task Cooldown
+
+```python
+from datetime import timedelta
+from docket import Cooldown
+
+async def send_daily_digest(
+    cooldown: Cooldown = Cooldown(timedelta(minutes=30)),
+) -> None:
+    digest = await build_digest()
+    await send_email(digest)
+
+# Runs and succeeds — starts a 30-minute cooldown
+await docket.add(send_daily_digest)()
+
+# Within the cooldown window — silently dropped
+await docket.add(send_daily_digest)()
+```
 
-4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.
+If `send_daily_digest` raises an exception, no cooldown key is set and the task can be submitted again immediately.
+
+### Per-Parameter Cooldown
+
+```python
+from typing import Annotated
+
+async def send_notification(
+    customer_id: Annotated[int, Cooldown(timedelta(minutes=5))],
+) -> None:
+    await deliver_notification(customer_id)
+
+# Notification for customer 1001 — sends and starts cooldown
+await docket.add(send_notification)(customer_id=1001)
+
+# Another for 1001 within 5 minutes — dropped
+await docket.add(send_notification)(customer_id=1001)
+
+# Different customer — sends immediately
+await docket.add(send_notification)(customer_id=2002)
+```
+
+### Debounce vs. Cooldown
+
+| | Debounce | Cooldown |
+|---|---|---|
+| **When does the window start?** | When the task _starts_ | When the task _succeeds_ |
+| **Failed tasks** | Still block the window | Don't trigger cooldown |
+| **Good for** | Deduplicating incoming events | Rate-limiting outgoing side effects |
+
+### Combining with Other Controls
+
+Debounce, cooldown, and concurrency limits can all coexist on the same task:
+
+```python
+from typing import Annotated
+
+async def process_order(
+    order_id: Annotated[int, ConcurrencyLimit(1)],
+    cooldown: Cooldown = Cooldown(timedelta(seconds=60)),
+) -> None:
+    await finalize_order(order_id)
+```
 
-5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.
+Each admission control is checked independently. A task must satisfy all of them to start.
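The all-must-pass rule from that last line can be sketched as a one-liner. This is an illustrative composition, not docket's actual code; the hypothetical `admitted` helper takes one boolean check per admission control:

```python
from typing import Callable

def admitted(checks: list[Callable[[], bool]]) -> bool:
    # Evaluate every control (no short-circuit), then require all to pass
    results = [check() for check in checks]
    return all(results)

print(admitted([lambda: True, lambda: True]))   # True: every control admits
print(admitted([lambda: True, lambda: False]))  # False: one control blocks
```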
