2 changes: 2 additions & 0 deletions README.md
@@ -57,6 +57,8 @@ reference](https://chrisguidry.github.io/docket/api-reference/).

🧩 Fully type-complete and type-aware for your background task functions

💉 Dependency injection like FastAPI, Typer, and FastMCP for reusable resources

## Installing `docket`

Docket is [available on PyPI](https://pypi.org/project/pydocket/) under the package name
380 changes: 380 additions & 0 deletions docs/advanced-patterns.md
@@ -0,0 +1,380 @@
# Advanced Task Patterns

Docket is built for complex distributed systems, and the patterns below cover some of its original use cases.

## Perpetual Tasks

Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations.

### Basic Perpetual Tasks

```python
from datetime import timedelta

from docket import Perpetual

async def health_check_service(
    service_url: str,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=5))
) -> None:
    try:
        response = await http_client.get(f"{service_url}/health")
        response.raise_for_status()
        print(f"✓ {service_url} is healthy")
    except Exception as e:
        print(f"✗ {service_url} failed health check: {e}")
        await send_alert(f"Service {service_url} is down")

# Schedule the task once; it will then run every 5 minutes indefinitely
await docket.add(health_check_service)("https://api.example.com")
```

After each execution, the task automatically schedules itself to run again after the specified interval.

### Automatic Startup

Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled:

```python
async def background_cleanup(
    perpetual: Perpetual = Perpetual(
        every=timedelta(hours=1),
        automatic=True
    )
) -> None:
    deleted_count = await cleanup_old_records()
    print(f"Cleaned up {deleted_count} old records")

# Just register the task - no need to schedule it
docket.register(background_cleanup)

# When a worker starts, it will automatically begin running this task
# The task key will be the function name: "background_cleanup"
```

### Self-Canceling Tasks

Perpetual tasks can stop themselves when their work is done:

```python
async def monitor_deployment(
    deployment_id: str,
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=30))
) -> None:
    status = await check_deployment_status(deployment_id)

    if status in ["completed", "failed"]:
        await notify_deployment_finished(deployment_id, status)
        perpetual.cancel()  # Stop monitoring this deployment
        return

    print(f"Deployment {deployment_id} status: {status}")
```

### Dynamic Parameters

Perpetual tasks can change their arguments or timing for the next execution:

```python
async def adaptive_rate_limiter(
    api_endpoint: str,
    requests_per_minute: int = 60,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1))
) -> None:
    # Check current API load
    current_load = await check_api_load(api_endpoint)

    if current_load > 0.8:  # High load
        new_rate = max(30, requests_per_minute - 10)
        perpetual.every = timedelta(seconds=30)  # Check more frequently
        print(f"High load detected, reducing rate to {new_rate} req/min")
    else:  # Normal load
        new_rate = min(120, requests_per_minute + 5)
        perpetual.every = timedelta(minutes=1)  # Normal check interval
        print(f"Normal load, increasing rate to {new_rate} req/min")

    # Schedule next run with updated parameters
    perpetual.perpetuate(api_endpoint, new_rate)
```

### Error Resilience

Perpetual tasks automatically reschedule themselves regardless of success or failure:

```python
async def resilient_sync(
    source_url: str,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=15))
) -> None:
    # This will ALWAYS reschedule, whether it succeeds or fails
    await sync_data_from_source(source_url)
    print(f"Successfully synced data from {source_url}")
```

You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the `every` interval.

### Find & Flood Pattern

A common perpetual task pattern is "find & flood" - a single perpetual task that periodically discovers work to do, then creates many smaller tasks to handle the actual work:

```python
from datetime import timedelta

from docket import CurrentDocket, Docket, Perpetual

async def find_pending_orders(
    docket: Docket = CurrentDocket(),
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1))
) -> None:
    # Find all orders that need processing
    pending_orders = await database.fetch_pending_orders()

    # Flood the queue with individual processing tasks
    for order in pending_orders:
        await docket.add(process_single_order)(order.id)

    print(f"Queued {len(pending_orders)} orders for processing")

async def process_single_order(order_id: int) -> None:
    # Handle one specific order
    await process_order_payment(order_id)
    await update_inventory(order_id)
    await send_confirmation_email(order_id)
```

This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers.
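The split between discovery and execution can be tried out without any infrastructure. The following sketch models it with an in-process `asyncio.Queue` instead of a Docket queue; `find_and_flood` and the worker names are hypothetical, and the `asyncio.sleep(0)` call stands in for real order processing.

```python
import asyncio


async def find_and_flood(order_ids: list[int], worker_count: int) -> dict[int, str]:
    """Discover work once, then let several workers drain the queue."""
    queue: asyncio.Queue[int] = asyncio.Queue()
    processed: dict[int, str] = {}

    # "Find": the discovery step only enqueues IDs, so it stays fast.
    for order_id in order_ids:
        queue.put_nowait(order_id)

    async def worker(name: str) -> None:
        # "Flood": each worker handles one order at a time.
        while True:
            order_id = await queue.get()
            try:
                await asyncio.sleep(0)  # stand-in for real processing
                processed[order_id] = name
            finally:
                queue.task_done()

    workers = [
        asyncio.create_task(worker(f"worker-{i}")) for i in range(worker_count)
    ]
    await queue.join()  # wait until every queued order has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed
```

Because each order is its own unit of work, a failure while handling one order would not affect the others, which is the fault isolation the pattern is after.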

## Striking and Restoring Tasks

Striking allows you to temporarily disable tasks without redeploying code. This is invaluable for incident response, gradual rollouts, or handling problematic customers.

### Striking Entire Task Types

Disable all instances of a specific task:

```python
# Disable all order processing during maintenance
await docket.strike(process_order)

# Orders added during this time won't be processed
await docket.add(process_order)(order_id=12345) # Won't run
await docket.add(process_order)(order_id=67890) # Won't run

# Re-enable when ready
await docket.restore(process_order)
```

### Striking by Parameter Values

Disable tasks based on their arguments using comparison operators:

```python
# Block all tasks for a problematic customer
await docket.strike(None, "customer_id", "==", "12345")

# Block low-priority work during high load
await docket.strike(process_order, "priority", "<=", "low")

# Block all orders above a certain value during fraud investigation
await docket.strike(process_payment, "amount", ">", 10000)

# Later, restore them
await docket.restore(None, "customer_id", "==", "12345")
await docket.restore(process_order, "priority", "<=", "low")
```

Supported operators include `==`, `!=`, `<`, `<=`, `>`, `>=`.
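To make the matching rule concrete, here is a minimal sketch of how operator-based conditions could be evaluated against a task's arguments. It is an illustration only, not Docket's implementation: `OPERATORS`, `Condition`, and `is_blocked` are hypothetical names, and the sketch assumes that any single matching condition is enough to block a task.

```python
import operator
from typing import Any, Callable

# Map each operator string listed above to its Python comparison.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}

# A strike condition: (parameter name, operator string, value).
Condition = tuple[str, str, Any]


def is_blocked(arguments: dict[str, Any], conditions: list[Condition]) -> bool:
    """Return True if any condition matches the task's arguments."""
    for parameter, op, value in conditions:
        if parameter not in arguments:
            continue  # a condition on an absent parameter cannot match
        if OPERATORS[op](arguments[parameter], value):
            return True
    return False
```

In plain Python, ordering operators on strings compare alphabetically, so under this model a condition such as `("priority", "<=", "low")` would match `"high"` as well as `"low"` but not `"medium"`. Numeric priorities avoid that surprise.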

### Striking Specific Task-Parameter Combinations

Target very specific scenarios:

```python
# Block one customer's orders and, separately, all high-value orders.
# Each strike is its own condition: a task is skipped if either one matches.
await docket.strike(process_order, "customer_id", "==", "12345")
await docket.strike(process_order, "amount", ">", 1000)

# This order won't run (blocked customer)
await docket.add(process_order)(customer_id="12345", amount=500)

# This order won't run (blocked customer AND high amount)
await docket.add(process_order)(customer_id="12345", amount=2000)

# This order WILL run (different customer, amount under the limit)
await docket.add(process_order)(customer_id="67890", amount=500)
```

Striking is useful for incident response (quickly disabling failing tasks), customer management (blocking problematic accounts), gradual rollouts (disabling features for certain parameters), load management during high traffic, and debugging (isolating specific scenarios).

## Advanced Logging and Debugging

### Argument Logging

Control which task arguments appear in logs using the `Logged` annotation:

```python
from typing import Annotated
from docket import Logged

async def process_payment(
    customer_id: Annotated[str, Logged],           # Will be logged
    credit_card: str,                              # Won't be logged
    amount: Annotated[float, Logged()] = 0.0,      # Will be logged
    trace_id: Annotated[str, Logged] = "unknown"   # Will be logged
) -> None:
    # Process the payment...
    pass

# Log output will show:
# process_payment('12345', credit_card=..., amount=150.0, trace_id='abc-123')
```

### Collection Length Logging

For large collections, log just their size instead of contents:

```python
async def bulk_update_users(
    user_ids: Annotated[list[str], Logged(length_only=True)],
    metadata: Annotated[dict[str, str], Logged(length_only=True)],
    options: Annotated[set[str], Logged(length_only=True)]
) -> None:
    # Process users...
    pass

# Log output will show:
# bulk_update_users([len 150], metadata={len 5}, options={len 3})
```

This prevents logs from being overwhelmed with large data structures while still providing useful information.
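The effect of `length_only` can be mimicked with a small formatting helper. This sketch only reproduces the output style shown above and is not how Docket renders arguments internally; `describe_argument` is a hypothetical name.

```python
from typing import Any


def describe_argument(value: Any, length_only: bool = False) -> str:
    """Render an argument for a log line, optionally as a length summary."""
    if length_only:
        # Keep the bracket style of the container so its type stays recognizable.
        if isinstance(value, (dict, set, frozenset)):
            return f"{{len {len(value)}}}"
        if isinstance(value, list):
            return f"[len {len(value)}]"
    # Anything else is shown in full.
    return repr(value)
```

A list of 150 user IDs renders as `[len 150]` rather than 150 quoted strings, while scalar arguments still appear in full.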

### Task Context Logging

Use `TaskLogger` for structured logging with task context:

```python
from logging import Logger, LoggerAdapter
from docket import TaskLogger

async def complex_data_pipeline(
    dataset_id: str,
    logger: LoggerAdapter[Logger] = TaskLogger()
) -> None:
    logger.info("Starting data pipeline", extra={"dataset_id": dataset_id})

    try:
        await extract_data(dataset_id)
        logger.info("Data extraction completed")

        await transform_data(dataset_id)
        logger.info("Data transformation completed")

        await load_data(dataset_id)
        logger.info("Data loading completed")

    except Exception as e:
        logger.error("Pipeline failed", extra={"error": str(e)})
        raise
```

The logger automatically includes task context like the task name, key, and worker information.
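The type annotation above is the standard library's `LoggerAdapter`, whose general mechanism is easy to see in isolation. The sketch below shows a plain adapter that prefixes each message with fixed context. The field names, the message format, and the `ContextAdapter` and `make_task_logger` helpers are assumptions for illustration and do not describe what `TaskLogger` actually emits.

```python
import logging
from collections.abc import MutableMapping
from typing import Any


class ContextAdapter(logging.LoggerAdapter):
    """Prefix every message with the adapter's context fields."""

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
    ) -> tuple[Any, MutableMapping[str, Any]]:
        context = " ".join(f"{k}={v}" for k, v in self.extra.items())
        return f"[{context}] {msg}", kwargs


def make_task_logger(task: str, key: str) -> ContextAdapter:
    """Build an adapter carrying hypothetical task context."""
    base = logging.getLogger("example.tasks")
    return ContextAdapter(base, {"task": task, "key": key})
```

Code that receives the adapter calls `logger.info(...)` as usual, and the context is attached without being passed around explicitly.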

### Built-in Utility Tasks

Docket provides helpful debugging tasks:

```python
from docket import tasks

# Simple trace logging
await docket.add(tasks.trace)("System startup completed")
await docket.add(tasks.trace)("Processing batch 123")

# Intentional failures for testing error handling
await docket.add(tasks.fail)("Testing error notification system")
```

These are particularly useful for:
- Marking milestones in complex workflows
- Testing monitoring and alerting systems
- Debugging task execution order
- Creating synthetic load for testing

## Task Chain Patterns

### Sequential Processing

Create chains of related tasks that pass data forward:

```python
from docket import CurrentDocket, Docket

async def download_data(
    url: str,
    docket: Docket = CurrentDocket()
) -> None:
    file_path = await download_file(url)
    await docket.add(validate_data)(file_path)

async def validate_data(
    file_path: str,
    docket: Docket = CurrentDocket()
) -> None:
    if await is_valid_data(file_path):
        await docket.add(process_data)(file_path)
    else:
        await docket.add(handle_invalid_data)(file_path)

async def process_data(file_path: str) -> None:
    # Final processing step
    await transform_and_store(file_path)
```

### Fan-out Processing

Break large tasks into parallel subtasks:

```python
from datetime import datetime, timedelta, timezone

async def process_large_dataset(
    dataset_id: str,
    docket: Docket = CurrentDocket()
) -> None:
    chunk_ids = await split_dataset_into_chunks(dataset_id)

    # Schedule parallel processing of all chunks
    for chunk_id in chunk_ids:
        await docket.add(process_chunk)(dataset_id, chunk_id)

    # Schedule a task to run after all chunks should be done
    estimated_completion = datetime.now(timezone.utc) + timedelta(hours=2)
    await docket.add(
        finalize_dataset,
        when=estimated_completion,
        key=f"finalize-{dataset_id}"
    )(dataset_id, len(chunk_ids))

async def process_chunk(dataset_id: str, chunk_id: str) -> None:
    await process_data_chunk(dataset_id, chunk_id)
    await mark_chunk_complete(dataset_id, chunk_id)
```
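Because the finalizer above is scheduled for an estimated time, it may still fire before every chunk has finished, so it is worth verifying completion before doing the final work. The sketch below shows one way such a check could look, using an in-memory dictionary as a stand-in for wherever `mark_chunk_complete` records progress. `completed_chunks`, `record_chunk_complete`, and `ready_to_finalize` are hypothetical names.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for wherever chunk completion is recorded.
completed_chunks: defaultdict[str, set[str]] = defaultdict(set)


def record_chunk_complete(dataset_id: str, chunk_id: str) -> None:
    """Note that one chunk of a dataset has finished processing."""
    completed_chunks[dataset_id].add(chunk_id)


def ready_to_finalize(dataset_id: str, expected_chunks: int) -> bool:
    """Return True only once every expected chunk has reported completion."""
    return len(completed_chunks[dataset_id]) >= expected_chunks
```

Storing chunk IDs in a set means a chunk that reports completion twice is only counted once.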

### Conditional Workflows

Tasks can make decisions about what work to schedule next:

```python
async def analyze_user_behavior(
    user_id: str,
    docket: Docket = CurrentDocket()
) -> None:
    behavior_data = await collect_user_behavior(user_id)

    if behavior_data.indicates_churn_risk():
        await docket.add(create_retention_campaign)(user_id)
    elif behavior_data.indicates_upsell_opportunity():
        await docket.add(create_upsell_campaign)(user_id)
    elif behavior_data.indicates_satisfaction():
        # Schedule a follow-up check in 30 days
        future_check = datetime.now(timezone.utc) + timedelta(days=30)
        await docket.add(
            analyze_user_behavior,
            when=future_check,
            key=f"behavior-check-{user_id}"
        )(user_id)
```

Together, these patterns let tasks adapt to changing conditions, give operators runtime control over what runs, and provide the logging and testing hooks needed for production deployments.