diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cb51b28e..daa8e679 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,7 +34,7 @@ jobs: - name: Run tests env: REDIS_VERSION: ${{ matrix.redis-version }} - run: uv run pytest --cov-branch --cov-report=xml --cov-report=term-missing:skip-covered + run: uv run pytest --cov-branch --cov-fail-under=100 --cov-report=xml --cov-report=term-missing:skip-covered - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v5 diff --git a/src/docket/cli.py b/src/docket/cli.py index 7dfbabc4..ee7d9ba6 100644 --- a/src/docket/cli.py +++ b/src/docket/cli.py @@ -358,6 +358,32 @@ async def run() -> None: asyncio.run(run()) +@app.command(help="Clear all pending and scheduled tasks from the docket") +def clear( + docket_: Annotated[ + str, + typer.Option( + "--docket", + help="The name of the docket", + envvar="DOCKET_NAME", + ), + ] = "docket", + url: Annotated[ + str, + typer.Option( + help="The URL of the Redis server", + envvar="DOCKET_URL", + ), + ] = "redis://localhost:6379/0", +) -> None: + async def run() -> None: + async with Docket(name=docket_, url=url) as docket: + cleared_count = await docket.clear() + print(f"Cleared {cleared_count} tasks from docket '{docket_}'") + + asyncio.run(run()) + + @app.command(help="Restores a task or parameters to the Docket") def restore( function: Annotated[ diff --git a/src/docket/docket.py b/src/docket/docket.py index 4aa374fc..3d4e0324 100644 --- a/src/docket/docket.py +++ b/src/docket/docket.py @@ -743,3 +743,46 @@ async def task_workers(self, task_name: str) -> Collection[WorkerInfo]: workers.append(WorkerInfo(worker_name, last_seen, task_names)) return workers + + async def clear(self) -> int: + """Clear all pending and scheduled tasks from the docket. + + This removes all tasks from the stream (immediate tasks) and queue + (scheduled tasks), along with their associated parked data. Running + tasks are not affected. + + Returns: + The total number of tasks that were cleared. + """ + with tracer.start_as_current_span( + "docket.clear", + attributes=self.labels(), + ): + async with self.redis() as redis: + async with redis.pipeline() as pipeline: + # Get counts before clearing + pipeline.xlen(self.stream_key) + pipeline.zcard(self.queue_key) + pipeline.zrange(self.queue_key, 0, -1) + + stream_count: int + queue_count: int + scheduled_keys: list[bytes] + stream_count, queue_count, scheduled_keys = await pipeline.execute() + + # Clear all data + # Trim stream to 0 messages instead of deleting it to preserve consumer group + if stream_count > 0: + pipeline.xtrim(self.stream_key, maxlen=0, approximate=False) + pipeline.delete(self.queue_key) + + # Clear parked task data and known task keys + for key_bytes in scheduled_keys: + key = key_bytes.decode() + pipeline.delete(self.parked_task_key(key)) + pipeline.delete(self.known_task_key(key)) + + await pipeline.execute() + + total_cleared = stream_count + queue_count + return total_cleared diff --git a/tests/cli/test_clear.py b/tests/cli/test_clear.py new file mode 100644 index 00000000..826d68b0 --- /dev/null +++ b/tests/cli/test_clear.py @@ -0,0 +1,253 @@ +import asyncio +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock + +import pytest +from typer.testing import CliRunner + +from docket.cli import app +from docket.docket import Docket + + +@pytest.fixture(autouse=True) +async def empty_docket(docket: Docket): + """Ensure that the docket starts empty""" + await docket.clear() + + +async def test_clear_command_empty_docket(docket: Docket, runner: CliRunner): + """Should clear empty docket and report 0 tasks cleared""" + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 0 tasks" in result.output + + +async def test_clear_command_with_immediate_tasks( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should clear immediate tasks and report count""" + docket.register(the_task) + + await docket.add(the_task)("arg1") + await docket.add(the_task)("arg2") + await docket.add(the_task)("arg3") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 3 tasks" in result.output + + snapshot = await docket.snapshot() + assert len(snapshot.future) == 0 + assert len(snapshot.running) == 0 + + +async def test_clear_command_with_scheduled_tasks( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should clear scheduled tasks and report count""" + docket.register(the_task) + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + await docket.add(the_task, when=future)("scheduled1") + await docket.add(the_task, when=future + timedelta(seconds=1))("scheduled2") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 2 tasks" in result.output + + snapshot = await docket.snapshot() + assert len(snapshot.future) == 0 + assert len(snapshot.running) == 0 + + +async def test_clear_command_with_mixed_tasks( + docket: Docket, runner: CliRunner, the_task: AsyncMock, another_task: AsyncMock +): + """Should clear both immediate and scheduled tasks""" + docket.register(the_task) + docket.register(another_task) + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + + await docket.add(the_task)("immediate1") + await docket.add(another_task)("immediate2") + await docket.add(the_task, when=future)("scheduled1") + await docket.add(another_task, when=future + timedelta(seconds=1))("scheduled2") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 4 tasks" in result.output + + snapshot = await docket.snapshot() + assert len(snapshot.future) == 0 + assert len(snapshot.running) == 0 + + +async def test_clear_command_with_keyed_tasks( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should clear tasks with keys""" + docket.register(the_task) + + await docket.add(the_task, key="task1")("arg1") + await docket.add(the_task, key="task2")("arg2") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 2 tasks" in result.output + + snapshot = await docket.snapshot() + assert len(snapshot.future) == 0 + + +async def test_clear_command_basic_functionality( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should clear tasks via CLI command""" + docket.register(the_task) + + # Add some tasks to clear + await docket.add(the_task)("task1") + future = datetime.now(timezone.utc) + timedelta(seconds=60) + await docket.add(the_task, when=future)("scheduled_task") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared" in result.output + + snapshot_after_clear = await docket.snapshot() + assert len(snapshot_after_clear.future) == 0 + + +async def test_clear_command_preserves_strikes( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should not affect strikes when clearing""" + docket.register(the_task) + + await docket.strike("the_task") + await docket.add(the_task)("arg1") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared" in result.output + + # Strikes should still be in effect - clear doesn't affect strikes + + +async def test_clear_command_with_custom_url(runner: CliRunner): + """Should handle custom Redis URL""" + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + "redis://nonexistent:12345/0", + "--docket", + "test-docket", + ], + ) + assert result.exit_code != 0 + assert result.exit_code != 0 + + +async def test_clear_command_with_custom_docket_name( + docket: Docket, runner: CliRunner, the_task: AsyncMock +): + """Should handle custom docket name""" + docket.register(the_task) + await docket.add(the_task)("test") + + result = await asyncio.get_running_loop().run_in_executor( + None, + runner.invoke, + app, + [ + "clear", + "--url", + docket.url, + "--docket", + docket.name, + ], + ) + assert result.exit_code == 0, result.output + assert "Cleared 1 tasks" in result.output diff --git a/tests/conftest.py b/tests/conftest.py index ef26289c..70e27aa1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -90,12 +90,9 @@ def redis_server(testrun_uid: str, worker_id: str) -> Generator[Container, None, s.bind(("", 0)) redis_port = s.getsockname()[1] - # Determine Docker image based on version - if REDIS_VERSION.startswith("valkey-"): - version = REDIS_VERSION.replace("valkey-", "") - image = f"valkey/valkey:{version}" - else: - image = f"redis:{REDIS_VERSION}" + image = f"redis:{REDIS_VERSION}" + if REDIS_VERSION.startswith("valkey-"): # pragma: no branch + image = f"valkey/valkey:{REDIS_VERSION.replace('valkey-', '')}" # pragma: no cover container = client.containers.run( image, diff --git a/tests/test_docket.py b/tests/test_docket.py index bff6cb30..69400c02 100644 --- a/tests/test_docket.py +++ b/tests/test_docket.py @@ -1,3 +1,7 @@ +from datetime import datetime, timedelta, timezone +from typing import cast +from unittest.mock import AsyncMock + import pytest import redis.exceptions @@ -12,3 +16,153 @@ async def test_docket_aenter_propagates_connection_errors(): await docket.__aenter__() await docket.__aexit__(None, None, None) + + +async def test_clear_empty_docket(docket: Docket): + """Clearing an empty docket should succeed without error""" + result = await docket.clear() + assert result == 0 + + +async def test_clear_with_immediate_tasks(docket: Docket, the_task: AsyncMock): + """Should clear immediate tasks from the stream""" + docket.register(the_task) + + await docket.add(the_task)("arg1", kwarg1="value1") + await docket.add(the_task)("arg2", kwarg1="value2") + await docket.add(the_task)("arg3", kwarg1="value3") + + snapshot_before = await docket.snapshot() + assert len(snapshot_before.future) == 3 + + result = await docket.clear() + assert result == 3 + + snapshot_after = await docket.snapshot() + assert len(snapshot_after.future) == 0 + assert len(snapshot_after.running) == 0 + + +async def test_clear_with_scheduled_tasks(docket: Docket, the_task: AsyncMock): + """Should clear scheduled future tasks from the queue""" + docket.register(the_task) + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + await docket.add(the_task, when=future)("arg1") + await docket.add(the_task, when=future + timedelta(seconds=1))("arg2") + + snapshot_before = await docket.snapshot() + assert len(snapshot_before.future) == 2 + + result = await docket.clear() + assert result == 2 + + snapshot_after = await docket.snapshot() + assert len(snapshot_after.future) == 0 + assert len(snapshot_after.running) == 0 + + +async def test_clear_with_mixed_tasks( + docket: Docket, the_task: AsyncMock, another_task: AsyncMock +): + """Should clear both immediate and scheduled tasks""" + docket.register(the_task) + docket.register(another_task) + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + + await docket.add(the_task)("immediate1") + await docket.add(another_task)("immediate2") + await docket.add(the_task, when=future)("scheduled1") + await docket.add(another_task, when=future + timedelta(seconds=1))("scheduled2") + + snapshot_before = await docket.snapshot() + assert len(snapshot_before.future) == 4 + + result = await docket.clear() + assert result == 4 + + snapshot_after = await docket.snapshot() + assert len(snapshot_after.future) == 0 + assert len(snapshot_after.running) == 0 + + +async def test_clear_with_parked_tasks(docket: Docket, the_task: AsyncMock): + """Should clear parked tasks (tasks with specific keys)""" + docket.register(the_task) + + await docket.add(the_task, key="task1")("arg1") + await docket.add(the_task, key="task2")("arg2") + + snapshot_before = await docket.snapshot() + assert len(snapshot_before.future) == 2 + + result = await docket.clear() + assert result == 2 + + snapshot_after = await docket.snapshot() + assert len(snapshot_after.future) == 0 + + +async def test_clear_preserves_strikes(docket: Docket, the_task: AsyncMock): + """Should not affect strikes when clearing""" + docket.register(the_task) + + await docket.strike("the_task") + await docket.add(the_task)("arg1") + + # Check that the task wasn't scheduled due to the strike + snapshot_before = await docket.snapshot() + assert len(snapshot_before.future) == 0 # Task was stricken, so not scheduled + + result = await docket.clear() + assert result == 0 # Nothing to clear since task was stricken + + # Strikes should still be in effect - clear doesn't affect strikes + snapshot_after = await docket.snapshot() + assert len(snapshot_after.future) == 0 + + +async def test_clear_returns_total_count(docket: Docket, the_task: AsyncMock): + """Should return the total number of tasks cleared""" + docket.register(the_task) + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + + await docket.add(the_task)("immediate1") + await docket.add(the_task)("immediate2") + await docket.add(the_task, when=future)("scheduled1") + await docket.add(the_task, key="keyed1")("keyed1") + + result = await docket.clear() + assert result == 4 + + +async def test_clear_no_redis_key_leaks(docket: Docket, the_task: AsyncMock): + """Should not leak Redis keys when clearing tasks""" + docket.register(the_task) + + await docket.add(the_task)("immediate1") + await docket.add(the_task)("immediate2") + await docket.add(the_task, key="keyed1")("keyed_task") + + future = datetime.now(timezone.utc) + timedelta(seconds=60) + await docket.add(the_task, when=future)("scheduled1") + await docket.add(the_task, when=future + timedelta(seconds=1))("scheduled2") + + async with docket.redis() as r: + keys_before = cast(list[str], await r.keys("*")) # type: ignore + keys_before_count = len(keys_before) + + result = await docket.clear() + assert result == 5 + + async with docket.redis() as r: + keys_after = cast(list[str], await r.keys("*")) # type: ignore + keys_after_count = len(keys_after) + + assert keys_after_count <= keys_before_count + + snapshot = await docket.snapshot() + assert len(snapshot.future) == 0 + assert len(snapshot.running) == 0