diff --git a/docs/howto/advanced.md b/docs/howto/advanced.md index 51884d9b5..55e25742f 100644 --- a/docs/howto/advanced.md +++ b/docs/howto/advanced.md @@ -12,6 +12,7 @@ advanced/cancellation advanced/queueing_locks advanced/cron advanced/retry +advanced/retry_failed advanced/middleware advanced/events advanced/sync_defer diff --git a/docs/howto/advanced/retry_failed.md b/docs/howto/advanced/retry_failed.md new file mode 100644 index 000000000..ffe0cd633 --- /dev/null +++ b/docs/howto/advanced/retry_failed.md @@ -0,0 +1,25 @@ +# Retry a Failed Job Manually + +Sometimes a manual retry, for instance, after we fix an integration's configuration, can be practical. +This is why the job_manager offers an API to do so. Retrying a `failed` job will set the status of the job +back to `todo` while keeping the history of events in place. The action of retrying a failed job, +is also recording a new Event of type `retried`. + +:warning: if the job is not `failed`, the method will raise an error. + +## Retry a failed job programatically + +```python +app.job_manager.retry_failed_job_by_id( + job.id, job.priority, job.queue_name, job.lock +) +# or by using the async method +await app.job_manager.retry_failed_job_by_id_async( + job.id, job.priority, job.queue_name, job.lock +) +``` + +## For django users + +An admin action `Retry Failed Job` can also be invoked from the table view of the +Procrastinate Jobs. diff --git a/procrastinate/contrib/django/admin.py b/procrastinate/contrib/django/admin.py index 1abd662b0..08a46ca10 100644 --- a/procrastinate/contrib/django/admin.py +++ b/procrastinate/contrib/django/admin.py @@ -2,13 +2,19 @@ import json +from django.apps import apps from django.contrib import admin -from django.db.models import Prefetch +from django.db.models import Prefetch, QuerySet +from django.http.request import HttpRequest from django.template.loader import render_to_string from django.utils import timezone from django.utils.html import format_html from django.utils.safestring import mark_safe +from procrastinate import App +from procrastinate.contrib.django.apps import ProcrastinateConfig +from procrastinate.jobs import Status + from . import models JOB_STATUS_EMOJI_MAPPING = { @@ -128,3 +134,28 @@ def summary(self, instance: models.ProcrastinateJob) -> str: ).strip() ) return "" + + @admin.action(description="Retry Failed Job") + def retry(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]): + app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType] + p_app: App = app_config.app + for job in queryset.filter(status=Status.FAILED.value): + p_app.job_manager.retry_failed_job_by_id( + job.id, job.priority, job.queue_name, job.lock + ) + + @admin.action(description="Cancel Job (only 'todo' jobs)") + def cancel(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]): + app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType] + p_app: App = app_config.app + for job in queryset.filter(status=Status.TODO.value): + p_app.job_manager.cancel_job_by_id(job.id, abort=False) + + @admin.action(description="Abort Job (includes 'todo' & 'doing' jobs)") + def abort(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]): + app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType] + p_app: App = app_config.app + for job in queryset.filter(status__in=(Status.TODO.value, Status.DOING.value)): + p_app.job_manager.cancel_job_by_id(job.id, abort=True) + + actions = [retry, cancel, abort] diff --git a/procrastinate/contrib/django/migrations/0036_retry_failed_job.py b/procrastinate/contrib/django/migrations/0036_retry_failed_job.py new file mode 100644 index 000000000..8b8e5e34a --- /dev/null +++ b/procrastinate/contrib/django/migrations/0036_retry_failed_job.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.6 on 2025-02-12 18:07 +from __future__ import annotations + +from django.db import migrations + +from .. import migrations_utils + + +class Migration(migrations.Migration): + dependencies = [ + ("procrastinate", "0035_post_add_heartbeat"), + ] + + operations = [ + migrations_utils.RunProcrastinateSQL( + name="03.02.00_01_add_retry_failed_job_procedure.sql" + ), + ] diff --git a/procrastinate/manager.py b/procrastinate/manager.py index a39a11d4f..ea397c429 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -496,6 +496,86 @@ def retry_job_by_id( new_lock=lock, ) + async def retry_failed_job( + self, + job: jobs.Job, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: + """ + Indicates that a failed job should be retried later. + + Parameters + ---------- + job: + priority: + If set, the job will be retried with this priority. If not set, the priority + remains unchanged. + queue: + If set, the job will be retried on this queue. If not set, the queue remains + unchanged. + lock: + If set, the job will be retried with this lock. If not set, the lock remains + unchanged. + """ + assert job.id # TODO remove this + await self.retry_failed_job_by_id_async( + job_id=job.id, + priority=priority, + queue=queue, + lock=lock, + ) + + async def retry_failed_job_by_id_async( + self, + job_id: int, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: + """ + Indicates that a job should be retried later. + + Parameters + ---------- + job_id: + priority: + If set, the job will be retried with this priority. If not set, the priority + remains unchanged. + queue: + If set, the job will be retried on this queue. If not set, the queue remains + unchanged. + lock: + If set, the job will be retried with this lock. If not set, the lock remains + unchanged. + """ + await self.connector.execute_query_async( + query=sql.queries["retry_failed_job"], + job_id=job_id, + new_priority=priority, + new_queue_name=queue, + new_lock=lock, + ) + + def retry_failed_job_by_id( + self, + job_id: int, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: + """ + Sync version of `retry_failed_job_by_id_async`. + """ + self.connector.get_sync_connector().execute_query( + query=sql.queries["retry_failed_job"], + job_id=job_id, + new_priority=priority, + new_queue_name=queue, + new_lock=lock, + ) + async def listen_for_jobs( self, *, diff --git a/procrastinate/sql/migrations/03.02.00_01_add_retry_failed_job_procedure.sql b/procrastinate/sql/migrations/03.02.00_01_add_retry_failed_job_procedure.sql new file mode 100644 index 000000000..bcfa50eda --- /dev/null +++ b/procrastinate/sql/migrations/03.02.00_01_add_retry_failed_job_procedure.sql @@ -0,0 +1,78 @@ +-- Append new event type to reflect transition from failed -> todo +ALTER TYPE procrastinate_job_event_type ADD VALUE 'retried' AFTER 'scheduled'; + +-- Procedure to retry failed jobs +CREATE FUNCTION procrastinate_retry_failed_job_v1( + job_id bigint, + new_priority integer, + new_queue_name character varying, + new_lock character varying +) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + _job_id bigint; +BEGIN + UPDATE procrastinate_jobs + SET status = 'todo'::procrastinate_job_status, + attempts = attempts + 1, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) + WHERE id = job_id AND status = 'failed' + RETURNING id INTO _job_id; + + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "failed" status (job id: %)', job_id; + END IF; +END; +$$; + +DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update_v1 ON procrastinate_jobs; +DROP FUNCTION IF EXISTS public.procrastinate_trigger_function_status_events_update_v1; + +CREATE FUNCTION public.procrastinate_trigger_function_status_events_update_v1() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +BEGIN + WITH t AS ( + SELECT CASE + WHEN OLD.status = 'todo'::procrastinate_job_status + AND NEW.status = 'doing'::procrastinate_job_status + THEN 'started'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'deferred_for_retry'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'failed'::procrastinate_job_status + THEN 'failed'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'succeeded'::procrastinate_job_status + THEN 'succeeded'::procrastinate_job_event_type + WHEN OLD.status = 'todo'::procrastinate_job_status + AND ( + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status + OR NEW.status = 'succeeded'::procrastinate_job_status + ) + THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type + WHEN OLD.status = 'failed'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'retried'::procrastinate_job_event_type + ELSE NULL + END as event_type + ) + INSERT INTO procrastinate_events(job_id, type) + SELECT NEW.id, t.event_type + FROM t + WHERE t.event_type IS NOT NULL; + RETURN NEW; +END; +$function$; + +CREATE TRIGGER procrastinate_trigger_status_events_update_v1 + AFTER UPDATE OF status ON procrastinate_jobs + FOR EACH ROW + EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v1(); diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 8748f8a64..271b7d29f 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -80,6 +80,10 @@ SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s; -- Retry a job, changing it from "doing" to "todo" SELECT procrastinate_retry_job_v1(%(job_id)s, %(retry_at)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s); +-- retry_failed_job -- +-- Retry a failed job, changing it from "failed" to "todo" +SELECT procrastinate_retry_failed_job_v1(%(job_id)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s); + -- listen_queue -- -- In this one, the argument is an identifier, shoud not be escaped the same way LISTEN {channel_name}; diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 890698e7d..92403ff0e 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -23,7 +23,8 @@ CREATE TYPE procrastinate_job_event_type AS ENUM ( 'cancelled', -- todo -> cancelled 'abort_requested', -- not a state transition, but set in a separate field 'aborted', -- doing -> aborted (only allowed when abort_requested field is set) - 'scheduled' -- not a state transition, but recording when a task is scheduled for + 'scheduled', -- not a state transition, but recording when a task is scheduled for + 'retried' -- Manually retried failed job ); -- Tables @@ -306,6 +307,30 @@ BEGIN END; $$; +CREATE FUNCTION procrastinate_retry_failed_job_v1( + job_id bigint, + new_priority integer, + new_queue_name character varying, + new_lock character varying +) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + _job_id bigint; +BEGIN + UPDATE procrastinate_jobs + SET status = 'todo'::procrastinate_job_status, + attempts = attempts + 1, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) + WHERE id = job_id AND status = 'failed' + RETURNING id INTO _job_id; + + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "failed" status (job id: %)', job_id; + END IF; +END; +$$; + CREATE FUNCTION procrastinate_notify_queue_job_inserted_v1() RETURNS trigger LANGUAGE plpgsql @@ -374,6 +399,9 @@ BEGIN WHEN OLD.status = 'doing'::procrastinate_job_status AND NEW.status = 'aborted'::procrastinate_job_status THEN 'aborted'::procrastinate_job_event_type + WHEN OLD.status = 'failed'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'retried'::procrastinate_job_event_type ELSE NULL END as event_type ) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index ec62a38e0..066e46715 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -415,3 +415,14 @@ async def prune_stalled_workers_all(self, seconds_since_heartbeat): job["worker_id"] = None return pruned_workers + + async def retry_failed_job_run( + self, job_id, new_priority, new_queue_name, new_lock + ): + job_row = self.jobs[job_id] + job_row["status"] = "todo" + job_row["attempts"] += 1 + job_row["priority"] = new_priority + job_row["queue_name"] = new_queue_name + job_row["lock"] = new_lock + self.events[job_id].append({"type": "retried", "at": utils.utcnow()}) diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index 489be8118..3c7a985bf 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -515,6 +515,42 @@ def test_retry_job_by_id(job_manager, connector, job_factory, dt): ) +async def test_retry_failed_job_by_id_async(job_manager, connector, job_factory, dt): + job = await job_manager.defer_job_async(job=job_factory()) + + await job_manager.retry_failed_job_by_id_async( + job_id=job.id, priority=7, queue="some_queue", lock="some_lock" + ) + + assert connector.queries[-1] == ( + "retry_failed_job", + { + "job_id": 1, + "new_priority": 7, + "new_queue_name": "some_queue", + "new_lock": "some_lock", + }, + ) + + +def test_retry_failed_job_by_id(job_manager, connector, job_factory, dt): + job = job_manager.defer_job(job=job_factory()) + + job_manager.retry_failed_job_by_id( + job_id=job.id, priority=7, queue="some_queue", lock="some_lock" + ) + + assert connector.queries[-1] == ( + "retry_failed_job", + { + "job_id": 1, + "new_priority": 7, + "new_queue_name": "some_queue", + "new_lock": "some_lock", + }, + ) + + async def test_list_jobs_async(job_manager, job_factory): job = await job_manager.defer_job_async(job=job_factory())