Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 68 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

## Overview

FluxQueue is a task queue for Python that gets out of your way. The Rust core makes the process fast with less overhead, least dependencies, and most importantly, less memory usage. Tasks are managed through Redis.
FluxQueue is a task queue for Python that gets out of your way. Built on a multi-threaded Tokio runtime, FluxQueue delivers high throughput while maintaining low memory usage. The Rust core ensures minimal overhead and dependencies, making it an efficient solution for background task processing. Tasks are managed through Redis.

## Key Features

Expand All @@ -29,6 +29,7 @@ FluxQueue is a task queue for Python that gets out of your way. The Rust core ma
- **Multiple Queues**: Organize tasks across different queues
- **Simple API**: Decorator-based interface that feels natural in Python
- **Type Safe**: Full type hints support
- **Context Classes**: Access task metadata and manage thread-persistent resources with the Context class

## Requirements

Expand All @@ -51,15 +52,9 @@ from fluxqueue import FluxQueue
fluxqueue = FluxQueue()

@fluxqueue.task()
def send_email(to_email: str, subject: str, body: str):
def send_email(to_email: str):
with email_context() as email_client:
message = EmailMessage()
message["From"] = "test@example.com"
message["To"] = to_email
message["Subject"] = subject
message.set_content(body)

email_client.send_message(message)
send_email(to_email, email_client)
```

### Enqueue Tasks
Expand All @@ -78,19 +73,75 @@ FluxQueue supports async functions too. Just define an async function and use th

```python
@fluxqueue.task()
async def send_email(to_email: str, subject: str, body: str):
async def send_email_task(to_email: str):
async with email_context() as email_client:
message = EmailMessage()
message["From"] = "test@example.com"
message["To"] = to_email
message["Subject"] = subject
message.set_content(body)

await email_client.send_message(message)
await send_email(to_email, email_client)
```

Running the async function in an async context will also enqueue the task.

### Tasks with Context

FluxQueue provides a `Context` class that gives you access to task metadata and allows you to manage thread-persistent resources. Use `task_with_context()` decorator to enable this feature:

```python
from fluxqueue import FluxQueue, Context

fluxqueue = FluxQueue()

@fluxqueue.task_with_context()
def process_data_task(ctx: Context, data: str):
# Access task metadata
print(f"Task ID: {ctx.metadata.task_id}")
print(f"Retry count: {ctx.metadata.retries}")

process_data(data)
```

You can also subclass `Context` to create custom contexts with domain-specific resources. This example shows how to create a `DbContext` that manages database connections efficiently by reusing them across tasks in the same worker thread:

```python
from contextlib import asynccontextmanager
from fluxqueue import FluxQueue, Context
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

class DbContext(Context):
def __init__(self) -> None:
super().__init__()

def _get_local_session(self):
if "session" not in self.thread_storage:
engine = create_async_engine(DATABASE_URL)
self.thread_storage["session"] = async_sessionmaker(
bind=engine, expire_on_commit=False
)
return self.thread_storage["session"]

@asynccontextmanager
async def session_context(self):
local_session = self._get_local_session()
async with local_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

@fluxqueue.task_with_context()
async def create_user_task(ctx: DbContext, email: str, username: str):
async with ctx.session_context() as db_session:
user = User(email=email, username=username)
db_session.add(user)
```

The context parameter is automatically injected by the worker and is not part of the function's public signature when enqueueing:

```python
# Just call with your regular arguments
create_user_task("user@example.com", "johndoe")
```

## Installing the worker

In order the tasks to be executed you need to run a FluxQueue worker. You need to install the worker on your system, recommended way of doing that is using `fluxqueue-cli` which comes with the `[cli]` extra:
Expand Down
2 changes: 1 addition & 1 deletion crates/fluxqueue-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluxqueue_core"
version = "0.3.0-rc1"
version = "0.3.0"
repository.workspace = true
homepage.workspace = true
authors.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/fluxqueue-worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluxqueue-worker"
version = "0.3.0-rc1"
version = "0.3.0"
edition = "2024"

[lib]
Expand Down
154 changes: 154 additions & 0 deletions docs/features/context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Context Classes

The Context class provides access to task execution metadata and enables efficient resource management through thread-persistent storage. Use it when you need to share resources across multiple task executions in the same worker thread.

## Overview

The `Context` class provides a dual-layer storage system:

- **Worker Layer (`thread_storage`)**: A dictionary that persists across all tasks executed by the same worker thread. Use this for heavy resource pooling (e.g., database engines, HTTP clients) to avoid re-initialization overhead.
- **Task Layer (`metadata`)**: Isolated metadata for each task execution via ContextVars. Provides read-only access to the current task's unique identity and execution state.

## Basic Usage

To use the Context feature, decorate your task function with `@fluxqueue.task_with_context()` instead of `@fluxqueue.task()`. The function must accept a context parameter as its first argument, typed as `Context` or a subclass of `Context`:

```python
from fluxqueue import FluxQueue, Context

fluxqueue = FluxQueue()

@fluxqueue.task_with_context()
def process_data_task(ctx: Context, data: str):
# Access task metadata
print(f"Task ID: {ctx.metadata.task_id}")
print(f"Retry count: {ctx.metadata.retries}")
print(f"Max retries: {ctx.metadata.max_retries}")
print(f"Enqueued at: {ctx.metadata.enqueued_at}")

