|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +Example: FastAPI Background Tasks with Docket |
| 4 | +
|
| 5 | +This example demonstrates how to integrate Docket with FastAPI to handle |
| 6 | +background tasks that are offloaded from web request handlers. This pattern |
| 7 | +is ideal for operations that are too slow to run synchronously during a |
| 8 | +web request (sending emails, processing images, generating reports, etc.). |
| 9 | +
|
| 10 | +Why use Docket instead of FastAPI's built-in background_tasks? |
| 11 | +-------------------------------------------------------------- |
| 12 | +FastAPI provides BackgroundTasks for simple fire-and-forget operations, but |
| 13 | +Docket offers critical advantages for production systems: |
| 14 | +
|
| 15 | +- **Durability**: Tasks are persisted in Redis and survive server restarts, |
| 16 | + deployments, and crashes. FastAPI's background_tasks run in-memory and are |
| 17 | + lost if the server goes down. |
| 18 | +
|
| 19 | +- **Horizontal scaling**: Multiple worker processes across different machines |
| 20 | + can process tasks from the same queue. FastAPI's background_tasks only run |
| 21 | + in the web server process that created them. |
| 22 | +
|
| 23 | +- **Advanced features**: Docket provides scheduling (run tasks at specific times), |
| 24 | + retries with exponential backoff, task dependencies, and more. FastAPI's |
| 25 | + background_tasks are simple callables with no built-in retry or scheduling. |
| 26 | +
|
| 27 | +- **Observability**: Monitor queued, running, and completed tasks across your |
| 28 | + entire system. Track worker health and task performance. |
| 29 | +
|
| 30 | +Use Docket when you need reliability and scalability. Use FastAPI's background_tasks |
| 31 | +for simple, non-critical operations where task loss on restart is acceptable. |
| 32 | +
|
| 33 | +Key patterns demonstrated: |
| 34 | +- Using FastAPI's lifespan context manager to start/stop Docket worker |
| 35 | +- Embedding a Docket worker within the web application process |
| 36 | +- Dependency injection to access Docket from route handlers |
| 37 | +- Scheduling background tasks from API endpoints |
| 38 | +
|
| 39 | +Architecture: |
| 40 | +- The Docket worker runs in a background asyncio task alongside uvicorn |
| 41 | +- Web requests return immediately after scheduling tasks |
| 42 | +- Background tasks are processed concurrently by the embedded worker |
| 43 | +
|
| 44 | +Required dependencies: |
| 45 | + uv pip install pydocket fastapi uvicorn |
| 46 | +
|
| 47 | +To run: |
| 48 | + uv run -s examples/fastapi_background_tasks.py |
| 49 | +
|
| 50 | +To test: |
| 51 | + curl -X POST http://localhost:8000/create_user \\ |
| 52 | + -H "Content-Type: application/json" \\ |
| 53 | + -d '{"name": "Jane Doe", "email": "jane@example.com", "password": "secret"}' |
| 54 | +
|
| 55 | +You should see the endpoint return immediately (201 Created), then 1 second |
| 56 | +later see the "Email sent" message in the server logs as the background task |
| 57 | +executes. |
| 58 | +""" |
| 59 | + |
| 60 | +from contextlib import asynccontextmanager |
| 61 | +import asyncio |
| 62 | +from datetime import datetime |
| 63 | +from typing import Annotated |
| 64 | +from fastapi import Depends, FastAPI, Request |
| 65 | +from pydantic import BaseModel |
| 66 | + |
| 67 | +from docket import Docket, Worker |
| 68 | + |
| 69 | +from common import run_redis |
| 70 | + |
| 71 | +# Redis connection URL - will be overridden by main() during testing |
| 72 | +redis_url = "redis://localhost:6379/0" |
| 73 | + |
| 74 | + |
| 75 | +# ============================================================================ |
| 76 | +# Background Task Definition |
| 77 | +# ============================================================================ |
| 78 | +# This is the function that will be executed as a background task. In a real |
| 79 | +# application, this might send an actual email via SMTP, an email service API, |
| 80 | +# or a message queue. Here we simulate a slow operation with asyncio.sleep(). |
| 81 | + |
| 82 | + |
| 83 | +async def send_email(email: str): |
| 84 | + """Simulates sending a welcome email to a new user.""" |
| 85 | + print(f"Sending email to {email}", flush=True) |
| 86 | + await asyncio.sleep(1) # Simulate slow I/O operation |
| 87 | + print( |
| 88 | + f"Email sent to {email} @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", |
| 89 | + flush=True, |
| 90 | + ) |
| 91 | + |
| 92 | + |
| 93 | +# ============================================================================ |
| 94 | +# FastAPI Lifespan Management |
| 95 | +# ============================================================================ |
| 96 | +# FastAPI's lifespan context manager runs during application startup and |
| 97 | +# shutdown. This is the perfect place to initialize Docket and start the |
| 98 | +# background worker. The worker will run in a separate asyncio task alongside |
| 99 | +# the web server, processing tasks as they're scheduled. |
| 100 | + |
| 101 | + |
| 102 | +@asynccontextmanager |
| 103 | +async def lifespan(app: FastAPI): |
| 104 | + """Manages Docket and Worker lifecycle alongside FastAPI application.""" |
| 105 | + worker_task: asyncio.Task[None] | None = None |
| 106 | + try: |
| 107 | + # Initialize Docket connection to Redis |
| 108 | + async with Docket(url=redis_url) as docket: |
| 109 | + # Store Docket instance in app state for access from route handlers |
| 110 | + app.state.docket = docket |
| 111 | + |
| 112 | + # Register our background task function with Docket |
| 113 | + docket.register(send_email) |
| 114 | + |
| 115 | + # Start the worker in a background asyncio task |
| 116 | + async with Worker(docket) as worker: |
| 117 | + # run_forever() processes tasks continuously |
| 118 | + worker_task = asyncio.create_task(worker.run_forever()) |
| 119 | + |
| 120 | + # Yield control back to FastAPI - app is now running with |
| 121 | + # both the web server and background worker active |
| 122 | + yield |
| 123 | + finally: |
| 124 | + # Cleanup: gracefully shutdown the worker when app stops |
| 125 | + if worker_task: |
| 126 | + worker_task.cancel() |
| 127 | + try: |
| 128 | + await worker_task |
| 129 | + except asyncio.CancelledError: |
| 130 | + pass |
| 131 | + |
| 132 | + |
| 133 | +# ============================================================================ |
| 134 | +# Dependency Injection Setup |
| 135 | +# ============================================================================ |
| 136 | +# FastAPI's dependency injection system allows us to easily access the Docket |
| 137 | +# instance from route handlers. This function extracts Docket from app state. |
| 138 | + |
| 139 | + |
| 140 | +def get_docket(request: Request) -> Docket: |
| 141 | + """Dependency that provides access to the Docket instance.""" |
| 142 | + return request.app.state.docket |
| 143 | + |
| 144 | + |
| 145 | +# Initialize FastAPI app with our lifespan manager |
| 146 | +# This ensures Docket worker starts when the app starts |
| 147 | +app = FastAPI(lifespan=lifespan) |
| 148 | + |
| 149 | + |
| 150 | +# ============================================================================ |
| 151 | +# API Route with Background Task |
| 152 | +# ============================================================================ |
| 153 | +# This route demonstrates the typical pattern: handle the request quickly, |
| 154 | +# schedule background work, and return immediately to the client. |
| 155 | + |
| 156 | + |
| 157 | +class User(BaseModel): |
| 158 | + """User registration data.""" |
| 159 | + |
| 160 | + name: str |
| 161 | + email: str |
| 162 | + password: str |
| 163 | + |
| 164 | + |
| 165 | +@app.post("/create_user", status_code=201) |
| 166 | +async def create_user(user: User, docket: Annotated[Docket, Depends(get_docket)]): |
| 167 | + """ |
| 168 | + Create a new user and send welcome email in the background. |
| 169 | +
|
| 170 | + The endpoint returns immediately after scheduling the email task. |
| 171 | + The actual email sending happens asynchronously in the background worker. |
| 172 | + """ |
| 173 | + # Schedule the send_email task with the user's email address |
| 174 | + # This returns almost instantly - the task is queued but not yet executed |
| 175 | + await docket.add(send_email)(user.email) |
| 176 | + |
| 177 | + # Return 201 Created immediately - client doesn't wait for email to send |
| 178 | + return |
| 179 | + |
| 180 | + |
| 181 | +# ============================================================================ |
| 182 | +# Test Harness |
| 183 | +# ============================================================================ |
| 184 | +# For demonstration purposes, we embed a temporary Redis instance. |
| 185 | +# In production, you would connect to your existing Redis server. |
| 186 | + |
| 187 | + |
| 188 | +async def main(): |
| 189 | + """Run the FastAPI app with an embedded test Redis instance.""" |
| 190 | + # Start a temporary Redis instance for testing |
| 191 | + async with run_redis("7.4.2") as url: |
| 192 | + global redis_url |
| 193 | + redis_url = url |
| 194 | + |
| 195 | + import uvicorn |
| 196 | + |
| 197 | + # Use uvicorn's async API to run the server within our event loop |
| 198 | + config = uvicorn.Config(app, host="0.0.0.0", port=8000) |
| 199 | + server = uvicorn.Server(config) |
| 200 | + await server.serve() |
| 201 | + |
| 202 | + |
| 203 | +if __name__ == "__main__": |
| 204 | + asyncio.run(main()) |
0 commit comments