Skip to content

Retry Failed Job #1325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/howto/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ advanced/cancellation
advanced/queueing_locks
advanced/cron
advanced/retry
advanced/retry_failed
advanced/middleware
advanced/events
advanced/sync_defer
Expand Down
25 changes: 25 additions & 0 deletions docs/howto/advanced/retry_failed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Retry a Failed Job Manually

Sometimes a manual retry, for instance, after we fix an integration's configuration, can be practical.
This is why the job_manager offers an API to do so. Retrying a `failed` job will set the status of the job
back to `todo` while keeping the history of events in place. The action of retrying a failed job,
is also recording a new Event of type `retried`.

:warning: if the job is not `failed`, the method will raise an error.

## Retry a failed job programatically

```python
app.job_manager.retry_failed_job_by_id(
job.id, job.priority, job.queue_name, job.lock
)
# or by using the async method
await app.job_manager.retry_failed_job_by_id_async(
job.id, job.priority, job.queue_name, job.lock
)
```

## For django users

An admin action `Retry Failed Job` can also be invoked from the table view of the
Procrastinate Jobs.
33 changes: 32 additions & 1 deletion procrastinate/contrib/django/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

import json

from django.apps import apps
from django.contrib import admin
from django.db.models import Prefetch
from django.db.models import Prefetch, QuerySet
from django.http.request import HttpRequest
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.html import format_html
from django.utils.safestring import mark_safe

from procrastinate import App
from procrastinate.contrib.django.apps import ProcrastinateConfig
from procrastinate.jobs import Status

from . import models

JOB_STATUS_EMOJI_MAPPING = {
Expand Down Expand Up @@ -128,3 +134,28 @@ def summary(self, instance: models.ProcrastinateJob) -> str:
).strip()
)
return ""

@admin.action(description="Retry Failed Job")
def retry(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]):
app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType]
p_app: App = app_config.app
for job in queryset.filter(status=Status.FAILED.value):
p_app.job_manager.retry_failed_job_by_id(
job.id, job.priority, job.queue_name, job.lock
)

@admin.action(description="Cancel Job (only 'todo' jobs)")
def cancel(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]):
app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType]
p_app: App = app_config.app
for job in queryset.filter(status=Status.TODO.value):
p_app.job_manager.cancel_job_by_id(job.id, abort=False)

@admin.action(description="Abort Job (includes 'todo' & 'doing' jobs)")
def abort(self, request: HttpRequest, queryset: QuerySet[models.ProcrastinateJob]):
app_config: ProcrastinateConfig = apps.get_app_config("procrastinate") # pyright: ignore [reportAssignmentType]
p_app: App = app_config.app
for job in queryset.filter(status__in=(Status.TODO.value, Status.DOING.value)):
p_app.job_manager.cancel_job_by_id(job.id, abort=True)

actions = [retry, cancel, abort]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.1.6 on 2025-02-12 18:07
from __future__ import annotations

from django.db import migrations

from .. import migrations_utils


class Migration(migrations.Migration):
dependencies = [
("procrastinate", "0035_post_add_heartbeat"),
]

operations = [
migrations_utils.RunProcrastinateSQL(
name="03.02.00_01_add_retry_failed_job_procedure.sql"
),
]
80 changes: 80 additions & 0 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,86 @@ def retry_job_by_id(
new_lock=lock,
)