# Process the data
process_data(data)
```

When you enqueue the task, the context parameter is automatically injected by the worker and is not part of the function's public signature:

```python
# Just call with your regular arguments - no context needed!
process_data_task("some data")
```

## Task Metadata

The `metadata` property provides read-only access to task execution information:

- `task_id`: Unique identifier for the current task execution
- `retries`: Number of times this task has been retried
- `max_retries`: Maximum number of retry attempts allowed
- `enqueued_at`: ISO 8601 timestamp of when the task was originally enqueued

This metadata is isolated per task using Python's `ContextVar`, ensuring data integrity even when multiple tasks execute concurrently on the same thread.

## Custom Context Classes

You can subclass `Context` to create domain-specific contexts that provide convenient access to resources. This is especially useful for database connections, HTTP clients, or other resources that benefit from pooling:

```python
from contextlib import asynccontextmanager
from fluxqueue import FluxQueue, Context
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

class DbContext(Context):
"""
Custom context for managing database connections.

This context reuses database engines across tasks in the same worker thread,
significantly reducing connection overhead.
"""
def __init__(self) -> None:
super().__init__()

def _get_local_session(self):
"""
Get or create a session maker for this worker thread.
"""
session = self.thread_storage.get("session")
if session is None:
engine = create_async_engine(DATABASE_URL)
session = async_sessionmaker(
bind=engine, expire_on_commit=False
)
self.thread_storage["session"] = session
return session

@asynccontextmanager
async def session_context(self):
"""
Context manager for database sessions with automatic commit/rollback.
"""
local_session = self._get_local_session()
async with local_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise

# Use the custom context
@fluxqueue.task_with_context()
async def create_user_task(ctx: DbContext, email: str, username: str):
async with ctx.session_context() as db_session:
user = User(email=email, username=username)
db_session.add(user)
# Session commits automatically on success, rolls back on exception
```

## Benefits

Using Context classes provides several key benefits:

1. **Resource Efficiency**: Share expensive resources (like database engines) across multiple tasks in the same worker thread, reducing initialization overhead.

2. **Task Isolation**: Each task gets its own isolated metadata via ContextVars, ensuring data integrity even with concurrent execution.

3. **Type Safety**: Full type hint support means your IDE and type checkers understand the context structure.

4. **Clean API**: The context parameter is automatically injected, so your task functions have a clean public interface without exposing implementation details.

5. **Flexibility**: Subclass `Context` to create domain-specific contexts tailored to your application's needs.

!!! tip "Avoiding Event Loop Issues in Multi-threaded Environments"

FluxQueue's multi-threaded Tokio runtime can cause issues with async database libraries like `asyncpg` that throw errors such as "got future pending attached to a different loop" when resources are shared across threads.

By using the Context class with `thread_storage`, each executor in the worker gets its own database engine instance. Combined with Python's context managers, you can safely acquire database sessions and run queries without event loop conflicts. This ensures that each worker thread maintains its own isolated database connection pool, preventing cross-thread resource sharing issues.

## Async and Sync Support

Context classes work seamlessly with both synchronous and asynchronous tasks:

```python
# Synchronous task with context
@fluxqueue.task_with_context()
def sync_task(ctx: Context, data: str):
print(f"Processing {data} in task {ctx.metadata.task_id}")
process(data)

# Async task with context
@fluxqueue.task_with_context()
async def async_task(ctx: Context, data: str):
print(f"Processing {data} in task {ctx.metadata.task_id}")
await process_async(data)
```

## Best Practices

1. **Use thread_storage for expensive resources**: Database engines, HTTP clients, and connection pools are perfect candidates for thread storage.

2. **Keep metadata read-only**: The metadata property is read-only for a reason - don't try to modify it.

3. **Subclass for domain logic**: Create custom context classes when you have domain-specific resources or logic that multiple tasks share.

4. **One context parameter per task**: Each task function should have exactly one context parameter, typed as `Context` or a subclass.

5. **Initialize resources lazily**: Check if resources exist in `thread_storage` before creating them to avoid unnecessary initialization.
7 changes: 7 additions & 0 deletions docs/features/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Features

Learn about FluxQueue's features and how to use them.

## Feature Documentation

- **[Context Classes](context.md)** - Access task metadata and manage thread-persistent resources
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Welcome to FluxQueue documentation. FluxQueue is a lightweight, resource-efficie

## What is FluxQueue?

FluxQueue is a task queue for Python that gets out of your way. The Rust core makes the process fast with less overhead, least dependencies, and most importantly, less memory usage. Tasks are managed through Redis.
FluxQueue is a task queue for Python that gets out of your way. Built on a multi-threaded Tokio runtime, FluxQueue delivers high throughput while maintaining low memory usage. The Rust core ensures minimal overhead and dependencies, making it an efficient solution for background task processing. Tasks are managed through Redis.

## Key Features

Expand All @@ -31,6 +31,7 @@ FluxQueue is a task queue for Python that gets out of your way. The Rust core ma
- **Multiple Queues**: Organize tasks across different queues
- **Simple API**: Decorator-based interface that feels natural in Python
- **Type Safe**: Full type hints support
- **Context Classes**: Access task metadata and manage thread-persistent resources with the Context class

## Requirements

Expand Down
Loading