Skip to content

Commit f9c7d16

Browse files
omikaderOmar Abdelkaderclaude
authored
Add Cron dependency for cron-style task scheduling (#311)
Introduces a new `Cron` dependency that extends `Perpetual` to support wall-clock scheduled tasks using cron expressions. Unlike `Perpetual` which uses relative intervals, `Cron` schedules tasks at exact times (e.g., "0 9 * * 1" for Mondays at 9 AM). Key changes: - Add `Cron` class with `croniter` integration for expression parsing - Support standard 5-field cron syntax and Vixie keywords (@daily, etc.) - Automatic scheduling at worker startup (`automatic=True` by default) Notes: - Added `croniter` as a direct dependency and `types-croniter` as a `dev` dependency - Many of the tests in `test_cron` are mocked to avoid long wait times. Closes #288 --------- Co-authored-by: Omar Abdelkader <oabdelkader@nvidia.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent aecd46a commit f9c7d16

File tree

8 files changed

+277
-10
lines changed

8 files changed

+277
-10
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ classifiers = [
2424
]
2525
dependencies = [
2626
"cloudpickle>=3.1.1",
27+
"croniter>=6",
2728
"exceptiongroup>=1.2.0; python_version < '3.11'",
2829
"taskgroup>=0.2.2; python_version < '3.11'",
2930
"fakeredis[lua]>=2.32.1",
@@ -69,6 +70,7 @@ dev = [
6970
"pytest-timeout>=2.4.0",
7071
"pytest-xdist>=3.6.1",
7172
"ruff>=0.14.14",
73+
"types-croniter>=6",
7274
]
7375

7476
docs = [

src/docket/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .annotations import Logged
1313
from .dependencies import (
1414
ConcurrencyLimit,
15+
Cron,
1516
CurrentDocket,
1617
CurrentExecution,
1718
CurrentWorker,
@@ -36,6 +37,7 @@
3637
"__version__",
3738
"Agenda",
3839
"ConcurrencyLimit",
40+
"Cron",
3941
"CurrentDocket",
4042
"CurrentExecution",
4143
"CurrentWorker",

src/docket/dependencies/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
format_duration,
1717
)
1818
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
19+
from ._cron import Cron
1920
from ._contextual import (
2021
CurrentDocket,
2122
CurrentExecution,
@@ -74,6 +75,7 @@
7475
"AdmissionBlocked",
7576
"ConcurrencyBlocked",
7677
"ConcurrencyLimit",
78+
"Cron",
7779
"Perpetual",
7880
"Progress",
7981
"Timeout",

src/docket/dependencies/_cron.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Cron-style scheduling dependency."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import datetime, timezone
6+
from typing import TYPE_CHECKING
7+
8+
from croniter import croniter
9+
10+
from ._perpetual import Perpetual
11+
12+
if TYPE_CHECKING: # pragma: no cover
13+
from ._base import TaskOutcome
14+
from ..execution import Execution
15+
16+
17+
class Cron(Perpetual):
18+
"""Declare a task that should run on a cron schedule. Cron tasks are automatically
19+
rescheduled for the next matching time after they finish (whether they succeed or
20+
fail). By default, a cron task is scheduled at worker startup with `automatic=True`.
21+
22+
Unlike `Perpetual` which schedules based on intervals from the current time, `Cron`
23+
schedules based on wall-clock time, ensuring tasks run at consistent times regardless
24+
of execution duration or delays.
25+
26+
Supports standard cron expressions and Vixie cron-style keywords (@daily, @hourly, etc.)
27+
via the croniter library.
28+
29+
Example:
30+
31+
```python
32+
@task
33+
async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None:
34+
# Runs every Monday at 9:00 AM UTC
35+
...
36+
37+
@task
38+
async def daily_cleanup(cron: Cron = Cron("@daily")) -> None:
39+
# Runs every day at midnight UTC
40+
...
41+
```
42+
"""
43+
44+
expression: str
45+
46+
_croniter: croniter[datetime]
47+
48+
def __init__(
49+
self,
50+
expression: str,
51+
automatic: bool = True,
52+
) -> None:
53+
"""
54+
Args:
55+
expression: A cron expression string. Supports:
56+
- Standard 5-field syntax: "minute hour day month weekday"
57+
(e.g., "0 9 * * 1" for Mondays at 9 AM)
58+
- Vixie cron keywords: @yearly, @annually, @monthly, @weekly,
59+
@daily, @midnight, @hourly
60+
automatic: If set, this task will be automatically scheduled during worker
61+
startup and continually through the worker's lifespan. This ensures
62+
that the task will always be scheduled despite crashes and other
63+
adverse conditions. Automatic tasks must not require any arguments.
64+
"""
65+
super().__init__(automatic=automatic)
66+
self.expression = expression
67+
self._croniter = croniter(expression, datetime.now(timezone.utc), datetime)
68+
69+
async def __aenter__(self) -> Cron:
70+
execution = self.execution.get()
71+
cron = Cron(expression=self.expression, automatic=self.automatic)
72+
cron.args = execution.args
73+
cron.kwargs = execution.kwargs
74+
return cron
75+
76+
@property
77+
def initial_when(self) -> datetime:
78+
"""Return the next cron time for initial scheduling."""
79+
return self._croniter.get_next()
80+
81+
async def on_complete(self, execution: Execution, outcome: TaskOutcome) -> bool:
82+
"""Handle completion by scheduling the next execution at the exact cron time.
83+
84+
This overrides Perpetual's on_complete to ensure we hit the exact wall-clock
85+
time rather than adjusting for task duration.
86+
"""
87+
self.at(self._croniter.get_next())
88+
return await super().on_complete(execution, outcome)

src/docket/dependencies/_perpetual.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,16 @@ def __init__(
6161

6262
async def __aenter__(self) -> Perpetual:
6363
execution = self.execution.get()
64-
perpetual = Perpetual(every=self.every)
64+
perpetual = Perpetual(every=self.every, automatic=self.automatic)
6565
perpetual.args = execution.args
6666
perpetual.kwargs = execution.kwargs
6767
return perpetual
6868

69+
@property
70+
def initial_when(self) -> datetime | None:
71+
"""Return None to schedule for immediate execution at worker startup."""
72+
return None
73+
6974
def cancel(self) -> None:
7075
self.cancelled = True
7176

src/docket/worker.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -763,15 +763,12 @@ async def _schedule_all_automatic_perpetual_tasks(self) -> None:
763763
perpetual = get_single_dependency_parameter_of_type(
764764
task_function, Perpetual
765765
)
766-
if perpetual is None:
767-
continue
768766

769-
if not perpetual.automatic:
770-
continue
771-
772-
key = task_function.__name__
773-
774-
await self.docket.add(task_function, key=key)()
767+
if perpetual is not None and perpetual.automatic:
768+
key = task_function.__name__
769+
await self.docket.add(
770+
task_function, when=perpetual.initial_when, key=key
771+
)()
775772
except LockError: # pragma: no cover
776773
return
777774

tests/fundamentals/test_cron.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
"""Tests for Cron dependency (cron-style scheduled tasks)."""
2+
3+
from datetime import datetime, timedelta, timezone
4+
from unittest.mock import patch
5+
6+
from croniter import croniter
7+
8+
from docket import Docket, Worker
9+
from docket.dependencies import Cron
10+
11+
12+
async def test_cron_task_reschedules_itself(docket: Docket, worker: Worker):
13+
"""Cron tasks automatically reschedule after each execution."""
14+
runs = 0
15+
16+
async def my_cron_task(cron: Cron = Cron("0 9 * * *", automatic=False)):
17+
nonlocal runs
18+
runs += 1
19+
20+
# Patch croniter.get_next to return a time 10ms in the future
21+
with patch.object(
22+
croniter,
23+
"get_next",
24+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
25+
):
26+
execution = await docket.add(my_cron_task)()
27+
await worker.run_at_most({execution.key: 3})
28+
29+
assert runs == 3
30+
31+
32+
async def test_cron_tasks_are_automatically_scheduled(docket: Docket, worker: Worker):
33+
"""Cron tasks with automatic=True are scheduled at worker startup."""
34+
calls = 0
35+
36+
async def my_automatic_cron(
37+
cron: Cron = Cron("0 0 * * *"),
38+
): # automatic=True is default
39+
nonlocal calls
40+
calls += 1
41+
42+
docket.register(my_automatic_cron)
43+
44+
with patch.object(
45+
croniter,
46+
"get_next",
47+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
48+
):
49+
await worker.run_at_most({"my_automatic_cron": 2})
50+
51+
assert calls == 2
52+
53+
54+
async def test_cron_tasks_continue_after_errors(docket: Docket, worker: Worker):
55+
"""Cron tasks keep rescheduling even when they raise exceptions."""
56+
calls = 0
57+
58+
async def flaky_cron_task(cron: Cron = Cron("0 * * * *", automatic=False)):
59+
nonlocal calls
60+
calls += 1
61+
raise ValueError("Task failed!")
62+
63+
with patch.object(
64+
croniter,
65+
"get_next",
66+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
67+
):
68+
execution = await docket.add(flaky_cron_task)()
69+
await worker.run_at_most({execution.key: 3})
70+
71+
assert calls == 3
72+
73+
74+
async def test_cron_tasks_can_cancel_themselves(docket: Docket, worker: Worker):
75+
"""A cron task can stop rescheduling by calling cron.cancel()."""
76+
calls = 0
77+
78+
async def limited_cron_task(cron: Cron = Cron("0 * * * *", automatic=False)):
79+
nonlocal calls
80+
calls += 1
81+
if calls >= 3:
82+
cron.cancel()
83+
84+
with patch.object(
85+
croniter,
86+
"get_next",
87+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
88+
):
89+
await docket.add(limited_cron_task)()
90+
await worker.run_until_finished()
91+
92+
assert calls == 3
93+
94+
95+
async def test_cron_supports_vixie_keywords(docket: Docket, worker: Worker):
96+
"""Cron supports Vixie cron keywords like @daily, @weekly, @hourly."""
97+
runs = 0
98+
99+
# @daily is equivalent to "0 0 * * *" (midnight every day)
100+
async def daily_task(cron: Cron = Cron("@daily", automatic=False)):
101+
nonlocal runs
102+
runs += 1
103+
104+
with patch.object(
105+
croniter,
106+
"get_next",
107+
return_value=datetime.now(timezone.utc) + timedelta(milliseconds=10),
108+
):
109+
execution = await docket.add(daily_task)()
110+
await worker.run_at_most({execution.key: 1})
111+
112+
assert runs == 1
113+
114+
115+
async def test_automatic_cron_waits_for_scheduled_time(docket: Docket, worker: Worker):
116+
"""Automatic cron tasks wait for their next scheduled time instead of running immediately.
117+
118+
Unlike Perpetual tasks which run immediately at worker startup, Cron tasks
119+
schedule themselves for the next matching cron time. This ensures a Monday 9 AM
120+
cron doesn't accidentally run on a Wednesday startup.
121+
"""
122+
calls: list[datetime] = []
123+
124+
async def scheduled_task(cron: Cron = Cron("0 9 * * 1")): # Mondays at 9 AM
125+
calls.append(datetime.now(timezone.utc))
126+
127+
docket.register(scheduled_task)
128+
129+
# Schedule for 100ms in the future (simulating next Monday 9 AM)
130+
future_time = datetime.now(timezone.utc) + timedelta(milliseconds=100)
131+
with patch.object(croniter, "get_next", return_value=future_time):
132+
await worker.run_at_most({"scheduled_task": 1})
133+
134+
assert len(calls) == 1
135+
# The task ran at or after the scheduled time, not immediately
136+
assert calls[0] >= future_time - timedelta(milliseconds=50)

0 commit comments

Comments
 (0)