Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions loq.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ max_lines = 945

[[rules]]
path = "src/docket/execution.py"
max_lines = 1021
max_lines = 1020

[[rules]]
path = "src/docket/docket.py"
max_lines = 866
max_lines = 869
3 changes: 3 additions & 0 deletions src/docket/docket.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,9 @@ async def _cancel(self, redis: Redis | RedisCluster, key: str) -> None:
redis.call('DEL', known_key, parked_key, stream_id_key)
redis.call('ZREM', queue_key, task_key)

-- Clear scheduling markers so add() can reschedule this key
redis.call('HDEL', runs_key, 'known', 'stream_id')

-- Only set CANCELLED if not already in a terminal state
local current_state = redis.call('HGET', runs_key, 'state')
if current_state ~= 'completed' and current_state ~= 'failed' and current_state ~= 'cancelled' then
Expand Down
32 changes: 31 additions & 1 deletion tests/fundamentals/test_perpetual.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from datetime import datetime, timedelta, timezone

from docket import Docket, Perpetual, Worker
from docket import Docket, ExecutionState, Perpetual, Worker


async def test_perpetual_tasks(docket: Docket, worker: Worker):
Expand Down Expand Up @@ -155,6 +155,36 @@ async def perpetual_task(
assert delay >= timedelta(milliseconds=50)


async def test_cancelled_automatic_perpetual_can_be_rescheduled(
docket: Docket, worker: Worker
):
"""After cancelling a scheduled Perpetual task, add() can reschedule it."""
calls = 0

async def my_auto_task(
perpetual: Perpetual = Perpetual(
every=timedelta(milliseconds=50), automatic=True
),
):
nonlocal calls
calls += 1

# Schedule the task but don't run a worker yet — leave it sitting in the queue
await docket.add(my_auto_task, key="my_auto_task")()

# Cancel before any worker picks it up
await docket.cancel("my_auto_task")

# Re-add with the same key; this should succeed, not silently fail
execution = await docket.add(my_auto_task, key="my_auto_task")()
await execution.sync()
assert execution.state in (ExecutionState.QUEUED, ExecutionState.SCHEDULED)

# Verify the task actually runs
await worker.run_at_most({"my_auto_task": 1})
assert calls == 1


async def test_perpetual_tasks_can_schedule_next_run_at_specific_time(
docket: Docket, worker: Worker
):
Expand Down