diff --git a/README.md b/README.md index eacea4f9..2beb2fc7 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,8 @@ reference](https://chrisguidry.github.io/docket/api-reference/). 🧩 Fully type-complete and type-aware for your background task functions +💉 Dependency injection like FastAPI, Typer, and FastMCP for reusable resources + ## Installing `docket` Docket is [available on PyPI](https://pypi.org/project/pydocket/) under the package name diff --git a/docs/advanced-patterns.md b/docs/advanced-patterns.md new file mode 100644 index 00000000..3135cdab --- /dev/null +++ b/docs/advanced-patterns.md @@ -0,0 +1,380 @@ +# Advanced Task Patterns + +Docket is made for building complex distributed systems, and the patterns below highlight some of the original use cases for Docket. + +## Perpetual Tasks + +Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations. + +### Basic Perpetual Tasks + +```python +from docket import Perpetual + +async def health_check_service( + service_url: str, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=5)) +) -> None: + try: + response = await http_client.get(f"{service_url}/health") + response.raise_for_status() + print(f"✓ {service_url} is healthy") + except Exception as e: + print(f"✗ {service_url} failed health check: {e}") + await send_alert(f"Service {service_url} is down") + +# Schedule the task once, it will run every 5 minutes forever +await docket.add(health_check_service)("https://api.example.com") +``` + +After each execution, the task automatically schedules itself to run again after the specified interval. + +### Automatic Startup + +Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled: + +```python +async def background_cleanup( + perpetual: Perpetual = Perpetual( + every=timedelta(hours=1), + automatic=True + ) +) -> None: + deleted_count = await cleanup_old_records() + print(f"Cleaned up {deleted_count} old records") + +# Just register the task - no need to schedule it +docket.register(background_cleanup) + +# When a worker starts, it will automatically begin running this task +# The task key will be the function name: "background_cleanup" +``` + +### Self-Canceling Tasks + +Perpetual tasks can stop themselves when their work is done: + +```python +async def monitor_deployment( + deployment_id: str, + perpetual: Perpetual = Perpetual(every=timedelta(seconds=30)) +) -> None: + status = await check_deployment_status(deployment_id) + + if status in ["completed", "failed"]: + await notify_deployment_finished(deployment_id, status) + perpetual.cancel() # Stop monitoring this deployment + return + + print(f"Deployment {deployment_id} status: {status}") +``` + +### Dynamic Parameters + +Perpetual tasks can change their arguments or timing for the next execution: + +```python +async def adaptive_rate_limiter( + api_endpoint: str, + requests_per_minute: int = 60, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)) +) -> None: + # Check current API load + current_load = await check_api_load(api_endpoint) + + if current_load > 0.8: # High load + new_rate = max(30, requests_per_minute - 10) + perpetual.every = timedelta(seconds=30) # Check more frequently + print(f"High load detected, reducing rate to {new_rate} req/min") + else: # Normal load + new_rate = min(120, requests_per_minute + 5) + perpetual.every = timedelta(minutes=1) # Normal check interval + print(f"Normal load, increasing rate to {new_rate} req/min") + + # Schedule next run with updated parameters + perpetual.perpetuate(api_endpoint, new_rate) +``` + +### Error Resilience + +Perpetual tasks automatically reschedule themselves regardless of success or failure: + +```python +async def resilient_sync( + source_url: str, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=15)) +) -> None: + # This will ALWAYS reschedule, whether it succeeds or fails + await sync_data_from_source(source_url) + print(f"Successfully synced data from {source_url}") +``` + +You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the `every` interval. + +### Find & Flood Pattern + +A common perpetual task pattern is "find & flood" - a single perpetual task that periodically discovers work to do, then creates many smaller tasks to handle the actual work: + +```python +from docket import CurrentDocket, Perpetual + +async def find_pending_orders( + docket: Docket = CurrentDocket(), + perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)) +) -> None: + # Find all orders that need processing + pending_orders = await database.fetch_pending_orders() + + # Flood the queue with individual processing tasks + for order in pending_orders: + await docket.add(process_single_order)(order.id) + + print(f"Queued {len(pending_orders)} orders for processing") + +async def process_single_order(order_id: int) -> None: + # Handle one specific order + await process_order_payment(order_id) + await update_inventory(order_id) + await send_confirmation_email(order_id) +``` + +This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers. + +## Striking and Restoring Tasks + +Striking allows you to temporarily disable tasks without redeploying code. This is invaluable for incident response, gradual rollouts, or handling problematic customers. + +### Striking Entire Task Types + +Disable all instances of a specific task: + +```python +# Disable all order processing during maintenance +await docket.strike(process_order) + +# Orders added during this time won't be processed +await docket.add(process_order)(order_id=12345) # Won't run +await docket.add(process_order)(order_id=67890) # Won't run + +# Re-enable when ready +await docket.restore(process_order) +``` + +### Striking by Parameter Values + +Disable tasks based on their arguments using comparison operators: + +```python +# Block all tasks for a problematic customer +await docket.strike(None, "customer_id", "==", "12345") + +# Block low-priority work during high load +await docket.strike(process_order, "priority", "<=", "low") + +# Block all orders above a certain value during fraud investigation +await docket.strike(process_payment, "amount", ">", 10000) + +# Later, restore them +await docket.restore(None, "customer_id", "==", "12345") +await docket.restore(process_order, "priority", "<=", "low") +``` + +Supported operators include `==`, `!=`, `<`, `<=`, `>`, `>=`. + +### Striking Specific Task-Parameter Combinations + +Target very specific scenarios: + +```python +# Block only high-value orders for a specific customer +await docket.strike(process_order, "customer_id", "==", "12345") +await docket.strike(process_order, "amount", ">", 1000) + +# This order won't run (blocked customer) +await docket.add(process_order)(customer_id="12345", amount=500) + +# This order won't run (blocked customer AND high amount) +await docket.add(process_order)(customer_id="12345", amount=2000) + +# This order WILL run (different customer) +await docket.add(process_order)(customer_id="67890", amount=2000) +``` + +Striking is useful for incident response when you need to quickly disable failing tasks, customer management to block problematic accounts, gradual rollouts where you disable features for certain parameters, load management during high traffic, and debugging to isolate specific scenarios. + +## Advanced Logging and Debugging + +### Argument Logging + +Control which task arguments appear in logs using the `Logged` annotation: + +```python +from typing import Annotated +from docket import Logged + +async def process_payment( + customer_id: Annotated[str, Logged], # Will be logged + credit_card: str, # Won't be logged + amount: Annotated[float, Logged()] = 0.0, # Will be logged + trace_id: Annotated[str, Logged] = "unknown" # Will be logged +) -> None: + # Process the payment... + pass + +# Log output will show: +# process_payment('12345', credit_card=..., amount=150.0, trace_id='abc-123') +``` + +### Collection Length Logging + +For large collections, log just their size instead of contents: + +```python +async def bulk_update_users( + user_ids: Annotated[list[str], Logged(length_only=True)], + metadata: Annotated[dict[str, str], Logged(length_only=True)], + options: Annotated[set[str], Logged(length_only=True)] +) -> None: + # Process users... + pass + +# Log output will show: +# bulk_update_users([len 150], metadata={len 5}, options={len 3}) +``` + +This prevents logs from being overwhelmed with large data structures while still providing useful information. + +### Task Context Logging + +Use `TaskLogger` for structured logging with task context: + +```python +from logging import Logger, LoggerAdapter +from docket import TaskLogger + +async def complex_data_pipeline( + dataset_id: str, + logger: LoggerAdapter[Logger] = TaskLogger() +) -> None: + logger.info("Starting data pipeline", extra={"dataset_id": dataset_id}) + + try: + await extract_data(dataset_id) + logger.info("Data extraction completed") + + await transform_data(dataset_id) + logger.info("Data transformation completed") + + await load_data(dataset_id) + logger.info("Data loading completed") + + except Exception as e: + logger.error("Pipeline failed", extra={"error": str(e)}) + raise +``` + +The logger automatically includes task context like the task name, key, and worker information. + +### Built-in Utility Tasks + +Docket provides helpful debugging tasks: + +```python +from docket import tasks + +# Simple trace logging +await docket.add(tasks.trace)("System startup completed") +await docket.add(tasks.trace)("Processing batch 123") + +# Intentional failures for testing error handling +await docket.add(tasks.fail)("Testing error notification system") +``` + +These are particularly useful for: +- Marking milestones in complex workflows +- Testing monitoring and alerting systems +- Debugging task execution order +- Creating synthetic load for testing + +## Task Chain Patterns + +### Sequential Processing + +Create chains of related tasks that pass data forward: + +```python +async def download_data( + url: str, + docket: Docket = CurrentDocket() +) -> None: + file_path = await download_file(url) + await docket.add(validate_data)(file_path) + +async def validate_data( + file_path: str, + docket: Docket = CurrentDocket() +) -> None: + if await is_valid_data(file_path): + await docket.add(process_data)(file_path) + else: + await docket.add(handle_invalid_data)(file_path) + +async def process_data(file_path: str) -> None: + # Final processing step + await transform_and_store(file_path) +``` + +### Fan-out Processing + +Break large tasks into parallel subtasks: + +```python +async def process_large_dataset( + dataset_id: str, + docket: Docket = CurrentDocket() +) -> None: + chunk_ids = await split_dataset_into_chunks(dataset_id) + + # Schedule parallel processing of all chunks + for chunk_id in chunk_ids: + await docket.add(process_chunk)(dataset_id, chunk_id) + + # Schedule a task to run after all chunks should be done + estimated_completion = datetime.now(timezone.utc) + timedelta(hours=2) + await docket.add( + finalize_dataset, + when=estimated_completion, + key=f"finalize-{dataset_id}" + )(dataset_id, len(chunk_ids)) + +async def process_chunk(dataset_id: str, chunk_id: str) -> None: + await process_data_chunk(dataset_id, chunk_id) + await mark_chunk_complete(dataset_id, chunk_id) +``` + +### Conditional Workflows + +Tasks can make decisions about what work to schedule next: + +```python +async def analyze_user_behavior( + user_id: str, + docket: Docket = CurrentDocket() +) -> None: + behavior_data = await collect_user_behavior(user_id) + + if behavior_data.indicates_churn_risk(): + await docket.add(create_retention_campaign)(user_id) + elif behavior_data.indicates_upsell_opportunity(): + await docket.add(create_upsell_campaign)(user_id) + elif behavior_data.indicates_satisfaction(): + # Schedule a follow-up check in 30 days + future_check = datetime.now(timezone.utc) + timedelta(days=30) + await docket.add( + analyze_user_behavior, + when=future_check, + key=f"behavior-check-{user_id}" + )(user_id) +``` + +These advanced patterns enable building sophisticated distributed systems that can adapt to changing conditions, handle operational requirements, and provide the debugging and testing capabilities needed for production deployments. diff --git a/docs/dependencies.md b/docs/dependencies.md new file mode 100644 index 00000000..4b15832b --- /dev/null +++ b/docs/dependencies.md @@ -0,0 +1,394 @@ +# Dependencies Guide + +Docket tasks include a dependency injection system that provides access to context, configuration, and custom resources. This system is similar to FastAPI's dependency injection but tailored for background task patterns. + +## Built-in Context Dependencies + +### Accessing the Current Docket + +Tasks often need to schedule more work. The `CurrentDocket` dependency gives you access to the same docket the worker is processing: + +```python +from pathlib import Path +from datetime import datetime, timedelta, timezone +from docket import Docket, CurrentDocket + +def now() -> datetime: + return datetime.now(timezone.utc) + +async def poll_for_file( + file_path: str, + docket: Docket = CurrentDocket() +) -> None: + path = Path(file_path) + if path.exists(): + print(f"File {file_path} found!") + return + + # Schedule another check in 30 seconds + await docket.add( + poll_for_file, + when=now() + timedelta(seconds=30) + )(file_path) +``` + +This is especially useful for self-perpetuating tasks that create chains of future work. + +### Getting Your Task Key + +Use `TaskKey` to access the current task's key, which is helpful for creating related work or maintaining task chains: + +```python +from docket import CurrentDocket, TaskKey + +async def process_data_chunk( + dataset_id: int, + chunk: int, + total_chunks: int, + key: str = TaskKey(), + docket: Docket = CurrentDocket() +) -> None: + print(f"Processing chunk {chunk}/{total_chunks} for dataset {dataset_id}") + + # Process this chunk... + await process_chunk_data(dataset_id, chunk) + + if chunk < total_chunks: + # Schedule next chunk with a related key + next_key = f"dataset-{dataset_id}-chunk-{chunk + 1}" + await docket.add( + process_data_chunk, + key=next_key + )(dataset_id, chunk + 1, total_chunks) +``` + +### Worker and Execution Context + +Access the current worker and execution details when needed: + +```python +from docket import CurrentWorker, CurrentExecution, Worker, Execution + +async def diagnostic_task( + worker: Worker = CurrentWorker(), + execution: Execution = CurrentExecution() +) -> None: + print(f"Running on worker: {worker.name}") + print(f"Task key: {execution.key}") + print(f"Scheduled at: {execution.when}") + print(f"Worker concurrency: {worker.concurrency}") +``` + +## Advanced Retry Patterns + +### Exponential Backoff + +For services that might be overloaded, exponential backoff gives them time to recover: + +```python +from docket import ExponentialRetry + +async def call_external_api( + url: str, + retry: ExponentialRetry = ExponentialRetry( + attempts=5, + minimum_delay=timedelta(seconds=1), + maximum_delay=timedelta(minutes=5) + ) +) -> None: + # Retries with delays: 1s, 2s, 4s, 8s, 16s (but capped at 5 minutes) + try: + response = await http_client.get(url) + response.raise_for_status() + print(f"API call succeeded on attempt {retry.attempt}") + except Exception as e: + print(f"Attempt {retry.attempt} failed: {e}") + raise +``` + +### Unlimited Retries + +For critical tasks that must eventually succeed, use `attempts=None`: + +```python +from docket import Retry + +async def critical_data_sync( + source_url: str, + retry: Retry = Retry(attempts=None, delay=timedelta(minutes=5)) +) -> None: + # This will retry forever with 5-minute delays until it succeeds + await sync_critical_data(source_url) + print(f"Critical sync completed after {retry.attempt} attempts") +``` + +Both `Retry` and `ExponentialRetry` support unlimited retries this way. + +## Task Timeouts + +Prevent tasks from running too long with the `Timeout` dependency: + +```python +from docket import Timeout + +async def data_processing_task( + large_dataset: dict, + timeout: Timeout = Timeout(timedelta(minutes=10)) +) -> None: + # This task will be cancelled if it runs longer than 10 minutes + await process_dataset_phase_one(large_dataset) + + # Extend timeout if we need more time for phase two + timeout.extend(timedelta(minutes=5)) + await process_dataset_phase_two(large_dataset) +``` + +The `extend()` method can take a specific duration or default to the original timeout duration: + +```python +async def adaptive_timeout_task( + timeout: Timeout = Timeout(timedelta(minutes=2)) +) -> None: + await quick_check() + + # Extend by the base timeout (another 2 minutes) + timeout.extend() + await longer_operation() +``` + +Timeouts work alongside retries. If a task times out, it can be retried according to its retry policy. + +## Custom Dependencies + +Create your own dependencies using `Depends()` for reusable resources and patterns: + +```python +from contextlib import asynccontextmanager +from docket import Depends + +@asynccontextmanager +async def get_database_connection(): + """Simple dependency that returns a database connection.""" + conn = await database.connect() + try: + yield conn + finally: + await conn.close() + +@asynccontextmanager +async def get_redis_client(): + """Another dependency for Redis operations.""" + client = redis.Redis(host='localhost', port=6379) + try: + yield client + finally: + client.close() + +async def process_user_data( + user_id: int, + db=Depends(get_database_connection), + cache=Depends(get_redis_client) +) -> None: + # Both dependencies are automatically provided and cleaned up + user = await db.fetch_user(user_id) + await cache.set(f"user:{user_id}", user.to_json()) +``` + +### Nested Dependencies + +Dependencies can depend on other dependencies, and Docket resolves them in the correct order: + +```python +async def get_auth_service(db=Depends(get_database_connection)): + """A service that depends on the database connection.""" + return AuthService(db) + +async def get_user_service( + db=Depends(get_database_connection), + auth=Depends(get_auth_service) +): + """A service that depends on both database and auth service.""" + return UserService(db, auth) + +async def update_user_profile( + user_id: int, + profile_data: dict, + user_service=Depends(get_user_service) +) -> None: + # All dependencies are resolved automatically: + # db -> auth_service -> user_service -> this task + await user_service.update_profile(user_id, profile_data) +``` + +Dependencies are resolved once per task execution and cached, so if multiple parameters depend on the same resource, only one instance is created. + +### Context Manager Dependencies + +Dependencies can be async context managers for automatic resource cleanup: + +```python +from contextlib import asynccontextmanager + +@asynccontextmanager +async def get_file_lock(filename: str): + """A dependency that provides file locking.""" + lock = await acquire_file_lock(filename) + try: + yield lock + finally: + await release_file_lock(filename) + +async def process_shared_file( + filename: str, + file_lock=Depends(lambda: get_file_lock("shared.txt")) +) -> None: + # File is locked before task starts, unlocked after task completes + await process_file_safely(filename) +``` + +### Dependencies with Built-in Context + +Dependencies can access Docket's built-in context dependencies: + +```python +async def get_task_logger( + execution: Execution = CurrentExecution(), + worker: Worker = CurrentWorker() +) -> LoggerAdapter: + """Create a logger with task and worker context.""" + logger = logging.getLogger(f"worker.{worker.name}") + return LoggerAdapter(logger, { + 'task_key': execution.key, + 'worker_name': worker.name + }) + +async def important_task( + data: dict, + logger=Depends(get_task_logger) +) -> None: + logger.info("Starting important task") + await process_important_data(data) + logger.info("Important task completed") +``` + +## TaskArgument: Accessing Task Parameters + +Dependencies can access the task's input arguments using `TaskArgument`: + +```python +from docket import TaskArgument + +async def get_user_context(user_id: int = TaskArgument()) -> dict: + """Dependency that fetches user context based on task argument.""" + user = await fetch_user(user_id) + return { + 'user': user, + 'permissions': await fetch_user_permissions(user_id), + 'preferences': await fetch_user_preferences(user_id) + } + +async def send_personalized_email( + user_id: int, + message: str, + user_context=Depends(get_user_context) +) -> None: + # user_context is populated based on the user_id argument + email = personalize_email(message, user_context['preferences']) + await send_email(user_context['user'].email, email) +``` + +You can access arguments by name or make them optional: + +```python +async def get_optional_config( + config_name: str | None = TaskArgument("config", optional=True) +) -> dict: + """Get configuration if provided, otherwise use defaults.""" + if config_name: + return await load_config(config_name) + return DEFAULT_CONFIG + +async def flexible_task( + data: dict, + config: str | None = None, # Optional argument + resolved_config=Depends(get_optional_config) +) -> None: + # resolved_config will be loaded config or defaults + await process_data(data, resolved_config) +``` + +## Dependency Error Handling + +When dependencies fail, the entire task fails with detailed error information: + +```python +async def unreliable_dependency(): + if random.random() < 0.5: + raise ValueError("Service unavailable") + return "success" + +async def dependent_task( + value=Depends(unreliable_dependency) +) -> None: + print(f"Got value: {value}") +``` + +If `unreliable_dependency` fails, the task won't execute and the error will be logged with context about which dependency failed. This prevents tasks from running with incomplete or invalid dependencies. + +## Dependency Guidelines + +### Design for Reusability + +Create dependencies that can be used across multiple tasks: + +```python +# Good: Reusable across many tasks +async def get_api_client(): + return APIClient(api_key=os.getenv("API_KEY")) + +# Less ideal: Too specific to one task +async def get_user_api_client_for_profile_updates(): + return APIClient(api_key=os.getenv("API_KEY"), timeout=30) +``` + +### Keep Dependencies Focused + +Each dependency should have a single responsibility: + +```python +# Good: Focused dependencies +async def get_database(): + return await database.connect() + +async def get_cache(): + return redis.Redis() + +# Less ideal: Too many responsibilities +async def get_all_services(): + return { + 'db': await database.connect(), + 'cache': redis.Redis(), + 'api': APIClient(), + 'metrics': MetricsClient() + } +``` + +### Handle Resource Cleanup + +Always use context managers or try/finally for resource cleanup: + +```python +# Good: Automatic cleanup +async def get_database(): + conn = await database.connect() + try: + yield conn + finally: + await conn.close() + +# Risky: Manual cleanup required +async def get_database_no_cleanup(): + return await database.connect() # Who closes this? +``` + +The dependency injection system supports flexible task design while maintaining clear separation of concerns. Dependencies can be simple values, complex services, or entire subsystems that your tasks need to operate effectively. diff --git a/docs/getting-started.md b/docs/getting-started.md index a08e2247..43fdf742 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -1,4 +1,6 @@ -## Installing `docket` +# Getting Started + +## Installation Docket is [available on PyPI](https://pypi.org/project/pydocket/) under the package name `pydocket`. It targets Python 3.12 or above. @@ -19,268 +21,178 @@ With `pip`: pip install pydocket ``` -## Creating a `Docket` +You'll also need a [Redis](http://redis.io/) server with Streams support (Redis 5.0+). Docket is tested with Redis 6 and 7, and also works with [Valkey](https://valkey.io/). + +## Your First Docket Each `Docket` should have a name that will be shared across your system, like the name -of a topic or queue. By default this is `"docket"`. You can support many separate +of a topic or queue. By default this is `"docket"`. You can run multiple separate dockets on a single Redis server as long as they have different names. -Docket accepts a URL to connect to the Redis server (defaulting to the local -server), and you can pass any additional connection configuration you need on that -connection URL. - ```python -async with Docket(name="orders", url="redis://my-redis:6379/0") as docket: - ... +from datetime import datetime, timedelta, timezone +from docket import Docket + +async def send_welcome_email(customer_id: int, name: str) -> None: + print(f"Welcome, {name}! (customer {customer_id})") + +async with Docket(name="emails", url="redis://localhost:6379/0") as docket: + # Schedule immediate work + await docket.add(send_welcome_email)(12345, "Alice") + + # Schedule future work + tomorrow = datetime.now(timezone.utc) + timedelta(days=1) + await docket.add(send_welcome_email, when=tomorrow)(67890, "Bob") ``` The `name` and `url` together represent a single shared docket of work across all your -system. +system. Different services can schedule work on the same docket as long as they use the same connection details. -## Scheduling work +## Understanding Task Keys -A `Docket` is the entrypoint to scheduling immediate and future work. You define work -in the form of `async` functions that return `None`. These task functions can accept -any parameter types, so long as they can be serialized with -[`cloudpickle`](https://github.com/cloudpipe/cloudpickle). +Every task gets a unique identifier called a `key`. By default, Docket generates random UUIDs for these keys, which works well for most use cases since each task execution is truly independent work. ```python -def now() -> datetime: - return datetime.now(timezone.utc) - -async def send_welcome_email(customer_id: int, name: str) -> None: - ... - -async def send_followup_email(customer_id: int, name: str) -> None: - ... +async def send_notification(user_id: int, message: str) -> None: + print(f"Sending to user {user_id}: {message}") async with Docket() as docket: - await docket.add(send_welcome_email)(12345, "Jane Smith") - - tomorrow = now() + timedelta(days=1) - await docket.add(send_followup_email, when=tomorrow)(12345, "Jane Smith") + # Each of these gets a random key and will execute independently + await docket.add(send_notification)(123, "Welcome!") + await docket.add(send_notification)(456, "Your order shipped") + await docket.add(send_notification)(123, "Thank you for your purchase") ``` -`docket.add` schedules both immediate work (the default) or future work (with the -`when: datetime` parameter). +Sometimes though, you want to control whether multiple tasks represent the same logical work. For example, if a user clicks "process my order" multiple times, you probably only want to process that order once. -All task executions are identified with a `key` that captures the unique essence of that -piece of work. By default they are randomly assigned UUIDs, but assigning your own keys -unlocks many powerful capabilities. +Custom keys make scheduling idempotent. There can only ever be one future task scheduled with a given key: ```python -async with Docket() as docket: - await docket.add(send_welcome_email)(12345, "Jane Smith") +async def process_order(order_id: int) -> None: + print(f"Processing order {order_id}") - tomorrow = now() + timedelta(days=1) - key = "welcome-email-for-12345" - await docket.add(send_followup_email, when=tomorrow, key=key)(12345, "Jane Smith") -``` - -If you've given your future work a `key`, then only one unique instance of that -execution will exist in the future: - -```python -key = "welcome-email-for-12345" -await docket.add(send_followup_email, when=tomorrow, key=key)(12345, "Jane Smith") +async with Docket() as docket: + key = f"process-order-{12345}" + await docket.add(process_order, key=key)(12345) + await docket.add(process_order, key=key)(12345) # Ignored - key already exists ``` -Calling `.add` a second time with the same key won't do anything, so luckily your -customer won't get two emails! - -However, at any time later you can replace that task execution to alter _when_ it will -happen: +This is especially valuable for web APIs where client retries or network issues might cause the same request to arrive multiple times: ```python -key = "welcome-email-for-12345" -next_week = now() + timedelta(days=7) -await docket.replace(send_followup_email, when=next_week, key=key)(12345, "Jane Smith") +@app.post("/orders/{order_id}/process") +async def api_process_order(order_id: int): + # Even if this endpoint gets called multiple times, only one task is scheduled + key = f"process-order-{order_id}" + await docket.add(process_order, key=key)(order_id) + return {"status": "scheduled"} ``` -_what arguments_ will be passed: +Custom keys also let you manage scheduled work. You can replace future tasks to change their timing or arguments, or cancel them entirely: ```python -key = "welcome-email-for-12345" -await docket.replace(send_followup_email, when=tomorrow, key=key)(12345, "Jane Q. Smith") -``` - -Or just cancel it outright: +key = f"reminder-{customer_id}" -```python -await docket.cancel("welcome-email-for-12345") -``` +# Schedule a reminder for next week +next_week = datetime.now(timezone.utc) + timedelta(days=7) +await docket.add(send_reminder, when=next_week, key=key)( + customer_id, "Your trial expires soon" +) -Tasks may also be called by name, in cases where you can't or don't want to import the -module that has your tasks. This may be common in a distributed environment where the -code of your task system just isn't available, or it requires heavyweight libraries that -you wouldn't want to import into your web server. In this case, you will lose the -type-checking for `.add` and `.replace` calls, but otherwise everything will work as -it does with the actual function: +# Customer upgrades - move reminder to next month instead +next_month = datetime.now(timezone.utc) + timedelta(days=30) +await docket.replace(send_reminder, when=next_month, key=key)( + customer_id, "Thanks for upgrading!" +) -```python -await docket.add("send_followup_email", when=tomorrow)(12345, "Jane Smith") +# Customer cancels - remove reminder entirely +await docket.cancel(key) ``` -These primitives of `.add`, `.replace`, and `.cancel` are sufficient to build a -large-scale and robust system of background tasks for your application. - -## Writing tasks +Note that canceling only works for tasks scheduled in the future. Tasks that are ready for immediate execution cannot be canceled once they've been added to the processing queue. -Tasks are any `async` function that takes `cloudpickle`-able parameters, and returns -`None`. Returning `None` is a strong signal that these are _fire-and-forget_ tasks -whose results aren't used or waited-on by your application. These are the only kinds of -tasks that Docket supports. +## Running Tasks: Workers -Docket uses a parameter-based dependency and configuration pattern, which has become -common in frameworks like [FastAPI](https://fastapi.tiangolo.com/), -[Typer](https://typer.tiangolo.com/), or [FastMCP](https://github.com/jlowin/fastmcp). -As such, there is no decorator for tasks. - -A very common requirement for tasks is that they have access to schedule further work -on their own docket, especially for chains of self-perpetuating tasks to implement -distributed polling and other periodic systems. One of the first dependencies you may -look for is the `CurrentDocket`: +Tasks don't execute automatically - you need workers to process them. A worker connects to the same docket and continuously pulls tasks from the queue. ```python -from docket import Docket, CurrentDocket - -POLLING_INTERVAL = timedelta(seconds=10) +from docket import Docket, Worker -async def poll_for_changes(file: Path, docket: Docket = CurrentDocket()) -> None: - if file.exists(): - ...do something interesting... - return - else: - await docket.add(poll_for_changes, when=now() + POLLING_INTERVAL)(file) -``` +async def process_order(order_id: int) -> None: + print(f"Processing order {order_id}") -Here the argument to `docket` is an instance of `Docket` with the same name and URL as -the worker it's running on. You can ask for the `CurrentWorker` and `CurrentExecution` -as well. Many times it could be useful to have your own task `key` available in order -to idempotently schedule future work: +async def send_notification(message: str) -> None: + print(f"Notification: {message}") -```python -from docket import Docket, CurrentDocket, TaskKey +async with Docket() as docket: + # Register tasks so workers know about them + docket.register(process_order) + docket.register(send_notification) -async def poll_for_changes( - file: Path, - key: str = TaskKey(), - docket: Docket = CurrentDocket() -) -> None: - if file.exists(): - ...do something interesting... - return - else: - await docket.add(poll_for_changes, when=now() + POLLING_INTERVAL, key=key)(file) + async with Worker(docket) as worker: + await worker.run_forever() # Process tasks until interrupted ``` -This helps to ensure that there is one continuous "chain" of these future tasks, as they -all use the same key. - -Configuring the retry behavior for a task is also done with a dependency: +For production deployments, you'll typically run workers via the CLI: -```python -from datetime import timedelta -from docket import Retry +```bash +# In tasks.py +async def process_order(order_id: int) -> None: + print(f"Processing order {order_id}") -async def faily(retry: Retry = Retry(attempts=5, delay=timedelta(seconds=3))): - if retry.attempt == 4: - print("whew!") - return +async def send_notification(message: str) -> None: + print(f"Notification: {message}") - raise ValueError("whoops!") +tasks = [process_order, send_notification] ``` -In this case, the task `faily` will run 4 times with a delay of 3 seconds between each -attempt. If it were to get to 5 attempts, no more would be attempted. This is a -linear retry, and an `ExponentialRetry` is also available: - -```python -from datetime import timedelta -from docket import Retry, ExponentialRetry - - -async def faily( - retry: Retry = Retry( - attempts=5, - minimum_delay=timedelta(seconds=2), - maximum_delay=timedelta(seconds=32), - ), -): - if retry.attempt == 4: - print("whew!") - return - - raise ValueError("whoops!") +```bash +docket worker --tasks tasks:tasks --concurrency 5 ``` -This would retry in 2, 4, 8, then 16 seconds before that fourth attempt succeeded. +Workers automatically handle concurrency (processing multiple tasks simultaneously), retries on failure, and graceful shutdown. By default, a worker processes up to 10 tasks concurrently. -## Running workers +## Basic Error Handling -You can run as many workers as you like to process the tasks on your docket. You can -either run a worker programmatically in Python, or via the CLI. Clients using docket -have the advantage that they are usually passing the task functions, but workers don't -necessarily know which tasks they are supposed to run. Docket solves this by allowing -you to explicitly register tasks. +By default, if a task fails (raises an exception), Docket will log the error and mark the task as failed in its OpenTelemetry traces. The task won't be retried and the worker will move on to the next task. -In `my_tasks.py`: +For tasks that might fail due to transient issues, you can configure automatic retries: ```python -async def my_first_task(): - ... +from docket import Retry -async def my_second_task(): - ... +async def flaky_api_call( + url: str, + retry: Retry = Retry(attempts=3, delay=timedelta(seconds=5)) +) -> None: + # This will retry up to 3 times with 5 seconds between each attempt + response = await http_client.get(url) + if response.status_code != 200: + raise Exception(f"API returned {response.status_code}") -my_task_collection = [ - my_first_task, - my_second_task, -] + print(f"Success on attempt {retry.attempt}") ``` -From Python: - -```python -from my_tasks import my_task_collection - -async with Docket() as docket: - for task in my_task_collection: - docket.register(task) +Tasks use a dependency injection pattern similar to FastAPI. The `Retry` dependency tells Docket how to handle failures for that specific task. - async with Worker(docket) as worker: - await worker.run_forever() -``` +## Worker Configuration -From the CLI: +Workers handle task delivery guarantees and fault tolerance. By default, workers process up to 10 tasks simultaneously, but you can adjust this with the `concurrency=` parameter or `--concurrency` CLI option. If a worker crashes, its tasks are redelivered to other workers after `redelivery_timeout` expires - you'll want to set this higher than your longest-running task. -```bash -docket worker --tasks my_tasks:my_task_collection -``` +Docket provides at-least-once delivery semantics, meaning tasks may be delivered more than once if workers crash, so design your tasks to be idempotent when possible. -By default, workers will process up to 10 tasks concurrently, but you can adjust this -to your needs with the `concurrency=` keyword argument or the `--concurrency` CLI -option. +## What's Next? -When a worker crashes ungracefully, any tasks it was currently executing will be held -for a period of time before being redelivered to other workers. You can control this -time period with `redelivery_timeout=` or `--redelivery-timeout`. You'd want to set -this to a value higher than the longest task you expect to run. For queues of very fast -tasks, a few seconds may be ideal; for long data-processing steps involving large -amount of data, you may need minutes. +You now know the core concepts: creating dockets, scheduling work with idempotent keys, running workers, and basic error handling. This gives you what you need to build background task systems for most applications. -## Delivery guarantees +Ready for more? Check out: -Docket provides _at-least-once_ delivery semantics. When a worker picks up a -task, if it crashes or fails to acknowledge within `redelivery_timeout`, the -task will be considered unacknowledged and redelivered to another available -worker. This ensures tasks are not lost but may be delivered more than once. To -achieve exactly-once processing, design your tasks to be idempotent. +- **[Dependencies Guide](dependencies.md)** - Access current docket, advanced retry patterns, timeouts, and custom dependencies +- **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests +- **[Advanced Task Patterns](advanced-patterns.md)** - Perpetual tasks, striking/restoring, logging, and task chains +- **[Docket in Production](production.md)** - Redis architecture, monitoring, and deployment strategies +- **[API Reference](api-reference.md)** - Complete documentation of all classes and methods -## Serialization and cloudpickle usage +## A Note on Security -Docket uses `cloudpickle` to serialize task functions and their arguments. This -allows you to pass nearly any Python object as arguments to a task, but it also -means that deserializing these arguments can execute arbitrary code. Avoid -scheduling tasks from untrusted or unauthenticated sources to mitigate security -risks. +Docket uses `cloudpickle` to serialize task functions and their arguments. This allows passing nearly any Python object as task arguments, but also means deserializing arguments can execute arbitrary code. Only schedule tasks from trusted sources in your system. diff --git a/docs/production.md b/docs/production.md new file mode 100644 index 00000000..072aac95 --- /dev/null +++ b/docs/production.md @@ -0,0 +1,407 @@ +# Docket in Production + +Running Docket at scale requires understanding its Redis-based architecture, configuring workers appropriately, and monitoring system health. This guide covers everything you need for reliable production deployments. + +## Redis Streams Architecture + +Docket uses Redis streams and sorted sets to provide reliable task delivery with at-least-once semantics. Note that Docket requires a single Redis instance and does not support Redis Cluster. + +### Task Lifecycle + +Understanding how tasks flow through the system helps with monitoring and troubleshooting: + +1. **Immediate tasks** go directly to the Redis stream and are available to any worker in the consumer group +2. **Future tasks** are stored in the sorted set with their execution time as the score +3. **Workers continuously move** due tasks from the sorted set to the stream +4. **Consumer groups** ensure each task is delivered to exactly one worker +5. **Acknowledgment** removes completed tasks; unacknowledged tasks are redelivered + +### Redelivery Behavior + +When a worker crashes or fails to acknowledge a task within `redelivery_timeout`, Redis automatically makes the task available to other workers. This ensures reliability but means tasks may execute more than once. + +```python +# Configure redelivery timeout based on your longest-running tasks +async with Worker( + docket, + redelivery_timeout=timedelta(minutes=10) # Adjust for your workload +) as worker: + await worker.run_forever() +``` + +Set redelivery timeout to be longer than your 99th percentile task duration to minimize duplicate executions. + +### Redis Data Structures + +Docket creates several Redis data structures for each docket: + +- **Stream (`{docket}:stream`)**: Ready-to-execute tasks using Redis consumer groups +- **Sorted Set (`{docket}:queue`)**: Future tasks ordered by scheduled execution time +- **Hashes (`{docket}:{key}`)**: Serialized task data for scheduled tasks +- **Set (`{docket}:workers`)**: Active worker heartbeats with timestamps +- **Set (`{docket}:worker-tasks:{worker}`)**: Tasks each worker can execute +- **Stream (`{docket}:strikes`)**: Strike/restore commands for operational control + +## Worker Configuration + +### Core Settings + +Workers have several configuration knobs for different environments: + +```python +async with Worker( + docket, + name="worker-1", # Unique worker identifier + concurrency=20, # Parallel task limit + redelivery_timeout=timedelta(minutes=5), # When to redeliver tasks + reconnection_delay=timedelta(seconds=5), # Redis reconnection backoff + minimum_check_interval=timedelta(milliseconds=100), # Polling frequency + scheduling_resolution=timedelta(milliseconds=250), # Future task check frequency + schedule_automatic_tasks=True # Enable perpetual task startup +) as worker: + await worker.run_forever() +``` + +### Environment Variable Configuration + +All settings can be configured via environment variables for production deployments: + +```bash +# Core docket settings +export DOCKET_NAME=orders +export DOCKET_URL=redis://redis.production.com:6379/0 + +# Worker settings +export DOCKET_WORKER_NAME=orders-worker-1 +export DOCKET_WORKER_CONCURRENCY=50 +export DOCKET_WORKER_REDELIVERY_TIMEOUT=10m +export DOCKET_WORKER_RECONNECTION_DELAY=5s +export DOCKET_WORKER_MINIMUM_CHECK_INTERVAL=100ms +export DOCKET_WORKER_SCHEDULING_RESOLUTION=250ms + +# Monitoring +export DOCKET_WORKER_HEALTHCHECK_PORT=8080 +export DOCKET_WORKER_METRICS_PORT=9090 + +# Logging +export DOCKET_LOGGING_LEVEL=INFO +export DOCKET_LOGGING_FORMAT=json + +# Task modules +export DOCKET_TASKS=myapp.tasks:production_tasks +``` + +### CLI Usage + +Run workers in production using the CLI: + +```bash +# Basic worker +docket worker --tasks myapp.tasks:all_tasks + +# Production worker with full configuration +docket worker \ + --docket orders \ + --url redis://redis.prod.com:6379/0 \ + --name orders-worker-1 \ + --concurrency 50 \ + --redelivery-timeout 10m \ + --healthcheck-port 8080 \ + --metrics-port 9090 \ + --logging-format json \ + --tasks myapp.tasks:production_tasks +``` + +### Tuning for Different Workloads + +**High-throughput, fast tasks:** + +```bash +docket worker \ + --concurrency 100 \ + --redelivery-timeout 30s \ + --minimum-check-interval 50ms \ + --scheduling-resolution 100ms +``` + +**Long-running, resource-intensive tasks:** + +```bash +docket worker \ + --concurrency 5 \ + --redelivery-timeout 1h \ + --minimum-check-interval 1s \ + --scheduling-resolution 5s +``` + +**Mixed workload with perpetual tasks:** + +```bash +docket worker \ + --concurrency 25 \ + --redelivery-timeout 5m \ + --schedule-automatic-tasks \ + --tasks myapp.tasks:all_tasks,myapp.monitoring:health_checks +``` + +## Connection Management + +### Redis Connection Pools + +Docket automatically manages Redis connection pools, but you can tune them for your environment: + +```python +from redis.asyncio import ConnectionPool + +# Custom connection pool for high-concurrency workers +pool = ConnectionPool.from_url( + "redis://redis.prod.com:6379/0", + max_connections=50, # Match or exceed worker concurrency + retry_on_timeout=True +) + +async with Docket(name="orders", connection_pool=pool) as docket: + # Use the custom pool + pass +``` + +### Redis Requirements + +Docket requires a single Redis instance and does not currently support Redis Cluster. For high availability, consider: + +- **Managed Redis services** like AWS ElastiCache, Google Cloud Memorystore, or Redis Cloud +- **Redis replicas** with manual failover procedures + +```python +# With authentication +docket_url = "redis://:password@redis.prod.com:6379/0" +``` + +### Valkey Support + +Docket also works with Valkey (Redis fork): + +```bash +export DOCKET_URL=valkey://valkey.prod.com:6379/0 +``` + +## Monitoring and Observability + +### Prometheus Metrics + +Enable Prometheus metrics with the `--metrics-port` option: + +```bash +docket worker --metrics-port 9090 +``` + +Available metrics include: + +#### Task Counters + +- `docket_tasks_added` - Tasks scheduled +- `docket_tasks_started` - Tasks begun execution +- `docket_tasks_succeeded` - Successfully completed tasks +- `docket_tasks_failed` - Failed tasks +- `docket_tasks_retried` - Retry attempts +- `docket_tasks_stricken` - Tasks blocked by strikes + +#### Task Timing + +- `docket_task_duration` - Histogram of task execution times +- `docket_task_punctuality` - How close tasks run to their scheduled time + +#### System Health + +- `docket_queue_depth` - Tasks ready for immediate execution +- `docket_schedule_depth` - Tasks scheduled for future execution +- `docket_tasks_running` - Currently executing tasks +- `docket_redis_disruptions` - Redis connection failures +- `docket_strikes_in_effect` - Active strike rules + +All metrics include labels for docket name, worker name, and task function name. + +### Health Checks + +Enable health check endpoints: + +```bash +docket worker --healthcheck-port 8080 +``` + +The health check endpoint (`/`) returns 200 OK when the worker is healthy and able to process tasks. + +### OpenTelemetry Traces + +Docket automatically creates OpenTelemetry spans for task execution: + +- **Span name**: `docket.task.{function_name}` +- **Attributes**: docket name, worker name, task key, attempt number +- **Status**: Success/failure with error details +- **Duration**: Complete task execution time + +Configure your OpenTelemetry exporter to send traces to your observability platform. See the [OpenTelemetry Python documentation](https://opentelemetry.io/docs/languages/python/) for configuration examples with various backends like Jaeger, Zipkin, or cloud providers. + +### Structured Logging + +Configure structured logging for production: + +```bash +# JSON logs for log aggregation +docket worker --logging-format json --logging-level info + +# Plain logs for simple deployments +docket worker --logging-format plain --logging-level warning +``` + +Log entries include: + +- Task execution start/completion +- Error details with stack traces +- Worker lifecycle events +- Redis connection status +- Strike/restore operations + +### Example Grafana Dashboard + +Monitor Docket health with queries like: + +```promql +# Task throughput +rate(docket_tasks_completed[5m]) + +# Error rate +rate(docket_tasks_failed[5m]) / rate(docket_tasks_started[5m]) + +# Queue depth trending +docket_queue_depth + +# P95 task duration +histogram_quantile(0.95, rate(docket_task_duration_bucket[5m])) + +# Worker availability +up{job="docket-workers"} +``` + +## Production Guidelines + +### Capacity Planning + +**Estimate concurrent tasks:** + +``` +concurrent_tasks = avg_task_duration * tasks_per_second +worker_concurrency = concurrent_tasks * 1.2 # 20% buffer +``` + +**Size worker pools:** + +- Start with 1-2 workers per CPU core +- Monitor CPU and memory usage +- Scale horizontally rather than increasing concurrency indefinitely + +### Deployment Strategies + +**Blue-green deployments:** + +```bash +# Deploy new workers with different name +docket worker --name orders-worker-v2 --tasks myapp.tasks:v2_tasks + +# Gradually strike old task versions +docket strike old_task_function + +# Scale down old workers after tasks drain +``` + +### Error Handling + +**Configure appropriate retries:** + +```python +# Transient failures - short delays +async def api_call( + retry: Retry = Retry(attempts=3, delay=timedelta(seconds=5)) +): ... + +# Infrastructure issues - exponential backoff +async def database_sync( + retry: ExponentialRetry = ExponentialRetry( + attempts=5, + minimum_delay=timedelta(seconds=30), + maximum_delay=timedelta(minutes=10) + ) +): ... + +# Critical operations - unlimited retries +async def financial_transaction( + retry: Retry = Retry(attempts=None, delay=timedelta(minutes=1)) +): ... +``` + +**Dead letter handling:** + +```python +async def process_order(order_id: str) -> None: + try: + await handle_order(order_id) + except CriticalError as e: + # Send to dead letter queue for manual investigation + await send_to_dead_letter_queue(order_id, str(e)) + raise +``` + +### Operational Procedures + +**Graceful shutdown:** + +```bash +# Workers handle SIGTERM gracefully +kill -TERM $WORKER_PID + +# Or use container orchestration stop signals +docker stop docket-worker +``` + +**Emergency task blocking:** + +```bash +# Block problematic tasks immediately +docket strike problematic_function + +# Block tasks for specific customers +docket strike process_order customer_id == "problematic-customer" + +# Restore when issues are resolved +docket restore problematic_function +``` + +**Monitoring checklist:** + +- Queue depth alerts (tasks backing up) +- Error rate alerts (> 5% failure rate) +- Task duration alerts (P95 > expected) +- Worker availability alerts +- Redis connection health + +### Scaling Considerations + +**Horizontal scaling:** + +- Add workers across multiple machines +- Use consistent worker naming for monitoring +- Monitor Redis memory usage as task volume grows + +**Vertical scaling:** + +- Increase worker concurrency for I/O bound tasks +- Increase memory limits for large task payloads +- Monitor CPU usage to avoid oversubscription + +**Redis scaling:** + +- Use managed Redis services for high availability (Redis Cluster is not supported) +- Monitor memory usage and eviction policies +- Scale vertically for larger workloads + +Running Docket in production requires attention to these operational details, but the Redis-based architecture and monitoring support can help with demanding production workloads. diff --git a/docs/testing.md b/docs/testing.md new file mode 100644 index 00000000..5b4b1859 --- /dev/null +++ b/docs/testing.md @@ -0,0 +1,356 @@ +# Testing with Docket + +Docket includes the utilities you need to test all your background task systems in realistic ways. The ergonomic design supports testing complex workflows with minimal setup. + +## Testing Tasks as Simple Functions + +Often you can test your tasks without running a worker at all! Docket tasks are just Python functions, so you can call them directly and pass test values for dependency parameters: + +```python +from docket import Docket, CurrentDocket, Retry +from unittest.mock import AsyncMock + +async def process_order( + order_id: int, + docket: Docket = CurrentDocket(), + retry: Retry = Retry(attempts=3) +) -> None: + # Your task logic here + order = await fetch_order(order_id) + await charge_payment(order) + await docket.add(send_confirmation)(order_id) + +async def test_process_order_logic() -> None: + """Test the task logic without running a worker.""" + mock_docket = AsyncMock() + + # Call the task directly with test parameters + await process_order( + order_id=123, + docket=mock_docket, + retry=Retry(attempts=1) + ) + + # Verify the task scheduled follow-up work + mock_docket.add.assert_called_once() +``` + +This approach is great for testing business logic quickly without the overhead of setting up dockets and workers. + +## Testing with Pytest Fixtures + +The most powerful way to test with Docket is using pytest fixtures to set up your docket and worker. This approach, used throughout Docket's own test suite, provides clean isolation and reusable test infrastructure. + +### Basic Fixture Setup + +Create fixtures for your test docket and worker: + +```python +import pytest +from datetime import datetime, timedelta +from typing import AsyncGenerator, Callable +from uuid import uuid4 +from unittest.mock import AsyncMock +from docket import Docket, Worker + +@pytest.fixture +async def test_docket() -> AsyncGenerator[Docket, None]: + """Create a test docket with a unique name for each test.""" + async with Docket( + name=f"test-{uuid4()}", + url="redis://localhost:6379/0" + ) as docket: + yield docket + +@pytest.fixture +async def test_worker(test_docket: Docket) -> AsyncGenerator[Worker, None]: + """Create a test worker with fast polling for quick tests.""" + async with Worker( + test_docket, + minimum_check_interval=timedelta(milliseconds=5), + scheduling_resolution=timedelta(milliseconds=5) + ) as worker: + yield worker +``` + +### Using Fixtures in Tests + +With these fixtures, your tests become much cleaner: + +```python +async def send_notification(user_id: int, message: str) -> None: + """Example task for testing.""" + print(f"Sending '{message}' to user {user_id}") + +async def test_task_execution(test_docket: Docket, test_worker: Worker) -> None: + """Test that tasks execute with correct arguments.""" + test_docket.register(send_notification) + await test_docket.add(send_notification)(123, "Welcome!") + + await test_worker.run_until_finished() + + # Verify by checking side effects or using test doubles + +async def test_idempotent_scheduling(test_docket: Docket, test_worker: Worker) -> None: + """Test that tasks with same key don't duplicate.""" + test_docket.register(send_notification) + key = "unique-notification" + + # Schedule same task multiple times with same key + await test_docket.add(send_notification, key=key)(123, "message1") + await test_docket.add(send_notification, key=key)(123, "message2") # Should replace + await test_docket.add(send_notification, key=key)(123, "message3") # Should replace + + # Verify only one task is scheduled + snapshot = await test_docket.snapshot() + assert len([t for t in snapshot.future if t.key == key]) == 1 +``` + +## Running Until Finished + +For tests and batch processing, use [`run_until_finished()`](api-reference.md#docket.Worker.run_until_finished) to process all pending tasks then stop: + +```python +async def test_order_processing(test_docket: Docket, test_worker: Worker) -> None: + """Test order processing workflow.""" + test_docket.register(process_order) + test_docket.register(send_confirmation) + test_docket.register(update_inventory) + + # Schedule some work + await test_docket.add(process_order)(order_id=123) + await test_docket.add(send_confirmation)(order_id=123) + await test_docket.add(update_inventory)(product_id=456) + + # Process all pending tasks + await test_worker.run_until_finished() + + # Now verify results + assert order_is_processed(123) + assert confirmation_was_sent(123) + assert inventory_was_updated(456) +``` + +This works well for testing workflows where you need to ensure all tasks complete before making assertions. + +### Testing Task Registration + +Test that tasks are properly registered and can be called by name: + +```python +async def test_task_registration_by_name(test_docket: Docket, test_worker: Worker) -> None: + """Test executing tasks by string name.""" + async def example_task(data: str) -> None: + print(f"Processing: {data}") + + test_docket.register(example_task) + + # Execute by name instead of function reference + await test_docket.add("example_task")("test data") + + await test_worker.run_until_finished() + + # Verify by checking side effects or logs +``` + +## Controlling Perpetual Tasks + +Use [`run_at_most()`](api-reference.md#docket.Worker.run_at_most) to limit how many times specific tasks run, which is essential for testing perpetual tasks: + +```python +async def test_perpetual_monitoring(test_docket: Docket, test_worker: Worker) -> None: + """Test perpetual task monitoring.""" + test_docket.register(health_check_service) + test_docket.register(process_data) + test_docket.register(send_reports) + + # This would normally run forever + await test_docket.add(health_check_service)("https://api.example.com") + + # Also schedule some regular tasks + await test_docket.add(process_data)(dataset="test") + await test_docket.add(send_reports)() + + # Let health check run 3 times, everything else runs to completion + await test_worker.run_at_most({"health_check_service": 3}) + + # Verify the health check ran the expected number of times + assert health_check_call_count == 3 +``` + +The [`run_at_most()`](api-reference.md#docket.Worker.run_at_most) method takes a dictionary mapping task names to maximum execution counts. Tasks not in the dictionary run to completion as normal. + +## Testing Self-Perpetuating Chains + +For tasks that create chains of future work, you can control the chain length: + +```python +async def test_batch_processing_chain(test_docket: Docket, test_worker: Worker) -> None: + """Test batch processing chains.""" + test_docket.register(process_batch) + + # This creates a chain: batch 1 -> batch 2 -> batch 3 + await test_docket.add(process_batch, key="batch-job")(batch_id=1, total_batches=3) + + # Let this specific key run 3 times (for 3 batches) + await test_worker.run_at_most({"batch-job": 3}) + + # Verify all batches were processed + assert all_batches_processed([1, 2, 3]) +``` + +You can use task keys in [`run_at_most()`](api-reference.md#docket.Worker.run_at_most) to control specific task instances rather than all tasks of a given type. + +## Testing Task Scheduling + +Test that tasks are scheduled correctly without running them: + +```python +from datetime import datetime, timedelta, timezone + +async def test_scheduling_logic(test_docket: Docket) -> None: + """Test task scheduling without execution.""" + test_docket.register(send_reminder) + + # Schedule some tasks + future_time = datetime.now(timezone.utc) + timedelta(hours=1) + await test_docket.add(send_reminder, when=future_time, key="reminder-123")( + customer_id=123, + message="Your subscription expires soon" + ) + + # Check that task was scheduled (but not executed) + snapshot = await test_docket.snapshot() + + assert len(snapshot.future) == 1 + assert len(snapshot.running) == 0 + assert snapshot.future[0].key == "reminder-123" + assert snapshot.future[0].function.__name__ == "send_reminder" +``` + +## Integration Testing with Real Redis + +For integration tests, use a real Redis instance but with a test-specific docket name: + +```python +import pytest +from typing import AsyncGenerator +from uuid import uuid4 +from docket import Docket, Worker +from redis.asyncio import Redis + +@pytest.fixture +async def test_docket() -> AsyncGenerator[Docket, None]: + # Use a unique docket name for each test + test_name = f"test-{uuid4()}" + + async with Docket(name=test_name, url="redis://localhost:6379/1") as docket: + yield docket + + # Clean up after test + await docket.clear() + +async def test_full_workflow(test_docket: Docket) -> None: + test_docket.register(process_order) + test_docket.register(send_confirmation) + + await test_docket.add(process_order)(order_id=123) + + async with Worker(test_docket) as worker: + await worker.run_until_finished() + + # Verify against real external systems + assert order_exists_in_database(123) + assert email_was_sent_to_customer(123) +``` + +## Testing Guidelines + +### Use Descriptive Task Keys + +Use meaningful task keys in tests to make debugging easier: + +```python +from uuid import uuid4 + +# Good: Clear what this task represents +await test_docket.add(process_order, key=f"test-order-{order_id}")(order_id) + +# Less clear: Generic key doesn't help with debugging +await test_docket.add(process_order, key=f"task-{uuid4()}")(order_id) +``` + +### Test Error Scenarios + +Always test what happens when tasks fail: + +```python +from unittest import mock +async def test_order_processing_failure(test_docket: Docket, test_worker: Worker) -> None: + """Test error handling in order processing.""" + test_docket.register(process_order) + + # Simulate a failing external service + with mock.patch('external_service.process_payment', side_effect=PaymentError): + await test_docket.add(process_order)(order_id=123) + + await test_worker.run_until_finished() + + # Verify error handling + assert order_status(123) == "payment_failed" + assert error_notification_sent() +``` + +### Test Idempotency + +Verify that tasks with the same key don't create duplicate work: + +```python +async def test_idempotent_scheduling(test_docket: Docket) -> None: + """Test idempotent task scheduling.""" + test_docket.register(process_order) + key = "process-order-123" + + # Schedule the same task multiple times + await test_docket.add(process_order, key=key)(order_id=123) + await test_docket.add(process_order, key=key)(order_id=123) + await test_docket.add(process_order, key=key)(order_id=123) + + snapshot = await test_docket.snapshot() + + # Should only have one task scheduled + assert len(snapshot.future) == 1 + assert snapshot.future[0].key == key +``` + +### Test Timing-Sensitive Logic + +For tasks that depend on timing, use controlled time in tests: + +```python +from datetime import datetime, timedelta, timezone +from unittest import mock + +async def test_scheduled_task_timing(test_docket: Docket, test_worker: Worker) -> None: + """Test timing-sensitive task scheduling.""" + test_docket.register(send_reminder) + now = datetime.now(timezone.utc) + future_time = now + timedelta(seconds=10) + + await test_docket.add(send_reminder, when=future_time)(customer_id=123) + + # Task should not run immediately + await test_worker.run_until_finished() + + assert not reminder_was_sent(123) + + # Fast-forward time and test again + with mock.patch('docket.datetime') as mock_datetime: + mock_datetime.now.return_value = future_time + timedelta(seconds=1) + + await test_worker.run_until_finished() + + assert reminder_was_sent(123) +``` + +Docket's testing utilities make it straightforward to write comprehensive tests for even complex distributed task workflows. The key is using [`run_until_finished()`](api-reference.md#docket.Worker.run_until_finished) for deterministic execution and [`run_at_most()`](api-reference.md#docket.Worker.run_at_most) for controlling perpetual or self-scheduling tasks. diff --git a/mkdocs.yml b/mkdocs.yml index 15463f8e..41c970ec 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -57,4 +57,8 @@ markdown_extensions: nav: - Home: index.md - Getting Started: getting-started.md + - Dependencies Guide: dependencies.md + - Testing with Docket: testing.md + - Advanced Task Patterns: advanced-patterns.md + - Docket in Production: production.md - API Reference: api-reference.md