Merged
21 changes: 16 additions & 5 deletions .github/workflows/ci.yml
@@ -9,14 +9,25 @@ on:

 jobs:
   test:
-    name: Test Python ${{ matrix.python-version }}, Redis ${{ matrix.redis-version }}, redis-py ${{ matrix.redis-py-version }}
+    name: Test Python ${{ matrix.python-version }}, ${{ matrix.backend.name }}
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
       matrix:
         python-version: ["3.12", "3.13"]
-        redis-version: ["6.2", "7.4", "valkey-8.0"]
-        redis-py-version: [">=4.6,<5", ">=5"]
+        backend:
+          - name: "Redis 6.2, redis-py <5"
+            redis-version: "6.2"
+            redis-py-version: ">=4.6,<5"
+          - name: "Redis 7.4, redis-py >=5"
+            redis-version: "7.4"
+            redis-py-version: ">=5"
+          - name: "Valkey 8.0, redis-py >=5"
+            redis-version: "valkey-8.0"
+            redis-py-version: ">=5"
+          - name: "Memory (in-memory backend)"
+            redis-version: "memory"
+            redis-py-version: ">=5"

     steps:
       - uses: actions/checkout@v4
@@ -29,11 +40,11 @@ jobs:
           cache-dependency-glob: "pyproject.toml"

       - name: Install dependencies
-        run: uv sync --dev --upgrade-package 'redis${{ matrix.redis-py-version }}'
+        run: uv sync --dev --upgrade-package 'redis${{ matrix.backend.redis-py-version }}'

       - name: Run tests
         env:
-          REDIS_VERSION: ${{ matrix.redis-version }}
+          REDIS_VERSION: ${{ matrix.backend.redis-version }}
         run: uv run pytest --cov-branch --cov-fail-under=100 --cov-report=xml --cov-report=term-missing:skip-covered

       - name: Upload coverage reports to Codecov
2 changes: 2 additions & 0 deletions .gitignore
@@ -9,3 +9,5 @@ __pycache__/
build/
dist/
wheels/

.coverage.*
12 changes: 12 additions & 0 deletions README.md
@@ -84,6 +84,18 @@ pip install pydocket
Docket requires a [Redis](http://redis.io/) server with Streams support (which was
introduced in Redis 5.0.0). Docket is tested with Redis 6 and 7.

For testing without Redis, Docket includes [fakeredis](https://github.com/cunla/fakeredis-py) for in-memory operation:

```python
from docket import Docket

async with Docket(name="my-docket", url="memory://my-docket") as docket:
    # Use docket normally - all operations are in-memory
    ...
```

> **Owner:** I think I might have misled you, sorry! Now that I see this, we probably do just want `memory://` as the URL for all of these (not `memory://<docket-name>`) because the docket name is specified separately, I forgot about that part. It works either way, honestly. I'll leave it to you!

> **Owner:** All keys are prefixed with the docket name, which is how we can have many dockets on the same Redis. But again, it works either way.

See [Testing with Docket](https://chrisguidry.github.io/docket/testing/#using-in-memory-backend-no-redis-required) for more details.

# Hacking on `docket`

We use [`uv`](https://docs.astral.sh/uv/) for project management, so getting set up
70 changes: 70 additions & 0 deletions docs/testing.md
@@ -2,6 +2,76 @@

Docket includes the utilities you need to test all your background task systems in realistic ways. The ergonomic design supports testing complex workflows with minimal setup.

## Using In-Memory Backend (No Redis Required)

For the fastest tests and simplest setup, Docket supports an in-memory backend using [fakeredis](https://github.com/cunla/fakeredis-py). This is perfect for:

- **CI/CD environments** - No need to spin up Redis containers
- **Local development** - Test without installing/running Redis
- **Unit tests** - Fast, isolated tests without external dependencies
- **Educational environments** - Workshops and tutorials without infrastructure

### Installation

Fakeredis is included as a standard dependency, so no extra installation is needed.

### Usage

Use the `memory://` URL scheme to enable the in-memory backend:

```python
from docket import Docket

async with Docket(name="test-docket", url="memory://test") as docket:
    # Use docket normally - all operations are in-memory
    docket.register(my_task)
    await docket.add(my_task)("arg")
```

### Multiple In-Memory Dockets

You can run multiple independent in-memory dockets simultaneously by using different URLs:

```python
async with (
    Docket(name="service-a", url="memory://service-a") as docket_a,
    Docket(name="service-b", url="memory://service-b") as docket_b,
):
    # Each docket has its own isolated in-memory data
    await docket_a.add(task_a)()
    await docket_b.add(task_b)()
```

This is useful for testing multi-service scenarios in a single process.
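When many tests create in-memory dockets in one process, collisions are easiest to avoid by minting a fresh URL per test. A minimal sketch (the `unique_memory_url` helper is hypothetical, not part of Docket's API):

```python
from uuid import uuid4


def unique_memory_url(prefix: str = "test") -> str:
    """Build a memory:// URL that no other docket in the process will share."""
    # Putting a name after memory:// is an assumption of this sketch; per the
    # review discussion above, a bare "memory://" also works because all keys
    # are prefixed with the docket name anyway.
    return f"memory://{prefix}-{uuid4()}"
```

Pass the result as the `url` argument when constructing a `Docket`, alongside a matching unique `name`.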

### Pytest Fixture Example

```python
from typing import AsyncGenerator
from uuid import uuid4

import pytest

from docket import Docket


@pytest.fixture
async def test_docket() -> AsyncGenerator[Docket, None]:
    """Create a test docket with an in-memory backend."""
    unique = f"test-{uuid4()}"
    async with Docket(name=unique, url=f"memory://{unique}") as docket:
        yield docket
```

### Limitations

The in-memory backend has some limitations compared to real Redis:

- **Single process only** - Cannot distribute work across multiple processes/machines
- **Data is ephemeral** - Lost when the process exits
- **Performance may differ** - Timing-sensitive tests may behave differently
- **Async polling behavior** - Uses non-blocking reads with manual sleeps for proper asyncio integration

For integration tests or multi-worker scenarios across processes, use a real Redis instance.
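One way to get both: default to the in-memory backend locally and switch to real Redis whenever a URL is configured. A sketch under the assumption that your suite reads a `DOCKET_URL` environment variable (the variable name is illustrative, not something Docket itself consults):

```python
import os


def docket_url() -> str:
    # Fall back to the in-memory backend when no Redis is configured;
    # integration jobs can export DOCKET_URL=redis://localhost:6379/0
    # to run the same tests against a real server.
    return os.environ.get("DOCKET_URL", "memory://")
```

Passing `url=docket_url()` when constructing `Docket` lets one test suite serve both the fast in-memory path and full integration runs.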

## Testing Tasks as Simple Functions

Often you can test your tasks without running a worker at all! Docket tasks are just Python functions, so you can call them directly and pass test values for dependency parameters:
98 changes: 98 additions & 0 deletions examples/local_development.py
@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
Example: Local Development Without Redis

This example demonstrates using Docket with the in-memory backend for
local development, prototyping, or situations where you don't have Redis
available but still want to use Docket's task scheduling features.

Use cases:
- Local development on a laptop without Docker/Redis
- Quick prototyping and experimentation
- Educational/tutorial environments
- Desktop applications that need background tasks
- CI/CD environments without Redis containers
- Single-process utilities that benefit from task scheduling

Limitations:
- Single process only (no distributed workers)
- Data stored in memory (lost on restart)
- Performance may differ from real Redis

To run:
    uv run examples/local_development.py
"""

import asyncio
from datetime import datetime, timedelta, timezone

from docket import Docket, Worker
from docket.dependencies import Perpetual, Retry


# Example 1: Simple immediate task
async def process_file(filename: str) -> None:
    print(f"📄 Processing file: {filename}")
    await asyncio.sleep(0.5)  # Simulate work
    print(f"✅ Completed: {filename}")


# Example 2: Scheduled task with retry
async def backup_data(target: str, retry: Retry = Retry(attempts=3)) -> None:
    print(f"💾 Backing up to: {target}")
    await asyncio.sleep(0.3)
    print(f"✅ Backup complete: {target}")


# Example 3: Periodic background task
async def health_check(
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=2), automatic=True),
) -> None:
    print(f"🏥 Health check at {datetime.now(timezone.utc).strftime('%H:%M:%S')}")


async def main() -> None:
    print("🚀 Starting Docket with in-memory backend (no Redis required!)\n")

    # Use memory:// URL for in-memory operation
    async with Docket(name="local-dev", url="memory://local-dev") as docket:
        # Register tasks
        docket.register(process_file)
        docket.register(backup_data)
        docket.register(health_check)

        # Schedule some immediate tasks
        print("Scheduling immediate tasks...")
        await docket.add(process_file)("report.pdf")
        await docket.add(process_file)("data.csv")
        await docket.add(process_file)("config.json")

        # Schedule a future task
        in_two_seconds = datetime.now(timezone.utc) + timedelta(seconds=2)
        print("Scheduling backup for 2 seconds from now...")
        await docket.add(backup_data, when=in_two_seconds)("/tmp/backup")

        # The periodic task will be auto-scheduled by the worker
        print("Setting up periodic health check...\n")

        # Run worker to process tasks
        print("=" * 60)
        async with Worker(docket, concurrency=2) as worker:
            # Run for 6 seconds to see the periodic task execute a few times
            print("Worker running for 6 seconds...\n")
            try:
                await asyncio.wait_for(worker.run_forever(), timeout=6.0)
            except asyncio.TimeoutError:
                print("\n" + "=" * 60)
                print("✨ Demo complete!")

        # Show final state
        snapshot = await docket.snapshot()
        print("\nFinal state:")
        print(f"  Snapshot time: {snapshot.taken.strftime('%H:%M:%S')}")
        print(f"  Future tasks: {len(snapshot.future)}")
        print(f"  Running tasks: {len(snapshot.running)}")


if __name__ == "__main__":
    asyncio.run(main())
8 changes: 8 additions & 0 deletions pyproject.toml
@@ -35,6 +35,10 @@ dependencies = [
 dev = [
     "codespell>=2.4.1",
     "docker>=7.1.0",
+    # Using fork until https://github.com/cunla/fakeredis-py/pull/427 is merged
+    # This fixes xpending_range to return all 4 required fields (message_id, consumer,
+    # time_since_delivered, times_delivered) instead of just 2, matching Redis behavior
+    "fakeredis[lua] @ git+https://github.com/zzstoatzz/fakeredis-py.git@fix-xpending-range-fields",
     "ipython>=9.0.1",
     "mypy>=1.14.1",
     "opentelemetry-distro>=0.51b0",
@@ -69,6 +73,10 @@ docket = "docket.__main__:app"
[tool.hatch.version]
source = "vcs"

[tool.hatch.metadata]
allow-direct-references = true


[tool.hatch.build.targets.wheel]
packages = ["src/docket"]

28 changes: 26 additions & 2 deletions src/docket/docket.py
@@ -156,12 +156,13 @@ def __init__(
         """
         Args:
             name: The name of the docket.
-            url: The URL of the Redis server. For example:
+            url: The URL of the Redis server or in-memory backend. For example:
                 - "redis://localhost:6379/0"
                 - "redis://user:password@localhost:6379/0"
                 - "redis://user:password@localhost:6379/0?ssl=true"
                 - "rediss://localhost:6379/0"
                 - "unix:///path/to/redis.sock"
+                - "memory://" (in-memory backend for testing)
             heartbeat_interval: How often workers send heartbeat messages to the docket.
             missed_heartbeats: How many heartbeats a worker can miss before it is
                 considered dead.
@@ -183,7 +184,30 @@ async def __aenter__(self) -> Self:
         self.tasks = {fn.__name__: fn for fn in standard_tasks}
         self.strike_list = StrikeList()

-        self._connection_pool = ConnectionPool.from_url(self.url)  # type: ignore
+        # Check if we should use in-memory backend (fakeredis)
+        # Support memory:// URLs for in-memory dockets
+        if self.url.startswith("memory://"):
+            try:
+                from fakeredis.aioredis import FakeConnection, FakeServer
+
+                # All memory:// URLs share a single FakeServer instance
+                # Multiple dockets with different names are isolated by Redis key
+                # prefixes (e.g., docket1:stream vs docket2:stream)
+                if not hasattr(Docket, "_memory_server"):
+                    Docket._memory_server = FakeServer()  # type: ignore
+
+                server = Docket._memory_server  # type: ignore
+                self._connection_pool = ConnectionPool(
+                    connection_class=FakeConnection, server=server
+                )
+            except ImportError as e:
+                raise ImportError(
+                    "fakeredis is required for memory:// URLs. "
+                    "Install with: pip install pydocket[memory]"
+                ) from e
+        else:
+            self._connection_pool = ConnectionPool.from_url(self.url)  # type: ignore

         self._monitor_strikes_task = asyncio.create_task(self._monitor_strikes())

         # Ensure that the stream and worker group exist
13 changes: 11 additions & 2 deletions src/docket/worker.py
@@ -279,13 +279,22 @@ async def get_redeliveries(redis: Redis) -> RedisReadGroupResponse:

         async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
             logger.debug("Getting new deliveries", extra=log_context)
-            return await redis.xreadgroup(
+            # Use non-blocking read with in-memory backend + manual sleep
+            # This is necessary because fakeredis's async blocking operations don't
+            # properly yield control to the asyncio event loop
+            is_memory = self.docket.url.startswith("memory://")
+            result = await redis.xreadgroup(
                 groupname=self.docket.worker_group_name,
                 consumername=self.name,
                 streams={self.docket.stream_key: ">"},
-                block=int(self.minimum_check_interval.total_seconds() * 1000),
+                block=0
+                if is_memory
+                else int(self.minimum_check_interval.total_seconds() * 1000),
                 count=available_slots,
             )
+            if is_memory and not result:
+                await asyncio.sleep(self.minimum_check_interval.total_seconds())
+            return result

         def start_task(
             message_id: RedisMessageID,