Skip to content

Commit d58f650

Browse files
authored
docs: Update docs for v0.3.0 (#131)
* Update README * Better examples * Update overview * Add docs and release notes * Update versions
1 parent 4f45329 commit d58f650

10 files changed

Lines changed: 318 additions & 23 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
## Overview
1919

20-
FluxQueue is a task queue for Python that gets out of your way. The Rust core makes the process fast with less overhead, least dependencies, and most importantly, less memory usage. Tasks are managed through Redis.
20+
FluxQueue is a task queue for Python that gets out of your way. Built on a multi-threaded Tokio runtime, FluxQueue delivers high throughput while maintaining low memory usage. The Rust core ensures minimal overhead and dependencies, making it an efficient solution for background task processing. Tasks are managed through Redis.
2121

2222
## Key Features
2323

@@ -29,6 +29,7 @@ FluxQueue is a task queue for Python that gets out of your way. The Rust core ma
2929
- **Multiple Queues**: Organize tasks across different queues
3030
- **Simple API**: Decorator-based interface that feels natural in Python
3131
- **Type Safe**: Full type hints support
32+
- **Context Classes**: Access task metadata and manage thread-persistent resources with the Context class
3233

3334
## Requirements
3435

@@ -51,15 +52,9 @@ from fluxqueue import FluxQueue
5152
fluxqueue = FluxQueue()
5253

5354
@fluxqueue.task()
54-
def send_email(to_email: str, subject: str, body: str):
55+
def send_email(to_email: str):
5556
with email_context() as email_client:
56-
message = EmailMessage()
57-
message["From"] = "test@example.com"
58-
message["To"] = to_email
59-
message["Subject"] = subject
60-
message.set_content(body)
61-
62-
email_client.send_message(message)
57+
send_email(to_email, email_client)
6358
```
6459

6560
### Enqueue Tasks
@@ -78,19 +73,75 @@ FluxQueue supports async functions too. Just define an async function and use th
7873

7974
```python
8075
@fluxqueue.task()
81-
async def send_email(to_email: str, subject: str, body: str):
76+
async def send_email_task(to_email: str):
8277
async with email_context() as email_client:
83-
message = EmailMessage()
84-
message["From"] = "test@example.com"
85-
message["To"] = to_email
86-
message["Subject"] = subject
87-
message.set_content(body)
88-
89-
await email_client.send_message(message)
78+
await send_email(to_email, email_client)
9079
```
9180

9281
Running the async function in an async context will also enqueue the task.
9382

83+
### Tasks with Context
84+
85+
FluxQueue provides a `Context` class that gives you access to task metadata and allows you to manage thread-persistent resources. Use `task_with_context()` decorator to enable this feature:
86+
87+
```python
88+
from fluxqueue import FluxQueue, Context
89+
90+
fluxqueue = FluxQueue()
91+
92+
@fluxqueue.task_with_context()
93+
def process_data_task(ctx: Context, data: str):
94+
# Access task metadata
95+
print(f"Task ID: {ctx.metadata.task_id}")
96+
print(f"Retry count: {ctx.metadata.retries}")
97+
98+
process_data(data)
99+
```
100+
101+
You can also subclass `Context` to create custom contexts with domain-specific resources. This example shows how to create a `DbContext` that manages database connections efficiently by reusing them across tasks in the same worker thread:
102+
103+
```python
104+
from contextlib import asynccontextmanager
105+
from fluxqueue import FluxQueue, Context
106+
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
107+
108+
class DbContext(Context):
109+
def __init__(self) -> None:
110+
super().__init__()
111+
112+
def _get_local_session(self):
113+
if "session" not in self.thread_storage:
114+
engine = create_async_engine(DATABASE_URL)
115+
self.thread_storage["session"] = async_sessionmaker(
116+
bind=engine, expire_on_commit=False
117+
)
118+
return self.thread_storage["session"]
119+
120+
@asynccontextmanager
121+
async def session_context(self):
122+
local_session = self._get_local_session()
123+
async with local_session() as session:
124+
try:
125+
yield session
126+
await session.commit()
127+
except Exception:
128+
await session.rollback()
129+
raise
130+
131+
@fluxqueue.task_with_context()
132+
async def create_user_task(ctx: DbContext, email: str, username: str):
133+
async with ctx.session_context() as db_session:
134+
user = User(email=email, username=username)
135+
db_session.add(user)
136+
```
137+
138+
The context parameter is automatically injected by the worker and is not part of the function's public signature when enqueueing:
139+
140+
```python
141+
# Just call with your regular arguments
142+
create_user_task("user@example.com", "johndoe")
143+
```
144+
94145
## Installing the worker
95146

96147
In order the tasks to be executed you need to run a FluxQueue worker. You need to install the worker on your system, recommended way of doing that is using `fluxqueue-cli` which comes with the `[cli]` extra:

crates/fluxqueue-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluxqueue_core"
3-
version = "0.3.0-rc1"
3+
version = "0.3.0"
44
repository.workspace = true
55
homepage.workspace = true
66
authors.workspace = true

crates/fluxqueue-worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluxqueue-worker"
3-
version = "0.3.0-rc1"
3+
version = "0.3.0"
44
edition = "2024"
55

66
[lib]

docs/features/context.md

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# Context Classes
2+
3+
The Context class provides access to task execution metadata and enables efficient resource management through thread-persistent storage. Use it when you need to share resources across multiple task executions in the same worker thread.
4+
5+
## Overview
6+
7+
The `Context` class provides a dual-layer storage system:
8+
9+
- **Worker Layer (`thread_storage`)**: A dictionary that persists across all tasks executed by the same worker thread. Use this for heavy resource pooling (e.g., database engines, HTTP clients) to avoid re-initialization overhead.
10+
- **Task Layer (`metadata`)**: Isolated metadata for each task execution via ContextVars. Provides read-only access to the current task's unique identity and execution state.
11+
12+
## Basic Usage
13+
14+
To use the Context feature, decorate your task function with `@fluxqueue.task_with_context()` instead of `@fluxqueue.task()`. The function must accept a context parameter as its first argument, typed as `Context` or a subclass of `Context`:
15+
16+
```python
17+
from fluxqueue import FluxQueue, Context
18+
19+
fluxqueue = FluxQueue()
20+
21+
@fluxqueue.task_with_context()
22+
def process_data_task(ctx: Context, data: str):
23+
# Access task metadata
24+
print(f"Task ID: {ctx.metadata.task_id}")
25+
print(f"Retry count: {ctx.metadata.retries}")
26+
print(f"Max retries: {ctx.metadata.max_retries}")
27+
print(f"Enqueued at: {ctx.metadata.enqueued_at}")
28+
29+
# Process the data
30+
process_data(data)
31+
```
32+
33+
When you enqueue the task, the context parameter is automatically injected by the worker and is not part of the function's public signature:
34+
35+
```python
36+
# Just call with your regular arguments - no context needed!
37+
process_data_task("some data")
38+
```
39+
40+
## Task Metadata
41+
42+
The `metadata` property provides read-only access to task execution information:
43+
44+
- `task_id`: Unique identifier for the current task execution
45+
- `retries`: Number of times this task has been retried
46+
- `max_retries`: Maximum number of retry attempts allowed
47+
- `enqueued_at`: ISO 8601 timestamp of when the task was originally enqueued
48+
49+
This metadata is isolated per task using Python's `ContextVar`, ensuring data integrity even when multiple tasks execute concurrently on the same thread.
50+
51+
## Custom Context Classes
52+
53+
You can subclass `Context` to create domain-specific contexts that provide convenient access to resources. This is especially useful for database connections, HTTP clients, or other resources that benefit from pooling:
54+
55+
```python
56+
from contextlib import asynccontextmanager
57+
from fluxqueue import FluxQueue, Context
58+
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
59+
60+
class DbContext(Context):
61+
"""
62+
Custom context for managing database connections.
63+
64+
This context reuses database engines across tasks in the same worker thread,
65+
significantly reducing connection overhead.
66+
"""
67+
def __init__(self) -> None:
68+
super().__init__()
69+
70+
def _get_local_session(self):
71+
"""
72+
Get or create a session maker for this worker thread.
73+
"""
74+
session = self.thread_storage.get("session")
75+
if session is None:
76+
engine = create_async_engine(DATABASE_URL)
77+
session = async_sessionmaker(
78+
bind=engine, expire_on_commit=False
79+
)
80+
self.thread_storage["session"] = session
81+
return session
82+
83+
@asynccontextmanager
84+
async def session_context(self):
85+
"""
86+
Context manager for database sessions with automatic commit/rollback.
87+
"""
88+
local_session = self._get_local_session()
89+
async with local_session() as session:
90+
try:
91+
yield session
92+
await session.commit()
93+
except Exception:
94+
await session.rollback()
95+
raise
96+
97+
# Use the custom context
98+
@fluxqueue.task_with_context()
99+
async def create_user_task(ctx: DbContext, email: str, username: str):
100+
async with ctx.session_context() as db_session:
101+
user = User(email=email, username=username)
102+
db_session.add(user)
103+
# Session commits automatically on success, rolls back on exception
104+
```
105+
106+
## Benefits
107+
108+
Using Context classes provides several key benefits:
109+
110+
1. **Resource Efficiency**: Share expensive resources (like database engines) across multiple tasks in the same worker thread, reducing initialization overhead.
111+
112+
2. **Task Isolation**: Each task gets its own isolated metadata via ContextVars, ensuring data integrity even with concurrent execution.
113+
114+
3. **Type Safety**: Full type hint support means your IDE and type checkers understand the context structure.
115+
116+
4. **Clean API**: The context parameter is automatically injected, so your task functions have a clean public interface without exposing implementation details.
117+
118+
5. **Flexibility**: Subclass `Context` to create domain-specific contexts tailored to your application's needs.
119+
120+
!!! tip "Avoiding Event Loop Issues in Multi-threaded Environments"
121+
122+
FluxQueue's multi-threaded Tokio runtime can cause issues with async database libraries like `asyncpg` that throw errors such as "got future pending attached to a different loop" when resources are shared across threads.
123+
124+
By using the Context class with `thread_storage`, each executor in the worker gets its own database engine instance. Combined with Python's context managers, you can safely acquire database sessions and run queries without event loop conflicts. This ensures that each worker thread maintains its own isolated database connection pool, preventing cross-thread resource sharing issues.
125+
126+
## Async and Sync Support
127+
128+
Context classes work seamlessly with both synchronous and asynchronous tasks:
129+
130+
```python
131+
# Synchronous task with context
132+
@fluxqueue.task_with_context()
133+
def sync_task(ctx: Context, data: str):
134+
print(f"Processing {data} in task {ctx.metadata.task_id}")
135+
process(data)
136+
137+
# Async task with context
138+
@fluxqueue.task_with_context()
139+
async def async_task(ctx: Context, data: str):
140+
print(f"Processing {data} in task {ctx.metadata.task_id}")
141+
await process_async(data)
142+
```
143+
144+
## Best Practices
145+
146+
1. **Use thread_storage for expensive resources**: Database engines, HTTP clients, and connection pools are perfect candidates for thread storage.
147+
148+
2. **Keep metadata read-only**: The metadata property is read-only for a reason - don't try to modify it.
149+
150+
3. **Subclass for domain logic**: Create custom context classes when you have domain-specific resources or logic that multiple tasks share.
151+
152+
4. **One context parameter per task**: Each task function should have exactly one context parameter, typed as `Context` or a subclass.
153+
154+
5. **Initialize resources lazily**: Check if resources exist in `thread_storage` before creating them to avoid unnecessary initialization.

docs/features/index.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Features
2+
3+
Learn about FluxQueue's features and how to use them.
4+
5+
## Feature Documentation
6+
7+
- **[Context Classes](context.md)** - Access task metadata and manage thread-persistent resources

docs/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Welcome to FluxQueue documentation. FluxQueue is a lightweight, resource-efficie
1919

2020
## What is FluxQueue?
2121

22-
FluxQueue is a task queue for Python that gets out of your way. The Rust core makes the process fast with less overhead, least dependencies, and most importantly, less memory usage. Tasks are managed through Redis.
22+
FluxQueue is a task queue for Python that gets out of your way. Built on a multi-threaded Tokio runtime, FluxQueue delivers high throughput while maintaining low memory usage. The Rust core ensures minimal overhead and dependencies, making it an efficient solution for background task processing. Tasks are managed through Redis.
2323

2424
## Key Features
2525

@@ -31,6 +31,7 @@ FluxQueue is a task queue for Python that gets out of your way. The Rust core ma
3131
- **Multiple Queues**: Organize tasks across different queues
3232
- **Simple API**: Decorator-based interface that feels natural in Python
3333
- **Type Safe**: Full type hints support
34+
- **Context Classes**: Access task metadata and manage thread-persistent resources with the Context class
3435

3536
## Requirements
3637

0 commit comments

Comments
 (0)