Skip to content

Commit c5dc2c1

Browse files
desertaxleclaude
andauthored
Add execution state tracking and progress monitoring (#181)
## Summary Adds comprehensive execution state tracking and progress monitoring capabilities to Docket, enabling real-time observability of task execution through Redis pub/sub. - Implements `ExecutionProgress` class with instance attributes for tracking task progress - Adds execution state machine: `SCHEDULED → QUEUED → RUNNING → COMPLETED/FAILED` - Provides pub/sub event publishing for state transitions and progress updates - Moves scheduling logic from `Docket` to `Execution` class for better encapsulation - Add `docket watch` CLI to view progress for a single task ## Demo https://github.com/user-attachments/assets/7f5fcb51-5748-4cd0-886f-a6b802490bdf ## Key Changes ### Execution State Management - Renamed `PENDING` state to `QUEUED` throughout codebase - Immediate tasks now transition directly to `QUEUED` state - Future tasks start as `SCHEDULED`, then move to `QUEUED` when due - Worker scheduler now publishes pub/sub events when moving tasks from scheduled to queued ### ExecutionProgress Class - Added instance attributes: `current`, `total`, `message`, `updated_at` - All Redis-modifying methods now update both Redis and instance attributes atomically - Added `create()` classmethod for initialization - Progress data automatically deleted on task completion/failure ### Scheduling Refactor - Moved scheduling logic from `Docket._schedule()` to `Execution.schedule()` - Atomic operation combining stream insertion and state record writing via Lua script - Removed redundant `set_scheduled()` and `set_queued()` methods - State transitions now handled automatically by scheduling logic ### Pub/Sub Event Publishing - State changes publish to `{docket}:state:{key}` channel - Progress updates publish to `{docket}:progress:{key}` channel - Both `Execution.subscribe()` and `ExecutionProgress.subscribe()` methods available - Worker scheduler Lua script now publishes events for scheduled→queued transitions Closes #88 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent eb8269f commit c5dc2c1

18 files changed

+2657
-189
lines changed

examples/task_progress.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Example demonstrating task progress tracking and real-time monitoring.
2+
3+
This example shows how to:
4+
- Report progress from within a task using ExecutionProgress
5+
- Track progress with current value, total, and status messages
6+
- Monitor task progress in real-time using the 'docket watch' command
7+
- Schedule tasks for future execution
8+
9+
Key Concepts:
10+
- ExecutionProgress: Tracks task progress (current/total) and status messages
11+
- Progress dependency: Injected into tasks via Progress() default parameter
12+
- Real-time monitoring: Use 'docket watch' CLI to monitor running tasks
13+
- State tracking: Tasks transition through SCHEDULED → QUEUED → RUNNING → COMPLETED
14+
15+
Run this example with `uv run -m examples.task_progress` and use the printed 'docket watch' command to see live progress updates.
16+
"""
17+
18+
from datetime import datetime, timedelta, timezone
19+
from docket import Docket, Progress, Worker
20+
import asyncio
21+
import rich.console
22+
23+
from docket.execution import ExecutionProgress
24+
25+
from .common import run_redis
26+
27+
28+
async def long_task(progress: ExecutionProgress = Progress()) -> None:
29+
"""A long-running task that reports progress as it executes.
30+
31+
This demonstrates the key progress tracking patterns:
32+
- Progress dependency injection via Progress() default parameter
33+
- Incremental progress updates with increment()
34+
- Status messages with set_message()
35+
36+
The ExecutionProgress object has a default total of 100, so we don't need
37+
to call set_total() in this example. The progress automatically increments
38+
from 0 to 100.
39+
40+
Args:
41+
progress: Injected ExecutionProgress tracker (automatically provided by Docket)
42+
43+
Pattern for your own tasks:
44+
1. Add progress parameter with Progress() default
45+
2. Call increment() as work progresses (or set_total + increment)
46+
3. Optionally set_message() to show current status
47+
4. Monitor with: docket watch --url <redis_url> --docket <name> <task_key>
48+
"""
49+
# Simulate 100 steps of work, each taking 1 second
50+
for i in range(1, 101):
51+
await asyncio.sleep(1) # Simulate work being done
52+
53+
# Increment progress by 1 (tracks that one more unit is complete)
54+
await progress.increment()
55+
56+
# Update status message every 10 items for demonstration
57+
if i % 10 == 0:
58+
await progress.set_message(f"{i} splines retriculated")
59+
60+
61+
# Export tasks for docket CLI to discover
62+
tasks = [long_task]
63+
64+
# Console for printing user-friendly messages
65+
console = rich.console.Console()
66+
67+
68+
async def main():
69+
"""Run the progress tracking example.
70+
71+
This function demonstrates the complete lifecycle:
72+
1. Start a Redis container for testing
73+
2. Create a Docket (task queue)
74+
3. Start a Worker (executes tasks)
75+
4. Register and schedule a task
76+
5. Monitor progress with the 'docket watch' command
77+
78+
The task is scheduled 20 seconds in the future to give you time to
79+
run the watch command and see the task transition through states:
80+
SCHEDULED → QUEUED → RUNNING → COMPLETED
81+
"""
82+
# Start a temporary Redis container for this example
83+
# In production, you'd connect to your existing Redis instance
84+
async with run_redis("7.4.2") as redis_url:
85+
# Create a Docket connected to Redis
86+
async with Docket(name="task-progress", url=redis_url) as docket:
87+
# Start a Worker to execute tasks from the docket
88+
async with Worker(docket, name="task-progress-worker") as worker:
89+
# Register the task so the worker knows how to execute it
90+
docket.register(long_task)
91+
92+
# Schedule the task to run 20 seconds from now
93+
# This gives you time to run the watch command before it starts
94+
in_twenty_seconds = datetime.now(timezone.utc) + timedelta(seconds=20)
95+
execution = await docket.add(
96+
long_task, key="long-task", when=in_twenty_seconds
97+
)()
98+
99+
# Print instructions for monitoring
100+
console.print(f"Execution {execution.key} started!")
101+
console.print(
102+
f"Run [blue]docket watch --url {redis_url} --docket {docket.name} {execution.key}[/blue] to see the progress!"
103+
)
104+
105+
# Run the worker until all tasks complete
106+
# The worker will wait for the scheduled time, then execute the task
107+
await worker.run_until_finished()
108+
109+
110+
if __name__ == "__main__":
111+
asyncio.run(main())

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ dev = [
5858
"pytest>=8.3.4",
5959
"pytest-asyncio>=0.24.0",
6060
"pytest-cov>=6.0.0",
61+
"pytest-timeout>=2.4.0",
6162
"pytest-xdist>=3.6.1",
6263
"ruff>=0.9.7",
6364
]

src/docket/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
Depends,
1919
ExponentialRetry,
2020
Perpetual,
21+
Progress,
2122
Retry,
2223
TaskArgument,
2324
TaskKey,
2425
TaskLogger,
2526
Timeout,
2627
)
2728
from .docket import Docket
28-
from .execution import Execution
29+
from .execution import Execution, ExecutionState
2930
from .worker import Worker
3031

3132
__all__ = [
@@ -38,9 +39,11 @@
3839
"Depends",
3940
"Docket",
4041
"Execution",
42+
"ExecutionState",
4143
"ExponentialRetry",
4244
"Logged",
4345
"Perpetual",
46+
"Progress",
4447
"Retry",
4548
"TaskArgument",
4649
"TaskKey",

src/docket/agenda.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ async def scatter(
181181
# Create execution with unique key
182182
key = str(uuid7())
183183
execution = Execution(
184+
docket=docket,
184185
function=resolved_func,
185186
args=args,
186187
kwargs=kwargs,

0 commit comments

Comments
 (0)