Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: Run tests
env:
REDIS_VERSION: ${{ matrix.redis-version }}
run: uv run pytest --cov-branch --cov-report=xml --cov-report=term-missing:skip-covered
run: uv run pytest --cov-branch --cov-fail-under=100 --cov-report=xml --cov-report=term-missing:skip-covered

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v5
Expand Down
26 changes: 26 additions & 0 deletions src/docket/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,32 @@ async def run() -> None:
asyncio.run(run())


@app.command(help="Clear all pending and scheduled tasks from the docket")
def clear(
docket_: Annotated[
str,
typer.Option(
"--docket",
help="The name of the docket",
envvar="DOCKET_NAME",
),
] = "docket",
url: Annotated[
str,
typer.Option(
help="The URL of the Redis server",
envvar="DOCKET_URL",
),
] = "redis://localhost:6379/0",
) -> None:
async def run() -> None:
async with Docket(name=docket_, url=url) as docket:
cleared_count = await docket.clear()
print(f"Cleared {cleared_count} tasks from docket '{docket_}'")

asyncio.run(run())


@app.command(help="Restores a task or parameters to the Docket")
def restore(
function: Annotated[
Expand Down
43 changes: 43 additions & 0 deletions src/docket/docket.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,3 +743,46 @@ async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
workers.append(WorkerInfo(worker_name, last_seen, task_names))

return workers

async def clear(self) -> int:
"""Clear all pending and scheduled tasks from the docket.

This removes all tasks from the stream (immediate tasks) and queue
(scheduled tasks), along with their associated parked data. Running
tasks are not affected.

Returns:
The total number of tasks that were cleared.
"""
with tracer.start_as_current_span(
"docket.clear",
attributes=self.labels(),
):
async with self.redis() as redis:
async with redis.pipeline() as pipeline:
# Get counts before clearing
pipeline.xlen(self.stream_key)
pipeline.zcard(self.queue_key)
pipeline.zrange(self.queue_key, 0, -1)

stream_count: int
queue_count: int
scheduled_keys: list[bytes]
stream_count, queue_count, scheduled_keys = await pipeline.execute()

# Clear all data
# Trim stream to 0 messages instead of deleting it to preserve consumer group
if stream_count > 0:
pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
pipeline.delete(self.queue_key)

# Clear parked task data and known task keys
for key_bytes in scheduled_keys:
key = key_bytes.decode()
pipeline.delete(self.parked_task_key(key))
pipeline.delete(self.known_task_key(key))

await pipeline.execute()

total_cleared = stream_count + queue_count
return total_cleared
253 changes: 253 additions & 0 deletions tests/cli/test_clear.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock

import pytest
from typer.testing import CliRunner

from docket.cli import app
from docket.docket import Docket


@pytest.fixture(autouse=True)
async def empty_docket(docket: Docket):
"""Ensure that the docket starts empty"""
await docket.clear()


async def test_clear_command_empty_docket(docket: Docket, runner: CliRunner):
"""Should clear empty docket and report 0 tasks cleared"""
result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 0 tasks" in result.output


async def test_clear_command_with_immediate_tasks(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should clear immediate tasks and report count"""
docket.register(the_task)

await docket.add(the_task)("arg1")
await docket.add(the_task)("arg2")
await docket.add(the_task)("arg3")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 3 tasks" in result.output

snapshot = await docket.snapshot()
assert len(snapshot.future) == 0
assert len(snapshot.running) == 0


async def test_clear_command_with_scheduled_tasks(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should clear scheduled tasks and report count"""
docket.register(the_task)

future = datetime.now(timezone.utc) + timedelta(seconds=60)
await docket.add(the_task, when=future)("scheduled1")
await docket.add(the_task, when=future + timedelta(seconds=1))("scheduled2")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 2 tasks" in result.output

snapshot = await docket.snapshot()
assert len(snapshot.future) == 0
assert len(snapshot.running) == 0


async def test_clear_command_with_mixed_tasks(
docket: Docket, runner: CliRunner, the_task: AsyncMock, another_task: AsyncMock
):
"""Should clear both immediate and scheduled tasks"""
docket.register(the_task)
docket.register(another_task)

future = datetime.now(timezone.utc) + timedelta(seconds=60)

await docket.add(the_task)("immediate1")
await docket.add(another_task)("immediate2")
await docket.add(the_task, when=future)("scheduled1")
await docket.add(another_task, when=future + timedelta(seconds=1))("scheduled2")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 4 tasks" in result.output

snapshot = await docket.snapshot()
assert len(snapshot.future) == 0
assert len(snapshot.running) == 0


async def test_clear_command_with_keyed_tasks(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should clear tasks with keys"""
docket.register(the_task)

await docket.add(the_task, key="task1")("arg1")
await docket.add(the_task, key="task2")("arg2")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 2 tasks" in result.output

snapshot = await docket.snapshot()
assert len(snapshot.future) == 0


async def test_clear_command_basic_functionality(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should clear tasks via CLI command"""
docket.register(the_task)

# Add some tasks to clear
await docket.add(the_task)("task1")
future = datetime.now(timezone.utc) + timedelta(seconds=60)
await docket.add(the_task, when=future)("scheduled_task")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared" in result.output

snapshot_after_clear = await docket.snapshot()
assert len(snapshot_after_clear.future) == 0


async def test_clear_command_preserves_strikes(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should not affect strikes when clearing"""
docket.register(the_task)

await docket.strike("the_task")
await docket.add(the_task)("arg1")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared" in result.output

# Strikes should still be in effect - clear doesn't affect strikes


async def test_clear_command_with_custom_url(runner: CliRunner):
"""Should handle custom Redis URL"""
result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
"redis://nonexistent:12345/0",
"--docket",
"test-docket",
],
)
assert result.exit_code != 0
assert result.exit_code != 0


async def test_clear_command_with_custom_docket_name(
docket: Docket, runner: CliRunner, the_task: AsyncMock
):
"""Should handle custom docket name"""
docket.register(the_task)
await docket.add(the_task)("test")

result = await asyncio.get_running_loop().run_in_executor(
None,
runner.invoke,
app,
[
"clear",
"--url",
docket.url,
"--docket",
docket.name,
],
)
assert result.exit_code == 0, result.output
assert "Cleared 1 tasks" in result.output
9 changes: 3 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ def redis_server(testrun_uid: str, worker_id: str) -> Generator[Container, None,
s.bind(("", 0))
redis_port = s.getsockname()[1]

# Determine Docker image based on version
if REDIS_VERSION.startswith("valkey-"):
version = REDIS_VERSION.replace("valkey-", "")
image = f"valkey/valkey:{version}"
else:
image = f"redis:{REDIS_VERSION}"
image = f"redis:{REDIS_VERSION}"
if REDIS_VERSION.startswith("valkey-"): # pragma: no branch
image = f"valkey/valkey:{REDIS_VERSION.replace('valkey-', '')}" # pragma: no cover

container = client.containers.run(
image,
Expand Down
Loading