Description
I couldn’t find a built-in way to flush all jobs (remove every row in procrastinate_jobs) via the API or CLI.
My first thought was to run a raw SQL command, `TRUNCATE TABLE procrastinate_jobs`, but I wasn’t sure whether that might leave Procrastinate in an inconsistent state (e.g. other tables, sequences, or bookkeeping).
To be safe, I instead put together a relatively slow helper function using only the official library calls (listing jobs, cancelling/aborting, retrying stalled jobs, then deleting finished ones). It works, but it’s not very efficient compared to a single operation.
👉 Feature request: add a supported way (API method and/or CLI flag) to “flush all jobs,” i.e. remove every job record across all statuses, in a way that guarantees internal consistency.
This would be useful for:
• local development / testing, when you want to start with a clean slate
• CI pipelines that need to reset the state quickly
• avoiding direct SQL in application code
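For concreteness, the supported call might look something like this. To be clear, this is purely hypothetical: neither the method nor any equivalent CLI flag exists in Procrastinate today, and the names are made up for illustration:

```python
# Hypothetical API — not part of procrastinate; shown only to illustrate the request.
await app.job_manager.flush_all_jobs_async()  # remove every job row, across all statuses
# (a matching CLI entry point, e.g. something like `procrastinate flush-jobs`,
#  would cover the CI / local-dev use cases without writing any Python)
```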
```python
import asyncio
import logging

from procrastinate.jobs import Status

from wat_backend.procrastinate_app import app


async def flush_all_jobs(
    grace_seconds: float = 5.0,
    poll_interval: float = 0.5,
) -> None:
    """
    Flush all Procrastinate jobs using only the Procrastinate API (no direct SQL).

    Args:
        grace_seconds: Maximum time in seconds to wait for running workers
            to acknowledge abort requests. Set to 0 to skip waiting.
        poll_interval: Interval in seconds between checks while waiting
            for running jobs to stop during the grace period.

    Notes:
        - This function opens and closes the app context internally.
        - It runs faster when no workers are connected, because no
          grace-period waiting is required.
        - It logs a warning if any jobs remain in any status afterward
          (which should never happen).
    """
    await app.open_async()
    try:
        jm = app.job_manager

        # 1) Cancel & delete queued jobs (status "todo")
        todos = await jm.list_jobs_async(status=Status.TODO.value)
        for job in todos:
            await jm.cancel_job_by_id_async(job.id, delete_job=True)

        # 2) Request an abort for running jobs (status "doing")
        doing = await jm.list_jobs_async(status=Status.DOING.value)
        for job in doing:
            await jm.cancel_job_by_id_async(job.id, abort=True)

        # 3) Grace period: give workers time to acknowledge the aborts
        if doing and grace_seconds > 0:
            loop = asyncio.get_running_loop()
            deadline = loop.time() + grace_seconds
            while loop.time() < deadline:
                still_doing = await jm.list_jobs_async(status=Status.DOING.value)
                if not still_doing:
                    break
                await asyncio.sleep(poll_interval)

        # 4) Handle stalled jobs (workers that never acknowledged the abort):
        #    as a workaround, requeue them, then cancel & delete.
        stalled = await jm.get_stalled_jobs()
        for job in stalled:
            await jm.retry_job(job)
        for job in stalled:
            await jm.cancel_job_by_id_async(job.id, delete_job=True)

        # 5) Delete all finished jobs (older than "now", i.e. nb_hours=0)
        await jm.delete_old_jobs(
            nb_hours=0,
            include_failed=True,
            include_cancelled=True,
            include_aborted=True,
        )

        # 6) Final check: any jobs left in *any* status? This should never
        #    trigger; if it does, either this function misses an edge case
        #    or Procrastinate has a bug.
        total_leftover = 0
        for st in Status:
            jobs = await jm.list_jobs_async(status=st.value)
            total_leftover += len(jobs)
        if total_leftover:
            logging.warning("Flush incomplete: %d jobs remain", total_leftover)
    finally:
        await app.close_async()
```
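The grace-period logic in step 3 is just a generic deadline-polling pattern, so here is a self-contained sketch of it (pure stdlib, no Procrastinate or database needed; `wait_for_drain` and the simulated worker are made up for the demo):

```python
import asyncio


async def wait_for_drain(is_empty, grace_seconds: float = 5.0,
                         poll_interval: float = 0.5) -> bool:
    """Poll `is_empty` until it returns True or the grace period elapses.

    Returns True if everything drained within the deadline, False otherwise.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + grace_seconds
    while loop.time() < deadline:
        if await is_empty():
            return True
        await asyncio.sleep(poll_interval)
    return await is_empty()  # one last check at the deadline


async def demo() -> bool:
    pending = {"jobs": 3}

    async def worker():
        # Simulated worker: finishes one job per tick.
        while pending["jobs"]:
            await asyncio.sleep(0.01)
            pending["jobs"] -= 1

    async def is_empty():
        return pending["jobs"] == 0

    asyncio.create_task(worker())
    return await wait_for_drain(is_empty, grace_seconds=1.0, poll_interval=0.01)


print(asyncio.run(demo()))  # True: the simulated worker drains well inside the grace period
```

Using the event loop's monotonic clock (`loop.time()`) rather than `time.time()` keeps the deadline immune to wall-clock adjustments, which is why the helper above does the same.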