async def retry_failed_job(
self,
job: jobs.Job,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Indicates that a failed job should be retried later.

Parameters
----------
job:
priority:
If set, the job will be retried with this priority. If not set, the priority
remains unchanged.
queue:
If set, the job will be retried on this queue. If not set, the queue remains
unchanged.
lock:
If set, the job will be retried with this lock. If not set, the lock remains
unchanged.
"""
assert job.id # TODO remove this
await self.retry_failed_job_by_id_async(
job_id=job.id,
priority=priority,
queue=queue,
lock=lock,
)

async def retry_failed_job_by_id_async(
self,
job_id: int,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Indicates that a job should be retried later.

Parameters
----------
job_id:
priority:
If set, the job will be retried with this priority. If not set, the priority
remains unchanged.
queue:
If set, the job will be retried on this queue. If not set, the queue remains
unchanged.
lock:
If set, the job will be retried with this lock. If not set, the lock remains
unchanged.
"""
await self.connector.execute_query_async(
query=sql.queries["retry_failed_job"],
job_id=job_id,
new_priority=priority,
new_queue_name=queue,
new_lock=lock,
)

def retry_failed_job_by_id(
self,
job_id: int,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Sync version of `retry_failed_job_by_id_async`.
"""
self.connector.get_sync_connector().execute_query(
query=sql.queries["retry_failed_job"],
job_id=job_id,
new_priority=priority,
new_queue_name=queue,
new_lock=lock,
)

async def listen_for_jobs(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
-- Append new event type to reflect transition from failed -> todo
ALTER TYPE procrastinate_job_event_type ADD VALUE 'retried' AFTER 'scheduled';

-- Procedure to retry failed jobs
CREATE FUNCTION procrastinate_retry_failed_job_v1(
job_id bigint,
new_priority integer,
new_queue_name character varying,
new_lock character varying
) RETURNS void LANGUAGE plpgsql AS $$
DECLARE
_job_id bigint;
BEGIN
UPDATE procrastinate_jobs
SET status = 'todo'::procrastinate_job_status,
attempts = attempts + 1,
priority = COALESCE(new_priority, priority),
queue_name = COALESCE(new_queue_name, queue_name),
lock = COALESCE(new_lock, lock)
WHERE id = job_id AND status = 'failed'
RETURNING id INTO _job_id;

IF _job_id IS NULL THEN
RAISE 'Job was not found or not in "failed" status (job id: %)', job_id;
END IF;
END;
$$;

DROP TRIGGER IF EXISTS procrastinate_trigger_status_events_update_v1 ON procrastinate_jobs;
DROP FUNCTION IF EXISTS public.procrastinate_trigger_function_status_events_update_v1;

CREATE FUNCTION public.procrastinate_trigger_function_status_events_update_v1()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
WITH t AS (
SELECT CASE
WHEN OLD.status = 'todo'::procrastinate_job_status
AND NEW.status = 'doing'::procrastinate_job_status
THEN 'started'::procrastinate_job_event_type
WHEN OLD.status = 'doing'::procrastinate_job_status
AND NEW.status = 'todo'::procrastinate_job_status
THEN 'deferred_for_retry'::procrastinate_job_event_type
WHEN OLD.status = 'doing'::procrastinate_job_status
AND NEW.status = 'failed'::procrastinate_job_status
THEN 'failed'::procrastinate_job_event_type
WHEN OLD.status = 'doing'::procrastinate_job_status
AND NEW.status = 'succeeded'::procrastinate_job_status
THEN 'succeeded'::procrastinate_job_event_type
WHEN OLD.status = 'todo'::procrastinate_job_status
AND (
NEW.status = 'cancelled'::procrastinate_job_status
OR NEW.status = 'failed'::procrastinate_job_status
OR NEW.status = 'succeeded'::procrastinate_job_status
)
THEN 'cancelled'::procrastinate_job_event_type
WHEN OLD.status = 'doing'::procrastinate_job_status
AND NEW.status = 'aborted'::procrastinate_job_status
THEN 'aborted'::procrastinate_job_event_type
WHEN OLD.status = 'failed'::procrastinate_job_status
AND NEW.status = 'todo'::procrastinate_job_status
THEN 'retried'::procrastinate_job_event_type
ELSE NULL
END as event_type
)
INSERT INTO procrastinate_events(job_id, type)
SELECT NEW.id, t.event_type
FROM t
WHERE t.event_type IS NOT NULL;
RETURN NEW;
END;
$function$;

CREATE TRIGGER procrastinate_trigger_status_events_update_v1
AFTER UPDATE OF status ON procrastinate_jobs
FOR EACH ROW
EXECUTE PROCEDURE procrastinate_trigger_function_status_events_update_v1();
4 changes: 4 additions & 0 deletions procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s;
-- Retry a job, changing it from "doing" to "todo"
SELECT procrastinate_retry_job_v1(%(job_id)s, %(retry_at)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s);

-- retry_failed_job --
-- Retry a failed job, changing it from "failed" to "todo"
SELECT procrastinate_retry_failed_job_v1(%(job_id)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s);

-- listen_queue --
-- In this one, the argument is an identifier, shoud not be escaped the same way
LISTEN {channel_name};
Expand Down
30 changes: 29 additions & 1 deletion procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ CREATE TYPE procrastinate_job_event_type AS ENUM (
'cancelled', -- todo -> cancelled
'abort_requested', -- not a state transition, but set in a separate field
'aborted', -- doing -> aborted (only allowed when abort_requested field is set)
'scheduled' -- not a state transition, but recording when a task is scheduled for
'scheduled', -- not a state transition, but recording when a task is scheduled for
'retried' -- Manually retried failed job
);

-- Tables
Expand Down Expand Up @@ -306,6 +307,30 @@ BEGIN
END;
$$;

CREATE FUNCTION procrastinate_retry_failed_job_v1(
job_id bigint,
new_priority integer,
new_queue_name character varying,
new_lock character varying
) RETURNS void LANGUAGE plpgsql AS $$
DECLARE
_job_id bigint;
BEGIN
UPDATE procrastinate_jobs
SET status = 'todo'::procrastinate_job_status,
attempts = attempts + 1,
priority = COALESCE(new_priority, priority),
queue_name = COALESCE(new_queue_name, queue_name),
lock = COALESCE(new_lock, lock)
WHERE id = job_id AND status = 'failed'
RETURNING id INTO _job_id;

IF _job_id IS NULL THEN
RAISE 'Job was not found or not in "failed" status (job id: %)', job_id;
END IF;
END;
$$;

CREATE FUNCTION procrastinate_notify_queue_job_inserted_v1()
RETURNS trigger
LANGUAGE plpgsql
Expand Down Expand Up @@ -374,6 +399,9 @@ BEGIN
WHEN OLD.status = 'doing'::procrastinate_job_status
AND NEW.status = 'aborted'::procrastinate_job_status
THEN 'aborted'::procrastinate_job_event_type
WHEN OLD.status = 'failed'::procrastinate_job_status
AND NEW.status = 'todo'::procrastinate_job_status
THEN 'retried'::procrastinate_job_event_type
ELSE NULL
END as event_type
)
Expand Down
11 changes: 11 additions & 0 deletions procrastinate/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,14 @@ async def prune_stalled_workers_all(self, seconds_since_heartbeat):
job["worker_id"] = None

return pruned_workers

async def retry_failed_job_run(
self, job_id, new_priority, new_queue_name, new_lock
):
job_row = self.jobs[job_id]
job_row["status"] = "todo"
job_row["attempts"] += 1
job_row["priority"] = new_priority
job_row["queue_name"] = new_queue_name
job_row["lock"] = new_lock
self.events[job_id].append({"type": "retried", "at": utils.utcnow()})
36 changes: 36 additions & 0 deletions tests/unit/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,42 @@ def test_retry_job_by_id(job_manager, connector, job_factory, dt):
)


async def test_retry_failed_job_by_id_async(job_manager, connector, job_factory, dt):
job = await job_manager.defer_job_async(job=job_factory())

await job_manager.retry_failed_job_by_id_async(
job_id=job.id, priority=7, queue="some_queue", lock="some_lock"
)

assert connector.queries[-1] == (
"retry_failed_job",
{
"job_id": 1,
"new_priority": 7,
"new_queue_name": "some_queue",
"new_lock": "some_lock",
},
)


def test_retry_failed_job_by_id(job_manager, connector, job_factory, dt):
job = job_manager.defer_job(job=job_factory())

job_manager.retry_failed_job_by_id(
job_id=job.id, priority=7, queue="some_queue", lock="some_lock"
)

assert connector.queries[-1] == (
"retry_failed_job",
{
"job_id": 1,
"new_priority": 7,
"new_queue_name": "some_queue",
"new_lock": "some_lock",
},
)


async def test_list_jobs_async(job_manager, job_factory):
job = await job_manager.defer_job_async(job=job_factory())

Expand Down
Loading