Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions docs/advanced-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,182 @@ async def analyze_user_behavior(
)(user_id)
```

## Concurrency Control

Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.

### Basic Concurrency Limits

Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments:

```python
from docket import ConcurrencyLimit

async def process_customer_data(
customer_id: int,
concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
) -> None:
# Only one task per customer_id can run at a time
await update_customer_profile(customer_id)
await recalculate_customer_metrics(customer_id)

# These will run sequentially for the same customer
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)

# But different customers can run concurrently
await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel
await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel
```

### Database Connection Pooling

Limit concurrent database operations to prevent overwhelming your database:

```python
async def backup_database_table(
db_name: str,
table_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
) -> None:
# Maximum 2 backup operations per database at once
await create_table_backup(db_name, table_name)
await verify_backup_integrity(db_name, table_name)

# Schedule many backup tasks - only 2 per database will run concurrently
tables = ["users", "orders", "products", "analytics", "logs"]
for table in tables:
await docket.add(backup_database_table)("production", table)
await docket.add(backup_database_table)("staging", table)
```

### API Rate Limiting

Protect external APIs from being overwhelmed:

```python
async def sync_user_with_external_service(
user_id: int,
service_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
) -> None:
# Limit to 5 concurrent API calls per external service
api_client = get_api_client(service_name)
user_data = await fetch_user_data(user_id)
await api_client.sync_user(user_data)

# These respect per-service limits
await docket.add(sync_user_with_external_service)(123, "salesforce")
await docket.add(sync_user_with_external_service)(456, "salesforce") # Will queue if needed
await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel
```

### File Processing Limits

Control concurrent file operations to manage disk I/O:

```python
async def process_media_file(
file_path: str,
operation_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
) -> None:
# Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
if operation_type == "video_transcode":
await transcode_video(file_path)
elif operation_type == "image_resize":
await resize_image(file_path)
elif operation_type == "audio_compress":
await compress_audio(file_path)

# Different operation types can run concurrently, but each type is limited
await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel
```

### Custom Scopes

Use custom scopes to create independent concurrency limits:

```python
async def process_tenant_data(
tenant_id: str,
operation: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"tenant_id",
max_concurrent=2,
scope="tenant_operations"
)
) -> None:
# Each tenant can have up to 2 concurrent operations
await perform_tenant_operation(tenant_id, operation)

async def process_global_data(
data_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"data_type",
max_concurrent=1,
scope="global_operations" # Separate from tenant operations
)
) -> None:
# Global operations have their own concurrency limits
await process_global_data_type(data_type)
```

### Multi-Level Concurrency

Combine multiple concurrency controls for complex scenarios:

```python
async def process_user_export(
user_id: int,
export_type: str,
region: str,
user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
) -> None:
# This task respects ALL concurrency limits:
# - Only 1 export per user at a time
# - Only 3 exports of each type globally
# - Only 10 exports per region
await generate_user_export(user_id, export_type, region)
```

**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start.

### Monitoring Concurrency

Concurrency limits are enforced using Redis sets, so you can monitor them:

```python
async def monitor_concurrency_usage() -> None:
async with docket.redis() as redis:
# Check how many tasks are running for a specific limit
active_count = await redis.scard("docket:concurrency:customer_id:1001")
print(f"Customer 1001 has {active_count} active tasks")

# List all active concurrency keys
keys = await redis.keys("docket:concurrency:*")
for key in keys:
count = await redis.scard(key)
print(f"{key}: {count} active tasks")
```

### Best Practices

1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).

2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints.

3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.

4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.

5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.

Concurrency control helps you build robust systems that respect resource limits while maintaining high throughput for independent operations.

These advanced patterns enable building sophisticated distributed systems that can adapt to changing conditions, handle operational requirements, and provide the debugging and testing capabilities needed for production deployments.
114 changes: 114 additions & 0 deletions examples/concurrency_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Example demonstrating concurrency control in docket.

This example shows how to use ConcurrencyLimit to control the number of
concurrent tasks based on specific argument values.
"""

import asyncio
from datetime import datetime

from docket import ConcurrencyLimit, Docket, Worker
from common import run_redis


async def process_customer_data(
customer_id: int,
data: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1),
):
"""Process customer data - only one task per customer_id can run at a time."""
print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Started processing customer {customer_id}: {data}"
)

# Simulate some work
await asyncio.sleep(0.5)

print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Finished processing customer {customer_id}: {data}"
)


async def backup_database(
db_name: str,
table: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2),
):
"""Backup database tables - max 2 concurrent backups per database."""
print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Started backing up {db_name}.{table}"
)

# Simulate backup work
await asyncio.sleep(0.3)

print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Finished backing up {db_name}.{table}"
)


async def regular_task(task_id: int):
"""Regular task without concurrency limits."""
print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Regular task {task_id} started"
)
await asyncio.sleep(0.1)
print(
f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] Regular task {task_id} finished"
)


async def main():
async with run_redis("7.4.2") as redis_url:
async with Docket(name="concurrency-demo", url=redis_url) as docket:
# Register tasks
docket.register(process_customer_data)
docket.register(backup_database)
docket.register(regular_task)

print("=== Scheduling tasks ===")

# Schedule multiple tasks for the same customer (will run sequentially)
print("Scheduling 3 tasks for customer 1001 (will run sequentially)...")
await docket.add(process_customer_data)(customer_id=1001, data="order-1")
await docket.add(process_customer_data)(customer_id=1001, data="order-2")
await docket.add(process_customer_data)(customer_id=1001, data="order-3")

# Schedule tasks for different customers (will run concurrently)
print("Scheduling tasks for different customers (will run concurrently)...")
await docket.add(process_customer_data)(
customer_id=2001, data="profile-update"
)
await docket.add(process_customer_data)(
customer_id=3001, data="payment-update"
)

# Schedule database backups (max 2 concurrent per database)
print(
"Scheduling 4 backup tasks for 'users' database (max 2 concurrent)..."
)
await docket.add(backup_database)(db_name="users", table="accounts")
await docket.add(backup_database)(db_name="users", table="profiles")
await docket.add(backup_database)(db_name="users", table="sessions")
await docket.add(backup_database)(db_name="users", table="preferences")

# Schedule backups for different database (will run concurrently with users db)
print("Scheduling backup for 'orders' database...")
await docket.add(backup_database)(db_name="orders", table="transactions")

# Schedule regular tasks (no concurrency limits)
print("Scheduling regular tasks (no concurrency limits)...")
for i in range(3):
await docket.add(regular_task)(task_id=i)

print("\n=== Starting worker (concurrency=8) ===")
async with Worker(docket, concurrency=8) as worker:
await worker.run_until_finished()

print("\n=== All tasks completed ===")


if __name__ == "__main__":
asyncio.run(main())
2 changes: 2 additions & 0 deletions src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from .annotations import Logged
from .dependencies import (
ConcurrencyLimit,
CurrentDocket,
CurrentExecution,
CurrentWorker,
Expand All @@ -28,6 +29,7 @@

__all__ = [
"__version__",
"ConcurrencyLimit",
"CurrentDocket",
"CurrentExecution",
"CurrentWorker",
Expand Down
Loading
Loading