Skip to content

Commit cfe3345

Browse files
authored
Feature/task concurrency control (#148)
Adds a `ConcurrencyLimit` context manager. Closes #86. NOTE: I haven't tested this!
1 parent 91faaa3 commit cfe3345

File tree

11 files changed

+2098
-87
lines changed

11 files changed

+2098
-87
lines changed

docs/advanced-patterns.md

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,4 +377,182 @@ async def analyze_user_behavior(
377377
)(user_id)
378378
```
379379

380+
## Concurrency Control
381+
382+
Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.
383+
384+
### Basic Concurrency Limits
385+
386+
Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments:
387+
388+
```python
389+
from docket import ConcurrencyLimit
390+
391+
async def process_customer_data(
392+
customer_id: int,
393+
concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
394+
) -> None:
395+
# Only one task per customer_id can run at a time
396+
await update_customer_profile(customer_id)
397+
await recalculate_customer_metrics(customer_id)
398+
399+
# These will run sequentially for the same customer
400+
await docket.add(process_customer_data)(customer_id=1001)
401+
await docket.add(process_customer_data)(customer_id=1001)
402+
await docket.add(process_customer_data)(customer_id=1001)
403+
404+
# But different customers can run concurrently
405+
await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel
406+
await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel
407+
```
408+
409+
### Database Connection Pooling
410+
411+
Limit concurrent database operations to prevent overwhelming your database:
412+
413+
```python
414+
async def backup_database_table(
415+
db_name: str,
416+
table_name: str,
417+
concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
418+
) -> None:
419+
# Maximum 2 backup operations per database at once
420+
await create_table_backup(db_name, table_name)
421+
await verify_backup_integrity(db_name, table_name)
422+
423+
# Schedule many backup tasks - only 2 per database will run concurrently
424+
tables = ["users", "orders", "products", "analytics", "logs"]
425+
for table in tables:
426+
await docket.add(backup_database_table)("production", table)
427+
await docket.add(backup_database_table)("staging", table)
428+
```
429+
430+
### API Rate Limiting
431+
432+
Protect external APIs from being overwhelmed:
433+
434+
```python
435+
async def sync_user_with_external_service(
436+
user_id: int,
437+
service_name: str,
438+
concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
439+
) -> None:
440+
# Limit to 5 concurrent API calls per external service
441+
api_client = get_api_client(service_name)
442+
user_data = await fetch_user_data(user_id)
443+
await api_client.sync_user(user_data)
444+
445+
# These respect per-service limits
446+
await docket.add(sync_user_with_external_service)(123, "salesforce")
447+
await docket.add(sync_user_with_external_service)(456, "salesforce") # Will queue if needed
448+
await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel
449+
```
450+
451+
### File Processing Limits
452+
453+
Control concurrent file operations to manage disk I/O:
454+
455+
```python
456+
async def process_media_file(
457+
file_path: str,
458+
operation_type: str,
459+
concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
460+
) -> None:
461+
# Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
462+
if operation_type == "video_transcode":
463+
await transcode_video(file_path)
464+
elif operation_type == "image_resize":
465+
await resize_image(file_path)
466+
elif operation_type == "audio_compress":
467+
await compress_audio(file_path)
468+
469+
# Different operation types can run concurrently, but each type is limited
470+
await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
471+
await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
472+
await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel
473+
```
474+
475+
### Custom Scopes
476+
477+
Use custom scopes to create independent concurrency limits:
478+
479+
```python
480+
async def process_tenant_data(
481+
tenant_id: str,
482+
operation: str,
483+
concurrency: ConcurrencyLimit = ConcurrencyLimit(
484+
"tenant_id",
485+
max_concurrent=2,
486+
scope="tenant_operations"
487+
)
488+
) -> None:
489+
# Each tenant can have up to 2 concurrent operations
490+
await perform_tenant_operation(tenant_id, operation)
491+
492+
async def process_global_data(
493+
data_type: str,
494+
concurrency: ConcurrencyLimit = ConcurrencyLimit(
495+
"data_type",
496+
max_concurrent=1,
497+
scope="global_operations" # Separate from tenant operations
498+
)
499+
) -> None:
500+
# Global operations have their own concurrency limits
501+
await process_global_data_type(data_type)
502+
```
503+
504+
### Multi-Level Concurrency
505+
506+
Combine multiple concurrency controls for complex scenarios:
507+
508+
```python
509+
async def process_user_export(
510+
user_id: int,
511+
export_type: str,
512+
region: str,
513+
user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
514+
type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
515+
region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
516+
) -> None:
517+
# This task respects ALL concurrency limits:
518+
# - Only 1 export per user at a time
519+
# - Only 3 exports of each type globally
520+
# - Only 10 exports per region
521+
await generate_user_export(user_id, export_type, region)
522+
```
523+
524+
**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start.
525+
526+
### Monitoring Concurrency
527+
528+
Concurrency limits are enforced using Redis sets, so you can monitor them (note: in production, prefer the incremental `SCAN` command over `KEYS` when listing keys, since `KEYS` blocks the Redis server while it scans the entire keyspace):
529+
530+
```python
531+
async def monitor_concurrency_usage() -> None:
532+
async with docket.redis() as redis:
533+
# Check how many tasks are running for a specific limit
534+
active_count = await redis.scard("docket:concurrency:customer_id:1001")
535+
print(f"Customer 1001 has {active_count} active tasks")
536+
537+
# List all active concurrency keys
538+
keys = await redis.keys("docket:concurrency:*")
539+
for key in keys:
540+
count = await redis.scard(key)
541+
print(f"{key}: {count} active tasks")
542+
```
543+
544+
### Best Practices
545+
546+
1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).
547+
548+
2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints.
549+
550+
3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.
551+
552+
4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.
553+
554+
5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.
555+
556+
Concurrency control helps you build robust systems that respect resource limits while maintaining high throughput for independent operations.
557+
380558
These advanced patterns enable building sophisticated distributed systems that can adapt to changing conditions, handle operational requirements, and provide the debugging and testing capabilities needed for production deployments.

examples/concurrency_control.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Example demonstrating concurrency control in docket.
4+
5+
This example shows how to use ConcurrencyLimit to control the number of
6+
concurrent tasks based on specific argument values.
7+
"""
8+
9+
import asyncio
10+
from datetime import datetime
11+
12+
from docket import ConcurrencyLimit, Docket, Worker
13+
from common import run_redis
14+
15+
16+
async def process_customer_data(
    customer_id: int,
    data: str,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1),
):
    """Handle one unit of customer work.

    The ``ConcurrencyLimit`` dependency keyed on ``customer_id`` means the
    worker will never run two of these tasks for the same customer at once;
    tasks for different customers are free to overlap.
    """
    # Millisecond-precision wall-clock stamp, so interleaving is visible in logs.
    started = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{started}] Started processing customer {customer_id}: {data}")

    # Stand-in for real work.
    await asyncio.sleep(0.5)

    finished = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{finished}] Finished processing customer {customer_id}: {data}")
32+
33+
34+
async def backup_database(
    db_name: str,
    table: str,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2),
):
    """Back up a single table.

    The limit is keyed on ``db_name``, so at most two backups run
    concurrently against any one database; different databases proceed
    independently of each other.
    """
    def _now() -> str:
        # Millisecond timestamp used by both log lines below.
        return datetime.now().strftime("%H:%M:%S.%f")[:-3]

    print(f"[{_now()}] Started backing up {db_name}.{table}")

    # Stand-in for the actual backup work.
    await asyncio.sleep(0.3)

    print(f"[{_now()}] Finished backing up {db_name}.{table}")
50+
51+
52+
async def regular_task(task_id: int):
    """A plain task with no ConcurrencyLimit — the worker may run any number of these at once."""
    def _now() -> str:
        # Millisecond-precision timestamp for the log lines.
        return datetime.now().strftime("%H:%M:%S.%f")[:-3]

    print(f"[{_now()}] Regular task {task_id} started")
    await asyncio.sleep(0.1)
    print(f"[{_now()}] Regular task {task_id} finished")
61+
62+
63+
async def main():
    """Drive the demo: schedule a mix of limited and unlimited tasks, then drain them.

    Spins up a throwaway Redis, registers the three example tasks, enqueues
    work that exercises each concurrency rule, and runs a worker until the
    docket is empty.
    """
    async with run_redis("7.4.2") as redis_url:
        async with Docket(name="concurrency-demo", url=redis_url) as docket:
            # Make every demo task known to the docket up front.
            for task in (process_customer_data, backup_database, regular_task):
                docket.register(task)

            print("=== Scheduling tasks ===")

            # Same customer_id → ConcurrencyLimit forces these to run one at a time.
            print("Scheduling 3 tasks for customer 1001 (will run sequentially)...")
            for payload in ("order-1", "order-2", "order-3"):
                await docket.add(process_customer_data)(customer_id=1001, data=payload)

            # Distinct customer_ids → independent limits, so these overlap freely.
            print("Scheduling tasks for different customers (will run concurrently)...")
            for cid, payload in ((2001, "profile-update"), (3001, "payment-update")):
                await docket.add(process_customer_data)(customer_id=cid, data=payload)

            # One db_name shared by four tasks → at most 2 backups in flight for it.
            print(
                "Scheduling 4 backup tasks for 'users' database (max 2 concurrent)..."
            )
            for table in ("accounts", "profiles", "sessions", "preferences"):
                await docket.add(backup_database)(db_name="users", table=table)

            # A different database is governed by its own, separate limit.
            print("Scheduling backup for 'orders' database...")
            await docket.add(backup_database)(db_name="orders", table="transactions")

            # No ConcurrencyLimit dependency → nothing throttles these.
            print("Scheduling regular tasks (no concurrency limits)...")
            for task_id in range(3):
                await docket.add(regular_task)(task_id=task_id)

            print("\n=== Starting worker (concurrency=8) ===")
            async with Worker(docket, concurrency=8) as worker:
                # Block until every scheduled task has completed.
                await worker.run_until_finished()

            print("\n=== All tasks completed ===")
111+
112+
113+
# Script entry point: run the async demo on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())

src/docket/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from .annotations import Logged
1212
from .dependencies import (
13+
ConcurrencyLimit,
1314
CurrentDocket,
1415
CurrentExecution,
1516
CurrentWorker,
@@ -28,6 +29,7 @@
2829

2930
__all__ = [
3031
"__version__",
32+
"ConcurrencyLimit",
3133
"CurrentDocket",
3234
"CurrentExecution",
3335
"CurrentWorker",

0 commit comments

Comments
 (0)