| 1 | +# Context Classes |
| 2 | + |
| 3 | +The `Context` class gives a task access to its execution metadata and to thread-persistent storage. Use it when you need to share resources across multiple task executions in the same worker thread. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +The `Context` class provides a dual-layer storage system: |
| 8 | + |
| 9 | +- **Worker Layer (`thread_storage`)**: A dictionary that persists across all tasks executed by the same worker thread. Use this for heavy resource pooling (e.g., database engines, HTTP clients) to avoid re-initialization overhead. |
| 10 | +- **Task Layer (`metadata`)**: Metadata isolated per task execution via Python's `ContextVar`. Provides read-only access to the current task's unique identity and execution state. |
| 11 | + |
| 12 | +## Basic Usage |
| 13 | + |
| 14 | +To use the Context feature, decorate your task function with `@fluxqueue.task_with_context()` instead of `@fluxqueue.task()`. The function must accept a context parameter as its first argument, typed as `Context` or a subclass of `Context`: |
| 15 | + |
| 16 | +```python |
| 17 | +from fluxqueue import FluxQueue, Context |
| 18 | + |
| 19 | +fluxqueue = FluxQueue() |
| 20 | + |
| 21 | +@fluxqueue.task_with_context() |
| 22 | +def process_data_task(ctx: Context, data: str): |
| 23 | +    # Access task metadata |
| 24 | +    print(f"Task ID: {ctx.metadata.task_id}") |
| 25 | +    print(f"Retry count: {ctx.metadata.retries}") |
| 26 | +    print(f"Max retries: {ctx.metadata.max_retries}") |
| 27 | +    print(f"Enqueued at: {ctx.metadata.enqueued_at}") |
| 28 | + |
| 29 | +    # Process the data |
| 30 | +    process_data(data) |
| 31 | +``` |
| 32 | + |
| 33 | +When you enqueue the task, the context parameter is automatically injected by the worker and is not part of the function's public signature: |
| 34 | + |
| 35 | +```python |
| 36 | +# Just call with your regular arguments - no context needed! |
| 37 | +process_data_task("some data") |
| 38 | +``` |
| 39 | + |
| 40 | +## Task Metadata |
| 41 | + |
| 42 | +The `metadata` property provides read-only access to task execution information: |
| 43 | + |
| 44 | +- `task_id`: Unique identifier for the current task execution |
| 45 | +- `retries`: Number of times this task has been retried |
| 46 | +- `max_retries`: Maximum number of retry attempts allowed |
| 47 | +- `enqueued_at`: ISO 8601 timestamp of when the task was originally enqueued |
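
Because `enqueued_at` is described as an ISO 8601 timestamp, it can be turned into a queue-latency figure with the standard library alone. The sketch below is illustrative only: `queue_delay_seconds` is a hypothetical helper, not part of FluxQueue, and it assumes the value is a string that carries a UTC offset or a trailing `Z` (a timestamp without an offset is treated as UTC).

```python
from datetime import datetime, timezone
from typing import Optional


def queue_delay_seconds(enqueued_at: str, now: Optional[datetime] = None) -> float:
    """Return the seconds elapsed between enqueue time and `now` (hypothetical helper)."""
    # datetime.fromisoformat only accepts a trailing "Z" on Python 3.11+,
    # so rewrite it as an explicit UTC offset first.
    if enqueued_at.endswith("Z"):
        enqueued_at = enqueued_at[:-1] + "+00:00"
    enqueued = datetime.fromisoformat(enqueued_at)
    if enqueued.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        enqueued = enqueued.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return (current - enqueued).total_seconds()
```

Inside a task this could be called as `queue_delay_seconds(ctx.metadata.enqueued_at)`. If the attribute turns out to be a `datetime` rather than a string, the parsing step is unnecessary.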
| 48 | + |
| 49 | +This metadata is isolated per task using Python's `ContextVar`, ensuring data integrity even when multiple tasks execute concurrently on the same thread. |
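
The isolation described above can be observed with the standard library, independently of FluxQueue. In this minimal sketch, `current_task_id` and `run_task` are hypothetical stand-ins for the per-task metadata and a task body. Three coroutines interleave on a single thread, and each one still reads back only the value it set.

```python
import asyncio
from contextvars import ContextVar

# Stand-in for per-task metadata; the name is hypothetical, not FluxQueue's.
current_task_id: ContextVar[str] = ContextVar("current_task_id")


async def run_task(task_id: str) -> str:
    # Each asyncio task runs in its own copy of the context, so this set()
    # is invisible to the other tasks sharing the thread.
    current_task_id.set(task_id)
    await asyncio.sleep(0)  # yield so the tasks interleave on one thread
    return current_task_id.get()


async def main() -> list:
    # gather() wraps each coroutine in a task and returns results in order.
    return await asyncio.gather(*(run_task(f"task-{i}") for i in range(3)))


results = asyncio.run(main())
print(results)  # ['task-0', 'task-1', 'task-2']
```

With a plain module-level variable instead of a `ContextVar`, every coroutine would observe the last value written, which is the failure mode per-task isolation avoids.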
| 50 | + |
| 51 | +## Custom Context Classes |
| 52 | + |
| 53 | +You can subclass `Context` to create domain-specific contexts that provide convenient access to resources. This is especially useful for database connections, HTTP clients, or other resources that benefit from pooling: |
| 54 | + |
| 55 | +```python |
| 56 | +from contextlib import asynccontextmanager |
| 57 | +from fluxqueue import FluxQueue, Context |
| 58 | +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession |
| 59 | + |
| 60 | +class DbContext(Context): |
| 61 | +    """ |
| 62 | +    Custom context for managing database connections. |
| 63 | + |
| 64 | +    This context reuses database engines across tasks in the same worker thread, |
| 65 | +    significantly reducing connection overhead. |
| 66 | +    """ |
| 67 | +    def __init__(self) -> None: |
| 68 | +        super().__init__() |
| 69 | + |
| 70 | +    def _get_local_session(self): |
| 71 | +        """ |
| 72 | +        Get or create a session maker for this worker thread. |
| 73 | +        """ |
| 74 | +        session = self.thread_storage.get("session") |
| 75 | +        if session is None: |
| 76 | +            engine = create_async_engine(DATABASE_URL) |
| 77 | +            session = async_sessionmaker( |
| 78 | +                bind=engine, expire_on_commit=False |
| 79 | +            ) |
| 80 | +            self.thread_storage["session"] = session |
| 81 | +        return session |
| 82 | + |
| 83 | +    @asynccontextmanager |
| 84 | +    async def session_context(self): |
| 85 | +        """ |
| 86 | +        Context manager for database sessions with automatic commit/rollback. |
| 87 | +        """ |
| 88 | +        local_session = self._get_local_session() |
| 89 | +        async with local_session() as session: |
| 90 | +            try: |
| 91 | +                yield session |
| 92 | +                await session.commit() |
| 93 | +            except Exception: |
| 94 | +                await session.rollback() |
| 95 | +                raise |
| 96 | + |
| 97 | +# Use the custom context (DATABASE_URL, User, and the fluxqueue instance are assumed to be defined elsewhere) |
| 98 | +@fluxqueue.task_with_context() |
| 99 | +async def create_user_task(ctx: DbContext, email: str, username: str): |
| 100 | +    async with ctx.session_context() as db_session: |
| 101 | +        user = User(email=email, username=username) |
| 102 | +        db_session.add(user) |
| 103 | +        # Session commits automatically on success, rolls back on exception |
| 104 | +``` |
| 105 | + |
| 106 | +## Benefits |
| 107 | + |
| 108 | +Using Context classes provides several key benefits: |
| 109 | + |
| 110 | +1. **Resource Efficiency**: Share expensive resources (like database engines) across multiple tasks in the same worker thread, reducing initialization overhead. |
| 111 | + |
| 112 | +2. **Task Isolation**: Each task gets its own isolated metadata via `ContextVar`, ensuring data integrity even with concurrent execution. |
| 113 | + |
| 114 | +3. **Type Safety**: Full type hint support means your IDE and type checkers understand the context structure. |
| 115 | + |
| 116 | +4. **Clean API**: The context parameter is automatically injected, so your task functions have a clean public interface without exposing implementation details. |
| 117 | + |
| 118 | +5. **Flexibility**: Subclass `Context` to create domain-specific contexts tailored to your application's needs. |
| 119 | + |
| 120 | +!!! tip "Avoiding Event Loop Issues in Multi-threaded Environments" |
| 121 | + |
| 122 | +    FluxQueue's multi-threaded Tokio runtime can cause problems with async database libraries such as `asyncpg`, which raise errors like "got future pending attached to a different loop" when resources are shared across threads. |
| 123 | + |
| 124 | +    With `thread_storage`, each executor in the worker gets its own database engine instance, and therefore its own isolated connection pool. Sessions acquired through a context manager then run against a pool that belongs to the current worker thread, so queries avoid event loop conflicts caused by cross-thread resource sharing. |
| 125 | + |
| 126 | +## Async and Sync Support |
| 127 | + |
| 128 | +Context classes work seamlessly with both synchronous and asynchronous tasks: |
| 129 | + |
| 130 | +```python |
| 131 | +# Synchronous task with context |
| 132 | +@fluxqueue.task_with_context() |
| 133 | +def sync_task(ctx: Context, data: str): |
| 134 | +    print(f"Processing {data} in task {ctx.metadata.task_id}") |
| 135 | +    process(data) |
| 136 | + |
| 137 | +# Async task with context |
| 138 | +@fluxqueue.task_with_context() |
| 139 | +async def async_task(ctx: Context, data: str): |
| 140 | +    print(f"Processing {data} in task {ctx.metadata.task_id}") |
| 141 | +    await process_async(data) |
| 142 | +``` |
| 143 | + |
| 144 | +## Best Practices |
| 145 | + |
| 146 | +1. **Use `thread_storage` for expensive resources**: Database engines, HTTP clients, and connection pools are good candidates for thread storage. |
| 147 | + |
| 148 | +2. **Treat metadata as read-only**: The `metadata` property is read-only; do not attempt to modify it from inside a task. |
| 149 | + |
| 150 | +3. **Subclass for domain logic**: Create custom context classes when you have domain-specific resources or logic that multiple tasks share. |
| 151 | + |
| 152 | +4. **One context parameter per task**: Each task function should have exactly one context parameter, typed as `Context` or a subclass. |
| 153 | + |
| 154 | +5. **Initialize resources lazily**: Check if resources exist in `thread_storage` before creating them to avoid unnecessary initialization. |
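
The lazy get-or-create pattern from the last item can be tried out without FluxQueue. The sketch below simulates per-thread storage with `threading.local`; this is an assumption made for illustration and not a description of how `thread_storage` is implemented. `thread_storage()`, `get_client`, and `task` are hypothetical names, and `object()` stands in for an expensive engine or HTTP client.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for Context.thread_storage: one private dict per worker thread.
_local = threading.local()
created_by_thread = []  # thread ids that had to build a resource
_created_lock = threading.Lock()


def thread_storage() -> dict:
    """Return the calling thread's private dict, creating it on first use."""
    if not hasattr(_local, "storage"):
        _local.storage = {}
    return _local.storage


def get_client() -> object:
    """Get-or-create: build the expensive resource at most once per thread."""
    storage = thread_storage()
    client = storage.get("client")
    if client is None:
        client = object()  # placeholder for an engine or HTTP client
        storage["client"] = client
        with _created_lock:
            created_by_thread.append(threading.get_ident())
    return client


def task(_: int) -> tuple:
    # Report which thread ran the task and which client instance it saw.
    return threading.get_ident(), id(get_client())


with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(task, range(20)))
```

Twenty tasks run here, but the resource is built at most once per pool thread, and every task on a given thread sees the same instance.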