
Commit 977b99e

chrisguidry and claude authored
Expand documentation with comprehensive guides addressing #134 (#145)

## Summary

Closes #134 by adding detailed documentation that covers all the advanced features demonstrated in the test suite.

Added four new comprehensive guides:

- Dependencies Guide - dependency injection, retry patterns, timeouts, custom dependencies
- Testing with Docket - pytest fixtures, testing utilities, integration testing
- Advanced Task Patterns - perpetual tasks, striking/restoring, logging, task chains
- Docket in Production - Redis architecture, monitoring, deployment strategies

Enhanced Getting Started with better explanations of task keys and idempotency. Updated navigation structure for better learning flow. Added FastAPI/Typer context to README. Fixed factual errors in dependency examples and clarified task cancellation behavior. Reformatted code blocks and softened overly confident language.

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 8892251 commit 977b99e

File tree

7 files changed: +1653 -198 lines changed

README.md

Lines changed: 2 additions & 0 deletions
@@ -57,6 +57,8 @@ reference](https://chrisguidry.github.io/docket/api-reference/).
  🧩 Fully type-complete and type-aware for your background task functions

+ 💉 Dependency injection like FastAPI, Typer, and FastMCP for reusable resources
+
  ## Installing `docket`

  Docket is [available on PyPI](https://pypi.org/project/pydocket/) under the package name

docs/advanced-patterns.md

Lines changed: 380 additions & 0 deletions
@@ -0,0 +1,380 @@

# Advanced Task Patterns

Docket is made for building complex distributed systems, and the patterns below highlight some of its original use cases.

## Perpetual Tasks

Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations.

### Basic Perpetual Tasks

```python
from datetime import timedelta

from docket import Perpetual

# http_client and send_alert are assumed helpers for this example
async def health_check_service(
    service_url: str,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=5)),
) -> None:
    try:
        response = await http_client.get(f"{service_url}/health")
        response.raise_for_status()
        print(f"{service_url} is healthy")
    except Exception as e:
        print(f"{service_url} failed health check: {e}")
        await send_alert(f"Service {service_url} is down")

# Schedule the task once; it will run every 5 minutes forever
await docket.add(health_check_service)("https://api.example.com")
```

After each execution, the task automatically schedules itself to run again after the specified interval.
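
Because every scheduled task runs under a task key (the function name by default, as noted under Automatic Startup below), you can run several independent instances of the same perpetual task by giving each its own key via `docket.add`. A minimal sketch, with illustrative key names:

```python
# Each keyed instance reschedules itself on its own five-minute cadence
await docket.add(health_check_service, key="health-check:api")("https://api.example.com")
await docket.add(health_check_service, key="health-check:auth")("https://auth.example.com")
```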

### Automatic Startup

Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled:

```python
async def background_cleanup(
    perpetual: Perpetual = Perpetual(
        every=timedelta(hours=1),
        automatic=True,
    ),
) -> None:
    deleted_count = await cleanup_old_records()
    print(f"Cleaned up {deleted_count} old records")

# Just register the task - no need to schedule it
docket.register(background_cleanup)

# When a worker starts, it will automatically begin running this task
# The task key will be the function name: "background_cleanup"
```

### Self-Canceling Tasks

Perpetual tasks can stop themselves when their work is done:

```python
async def monitor_deployment(
    deployment_id: str,
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=30)),
) -> None:
    status = await check_deployment_status(deployment_id)

    if status in ["completed", "failed"]:
        await notify_deployment_finished(deployment_id, status)
        perpetual.cancel()  # Stop monitoring this deployment
        return

    print(f"Deployment {deployment_id} status: {status}")
```

### Dynamic Parameters

Perpetual tasks can change their arguments or timing for the next execution:

```python
async def adaptive_rate_limiter(
    api_endpoint: str,
    requests_per_minute: int = 60,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)),
) -> None:
    # Check current API load
    current_load = await check_api_load(api_endpoint)

    if current_load > 0.8:  # High load
        new_rate = max(30, requests_per_minute - 10)
        perpetual.every = timedelta(seconds=30)  # Check more frequently
        print(f"High load detected, reducing rate to {new_rate} req/min")
    else:  # Normal load
        new_rate = min(120, requests_per_minute + 5)
        perpetual.every = timedelta(minutes=1)  # Normal check interval
        print(f"Normal load, increasing rate to {new_rate} req/min")

    # Schedule next run with updated parameters
    perpetual.perpetuate(api_endpoint, new_rate)
```

### Error Resilience

Perpetual tasks automatically reschedule themselves regardless of success or failure:

```python
async def resilient_sync(
    source_url: str,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=15)),
) -> None:
    # This will ALWAYS reschedule, whether it succeeds or fails
    await sync_data_from_source(source_url)
    print(f"Successfully synced data from {source_url}")
```

You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the `every` interval.
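
If you want custom handling on top of that guarantee, you can still catch exceptions yourself and re-raise. A minimal sketch, reusing the assumed `send_alert` helper from earlier:

```python
async def resilient_sync_with_alerting(
    source_url: str,
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=15)),
) -> None:
    try:
        await sync_data_from_source(source_url)
    except Exception as e:
        # Alert on failure, then re-raise; the next run is scheduled either way
        await send_alert(f"Sync from {source_url} failed: {e}")
        raise
```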

### Find & Flood Pattern

A common perpetual task pattern is "find & flood" - a single perpetual task that periodically discovers work to do, then creates many smaller tasks to handle the actual work:

```python
from datetime import timedelta

from docket import CurrentDocket, Docket, Perpetual

async def find_pending_orders(
    docket: Docket = CurrentDocket(),
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)),
) -> None:
    # Find all orders that need processing
    pending_orders = await database.fetch_pending_orders()

    # Flood the queue with individual processing tasks
    for order in pending_orders:
        await docket.add(process_single_order)(order.id)

    print(f"Queued {len(pending_orders)} orders for processing")

async def process_single_order(order_id: int) -> None:
    # Handle one specific order
    await process_order_payment(order_id)
    await update_inventory(order_id)
    await send_confirmation_email(order_id)
```

This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers.
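
Since the next discovery sweep may rediscover an order that is still queued, you may want to schedule each processing task under a stable key. A sketch with an illustrative key format, assuming (per the task keys and idempotency discussion in Getting Started) that a key identifies a single scheduled instance of a task:

```python
# A stable, order-derived key keeps a re-discovered order from being queued twice
for order in pending_orders:
    await docket.add(process_single_order, key=f"process-order:{order.id}")(order.id)
```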

## Striking and Restoring Tasks

Striking allows you to temporarily disable tasks without redeploying code. This is invaluable for incident response, gradual rollouts, or handling problematic customers.

### Striking Entire Task Types

Disable all instances of a specific task:

```python
# Disable all order processing during maintenance
await docket.strike(process_order)

# Orders added during this time won't be processed
await docket.add(process_order)(order_id=12345)  # Won't run
await docket.add(process_order)(order_id=67890)  # Won't run

# Re-enable when ready
await docket.restore(process_order)
```

### Striking by Parameter Values

Disable tasks based on their arguments using comparison operators:

```python
# Block all tasks for a problematic customer
await docket.strike(None, "customer_id", "==", "12345")

# Block low-priority work during high load
await docket.strike(process_order, "priority", "<=", "low")

# Block all orders above a certain value during fraud investigation
await docket.strike(process_payment, "amount", ">", 10000)

# Later, restore them
await docket.restore(None, "customer_id", "==", "12345")
await docket.restore(process_order, "priority", "<=", "low")
```

Supported operators include `==`, `!=`, `<`, `<=`, `>`, `>=`.

### Striking Specific Task-Parameter Combinations

Target very specific scenarios:

```python
# Block only high-value orders for a specific customer
await docket.strike(process_order, "customer_id", "==", "12345")
await docket.strike(process_order, "amount", ">", 1000)

# This order won't run (blocked customer)
await docket.add(process_order)(customer_id="12345", amount=500)

# This order won't run (blocked customer AND high amount)
await docket.add(process_order)(customer_id="12345", amount=2000)

# This order WILL run (different customer)
await docket.add(process_order)(customer_id="67890", amount=2000)
```

Striking is useful for:

- Incident response, when you need to quickly disable failing tasks
- Customer management, to block problematic accounts
- Gradual rollouts, where you disable features for certain parameters
- Load management during high traffic
- Debugging, to isolate specific scenarios

## Advanced Logging and Debugging

### Argument Logging

Control which task arguments appear in logs using the `Logged` annotation:

```python
from typing import Annotated

from docket import Logged

async def process_payment(
    customer_id: Annotated[str, Logged],  # Will be logged
    credit_card: str,  # Won't be logged
    amount: Annotated[float, Logged()] = 0.0,  # Will be logged
    trace_id: Annotated[str, Logged] = "unknown",  # Will be logged
) -> None:
    # Process the payment...
    pass

# Log output will show:
# process_payment('12345', credit_card=..., amount=150.0, trace_id='abc-123')
```

### Collection Length Logging

For large collections, log just their size instead of contents:

```python
async def bulk_update_users(
    user_ids: Annotated[list[str], Logged(length_only=True)],
    metadata: Annotated[dict[str, str], Logged(length_only=True)],
    options: Annotated[set[str], Logged(length_only=True)],
) -> None:
    # Process users...
    pass

# Log output will show:
# bulk_update_users([len 150], metadata={len 5}, options={len 3})
```

This prevents logs from being overwhelmed with large data structures while still providing useful information.

### Task Context Logging

Use `TaskLogger` for structured logging with task context:

```python
from logging import Logger, LoggerAdapter

from docket import TaskLogger

async def complex_data_pipeline(
    dataset_id: str,
    logger: LoggerAdapter[Logger] = TaskLogger(),
) -> None:
    logger.info("Starting data pipeline", extra={"dataset_id": dataset_id})

    try:
        await extract_data(dataset_id)
        logger.info("Data extraction completed")

        await transform_data(dataset_id)
        logger.info("Data transformation completed")

        await load_data(dataset_id)
        logger.info("Data loading completed")

    except Exception as e:
        logger.error("Pipeline failed", extra={"error": str(e)})
        raise
```

The logger automatically includes task context like the task name, key, and worker information.

### Built-in Utility Tasks

Docket provides helpful debugging tasks:

```python
from docket import tasks

# Simple trace logging
await docket.add(tasks.trace)("System startup completed")
await docket.add(tasks.trace)("Processing batch 123")

# Intentional failures for testing error handling
await docket.add(tasks.fail)("Testing error notification system")
```

These are particularly useful for:
- Marking milestones in complex workflows
- Testing monitoring and alerting systems
- Debugging task execution order
- Creating synthetic load for testing
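
For example, `trace` combines naturally with a `when=` schedule (used again in the fan-out pattern below) to mark an expected milestone ahead of time; the two-hour window here is illustrative:

```python
from datetime import datetime, timedelta, timezone

from docket import tasks

# Sketch: leave a trace message when the nightly batch window should have closed
expected_done = datetime.now(timezone.utc) + timedelta(hours=2)
await docket.add(tasks.trace, when=expected_done)("Nightly batch window closed")
```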

## Task Chain Patterns

### Sequential Processing

Create chains of related tasks that pass data forward:

```python
from docket import CurrentDocket, Docket

async def download_data(
    url: str,
    docket: Docket = CurrentDocket(),
) -> None:
    file_path = await download_file(url)
    await docket.add(validate_data)(file_path)

async def validate_data(
    file_path: str,
    docket: Docket = CurrentDocket(),
) -> None:
    if await is_valid_data(file_path):
        await docket.add(process_data)(file_path)
    else:
        await docket.add(handle_invalid_data)(file_path)

async def process_data(file_path: str) -> None:
    # Final processing step
    await transform_and_store(file_path)
```

### Fan-out Processing

Break large tasks into parallel subtasks:

```python
from datetime import datetime, timedelta, timezone

async def process_large_dataset(
    dataset_id: str,
    docket: Docket = CurrentDocket(),
) -> None:
    chunk_ids = await split_dataset_into_chunks(dataset_id)

    # Schedule parallel processing of all chunks
    for chunk_id in chunk_ids:
        await docket.add(process_chunk)(dataset_id, chunk_id)

    # Schedule a task to run after all chunks should be done
    estimated_completion = datetime.now(timezone.utc) + timedelta(hours=2)
    await docket.add(
        finalize_dataset,
        when=estimated_completion,
        key=f"finalize-{dataset_id}",
    )(dataset_id, len(chunk_ids))

async def process_chunk(dataset_id: str, chunk_id: str) -> None:
    await process_data_chunk(dataset_id, chunk_id)
    await mark_chunk_complete(dataset_id, chunk_id)
```
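
The time-based finalize above is a heuristic: if chunks run long, finalization fires before they finish. A more robust variant, sketched under the assumption that `mark_chunk_complete` is changed to return the number of chunks still outstanding, lets the last chunk trigger finalization itself:

```python
async def process_chunk(
    dataset_id: str,
    chunk_id: str,
    total_chunks: int,
    docket: Docket = CurrentDocket(),
) -> None:
    await process_data_chunk(dataset_id, chunk_id)

    # Assumed for this sketch: mark_chunk_complete returns how many chunks remain
    remaining = await mark_chunk_complete(dataset_id, chunk_id)
    if remaining == 0:
        # The stable key keeps finalization from being scheduled more than once
        await docket.add(finalize_dataset, key=f"finalize-{dataset_id}")(
            dataset_id, total_chunks
        )
```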

### Conditional Workflows

Tasks can make decisions about what work to schedule next:

```python
async def analyze_user_behavior(
    user_id: str,
    docket: Docket = CurrentDocket(),
) -> None:
    behavior_data = await collect_user_behavior(user_id)

    if behavior_data.indicates_churn_risk():
        await docket.add(create_retention_campaign)(user_id)
    elif behavior_data.indicates_upsell_opportunity():
        await docket.add(create_upsell_campaign)(user_id)
    elif behavior_data.indicates_satisfaction():
        # Schedule a follow-up check in 30 days
        future_check = datetime.now(timezone.utc) + timedelta(days=30)
        await docket.add(
            analyze_user_behavior,
            when=future_check,
            key=f"behavior-check-{user_id}",
        )(user_id)
```
379+
380+
These advanced patterns enable building sophisticated distributed systems that can adapt to changing conditions, handle operational requirements, and provide the debugging and testing capabilities needed for production deployments.
