87 changes: 87 additions & 0 deletions docs/pgqueuer.md
@@ -515,6 +515,93 @@ By integrating `RetryWithBackoffEntrypointExecutor`, you can build robust workflows

---

### Manual Retry API

For scenarios where you want jobs to execute **once** and let the client decide whether to retry on failure, PgQueuer provides a Manual Retry API. This is useful when:

- You need human review before retrying failed jobs
- External systems track job state using the job ID
- Retry decisions depend on business logic not available at processing time

#### Quick Example

```python
from datetime import timedelta

# Jobs run once with no automatic retries
@pgq.entrypoint("process_order", retry_timer=timedelta(0))
async def process_order(job: Job) -> None:
await do_order_processing(job.payload)

# Later, review and retry failed jobs
failed_jobs = await queries.get_failed_jobs(entrypoint="process_order")

for log_entry in failed_jobs:
if should_retry(log_entry):
new_job_id = await queries.retry_failed_job(log_entry.id)
print(f"Job {log_entry.job_id} retried as {new_job_id}")
```

The failed job's payload and headers are preserved in the log table, so the retry uses the exact same data as the original job.

#### API Reference

**`get_failed_jobs`** - Retrieve failed jobs from the log table. Only entries with status `'exception'` that have not already been retried are returned:

```python
failed = await queries.get_failed_jobs(
entrypoint="process_order", # Filter by entrypoint (optional)
limit=100, # Max results (default: 100)
after_id=last_id, # Cursor for pagination (optional)
)
```
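
Results come back newest-first and are capped by `limit`, so the `after_id` cursor lets you walk the whole backlog. A minimal paging sketch, assuming the `queries` object from the quick example and a hypothetical per-entry `review()` hook:

```python
last_id = None
while True:
    page = await queries.get_failed_jobs(
        entrypoint="process_order",
        limit=100,
        after_id=last_id,
    )
    if not page:
        break
    for log_entry in page:
        review(log_entry)  # hypothetical review hook
    # ids are returned in descending order, so the last entry is the cursor
    last_id = page[-1].id
```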

**`get_log_entry`** - Retrieve a specific log entry:

```python
log_entry = await queries.get_log_entry(log_id)
```

**`retry_failed_job`** - Re-enqueue a single failed job:

```python
new_job_id = await queries.retry_failed_job(
log_id,
priority=10, # Override priority (optional)
execute_after=timedelta(minutes=5), # Delay before eligible (optional)
)
```
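
`retry_failed_job` returns the new job's ID, or `None` when nothing was enqueued. A small defensive sketch:

```python
new_job_id = await queries.retry_failed_job(log_entry.id)
if new_job_id is None:
    # Nothing was enqueued: the entry was already retried
    # (retried_as is set) or it is not in 'exception' status.
    print(f"Log entry {log_entry.id} was not retryable")
```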

**`retry_failed_jobs`** - Re-enqueue multiple failed jobs:

```python
new_job_ids = await queries.retry_failed_jobs(
[log.id for log in failed_jobs],
execute_after=timedelta(seconds=5),
)
```

#### Accessing Error Details

The `Log` model's `traceback` field contains exception information:

```python
if log_entry.traceback:
print(f"Exception: {log_entry.traceback.exception_type}")
print(f"Message: {log_entry.traceback.exception_message}")
print(f"Traceback:\n{log_entry.traceback.traceback}")
```
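
These details can drive the retry decision itself. A sketch of the `should_retry` hook from the quick example, under the assumption that certain exception types are transient:

```python
from pgqueuer.models import Log

TRANSIENT_EXCEPTIONS = {"TimeoutError", "ConnectionError"}

def should_retry(log_entry: Log) -> bool:
    # Only retry failures whose recorded exception type looks transient.
    if log_entry.traceback is None:
        return False
    return log_entry.traceback.exception_type in TRANSIENT_EXCEPTIONS
```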

#### Best Practices

1. **Use `log_id`, not `job_id`**: The `job_id` can appear multiple times in the log. Always use the log entry's `id` field for retry operations.

2. **Double retries are prevented**: The API checks `retried_as IS NULL`, so a second retry attempt returns `None`.

3. **Consider delayed retries**: For transient failures, add a delay with `execute_after=timedelta(minutes=5)`; the sketch after this list puts this together with a batch retry.

4. **Schema upgrade required**: Run `await queries.upgrade()` or `pgq upgrade` to add the `payload`, `headers`, and `retried_as` columns to the log table.
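
Putting these together, a periodic review task might look like the following sketch (`should_retry` is the business-logic hook shown earlier; error handling omitted):

```python
from datetime import timedelta

async def review_and_retry() -> None:
    failed = await queries.get_failed_jobs(entrypoint="process_order")
    retryable = [log.id for log in failed if should_retry(log)]
    if retryable:
        # Delay the retries so transient failures have time to clear.
        await queries.retry_failed_jobs(retryable, execute_after=timedelta(minutes=5))
```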

---

### Scheduler

Manage recurring tasks with cron-like expressions.
13 changes: 12 additions & 1 deletion pgqueuer/models.py
@@ -168,10 +168,15 @@ def sentry_headers(self) -> dict[str, Any] | None:

class Log(BaseModel):
"""
- Represents a job with attributes such as ID, priority,
+ Represents a logged job with attributes such as ID, priority,
creation time, status, entrypoint, and optional payload.

The payload and headers fields are preserved to support manual retry
functionality, allowing failed jobs to be re-enqueued with their
original data.
"""

id: int
created: AwareDatetime
job_id: JobId
status: JOB_STATUS
@@ -182,6 +187,12 @@ class Log(BaseModel):
BeforeValidator(lambda x: None if x is None else from_json(x)),
]
aggregated: bool
payload: bytes | None = None
headers: Annotated[
dict[str, Any] | None,
BeforeValidator(lambda x: None if x is None else from_json(x)),
] = None
retried_as: int | None = None


###### Statistics ######
109 changes: 102 additions & 7 deletions pgqueuer/qb.py
@@ -260,12 +260,16 @@ def build_install_query(self) -> str:
priority INT NOT NULL,
entrypoint TEXT NOT NULL,
traceback JSONB DEFAULT NULL,
- aggregated BOOLEAN DEFAULT FALSE
+ aggregated BOOLEAN DEFAULT FALSE,
payload BYTEA,
headers JSONB,
retried_as INTEGER
);
CREATE INDEX {self.settings.queue_table_log}_not_aggregated ON {self.settings.queue_table_log} ((1)) WHERE not aggregated;
CREATE INDEX {self.settings.queue_table_log}_created ON {self.settings.queue_table_log} (created);
CREATE INDEX {self.settings.queue_table_log}_status ON {self.settings.queue_table_log} (status);
CREATE INDEX {self.settings.queue_table_log}_job_id_status ON {self.settings.queue_table_log} (job_id, created DESC);
CREATE INDEX {self.settings.queue_table_log}_retried_as ON {self.settings.queue_table_log} (retried_as) WHERE retried_as IS NOT NULL;

CREATE {durability_policy.statistics_table} TABLE {self.settings.statistics_table} (
id SERIAL PRIMARY KEY,
@@ -454,6 +458,11 @@ def build_upgrade_queries(self) -> Generator[str, None, None]:
yield f"CREATE UNIQUE INDEX IF NOT EXISTS {self.settings.queue_table}_unique_dedupe_key ON {self.settings.queue_table} (dedupe_key) WHERE ((status IN ('queued', 'picked') AND dedupe_key IS NOT NULL));" # noqa
yield f"CREATE INDEX IF NOT EXISTS {self.settings.queue_table_log}_job_id_status ON {self.settings.queue_table_log} (job_id, created DESC);" # noqa: E501
yield f"ALTER TABLE {self.settings.queue_table} ADD COLUMN IF NOT EXISTS headers JSONB;" # noqa: E501
# Manual retry API: store payload and headers in log table for retry capability
yield f"ALTER TABLE {self.settings.queue_table_log} ADD COLUMN IF NOT EXISTS payload BYTEA;" # noqa: E501
yield f"ALTER TABLE {self.settings.queue_table_log} ADD COLUMN IF NOT EXISTS headers JSONB;" # noqa: E501
yield f"ALTER TABLE {self.settings.queue_table_log} ADD COLUMN IF NOT EXISTS retried_as INTEGER;" # noqa: E501
yield f"CREATE INDEX IF NOT EXISTS {self.settings.queue_table_log}_retried_as ON {self.settings.queue_table_log} (retried_as) WHERE retried_as IS NOT NULL;" # noqa: E501

def build_table_has_column_query(self) -> str:
"""
@@ -896,16 +905,17 @@ def build_log_job_query(self) -> str:
Constructs an SQL query that deletes specified jobs from the queue table
and inserts corresponding entries into the statistics (log) table.
It captures details such as priority, entrypoint, time in queue,
- creation time, and final status. The query uses upsert logic to handle
- conflicts and aggregate counts.
+ creation time, final status, payload, and headers. The query uses upsert
+ logic to handle conflicts and aggregate counts. Payload and headers are
+ preserved to support manual retry functionality.

Returns:
str: The SQL query string to log jobs.
"""
return f"""WITH deleted AS (
DELETE FROM {self.settings.queue_table}
WHERE id = ANY($1::integer[])
- RETURNING id, entrypoint, priority
+ RETURNING id, entrypoint, priority, payload, headers
), job_status AS (
SELECT
UNNEST($1::integer[]) AS id,
Expand All @@ -917,7 +927,9 @@ def build_log_job_query(self) -> str:
job_status.status AS status,
job_status.traceback AS traceback,
deleted.entrypoint AS entrypoint,
- deleted.priority AS priority
+ deleted.priority AS priority,
+ deleted.payload AS payload,
+ deleted.headers AS headers
FROM job_status
INNER JOIN deleted
ON deleted.id = job_status.id
Expand All @@ -927,9 +939,11 @@ def build_log_job_query(self) -> str:
status,
entrypoint,
priority,
- traceback
+ traceback,
+ payload,
+ headers
)
- SELECT id, status, entrypoint, priority, traceback FROM merged
+ SELECT id, status, entrypoint, priority, traceback, payload, headers FROM merged
"""

def build_truncate_log_statistics_query(self) -> str:
@@ -1006,6 +1020,87 @@ def build_delete_log_query(self) -> str:
def build_fetch_log_query(self) -> str:
return f"SELECT * FROM {self.settings.queue_table_log}"

def build_get_failed_jobs_query(self) -> str:
"""
Generate SQL query to retrieve failed jobs from the log table.

Returns failed jobs (status='exception') that have not been retried yet,
with optional filtering by entrypoint. Results are ordered by id descending
(newest first) and support cursor-based pagination via after_id.

Returns:
str: The SQL query string for fetching failed jobs.
"""
return f"""
SELECT * FROM {self.settings.queue_table_log}
> **Copilot AI (Dec 14, 2025):** Using `SELECT *` in queries can lead to maintenance issues if columns are added to the table. Consider explicitly listing the columns needed to avoid potential issues with column order changes or unexpected data being returned.
WHERE status = 'exception'
AND retried_as IS NULL
AND ($1::text[] IS NULL OR entrypoint = ANY($1))
AND ($2::bigint IS NULL OR id < $2)
ORDER BY id DESC
LIMIT $3
"""

def build_get_log_entry_query(self) -> str:
"""
Generate SQL query to retrieve a specific log entry by ID.

Returns:
str: The SQL query string for fetching a log entry.
"""
return f"""
SELECT * FROM {self.settings.queue_table_log}
> **Copilot AI (Dec 14, 2025), on lines +1051 to +1052:** Using `SELECT *` in queries can lead to maintenance issues if columns are added to the table. Consider explicitly listing the columns needed to avoid potential issues with column order changes or unexpected data being returned. Suggested change:
>
> ```diff
> -        return f"""
> -            SELECT * FROM {self.settings.queue_table_log}
> +        # Explicitly list columns to avoid SELECT *
> +        return f"""
> +            SELECT id, entrypoint, status, retried_as, priority, payload, headers, created_at, updated_at
> +            FROM {self.settings.queue_table_log}
> ```
WHERE id = $1
"""

def build_retry_from_log_query(self) -> str:
"""
Generate SQL query to re-enqueue a single failed job from the log table.

This query:
1. Selects the log entry (must be 'exception' status and not already retried)
2. Inserts a new job into the queue table with the original payload/headers
3. Updates the log entry with the new job ID (retried_as column)

Args:
$1: log_id (bigint) - The log entry ID to retry
$2: priority (int or NULL) - Override priority, or NULL to use original
$3: execute_after (interval or NULL) - Delay before job is eligible

Returns:
str: The SQL query string for retrying a failed job.
"""
return f"""
WITH log_entry AS (
SELECT id, entrypoint, priority, payload, headers
FROM {self.settings.queue_table_log}
WHERE id = $1
AND status = 'exception'
AND retried_as IS NULL
),
inserted AS (
INSERT INTO {self.settings.queue_table} (
priority, status, entrypoint, payload, headers, execute_after
)
SELECT
COALESCE($2::int, priority),
'queued'::{self.settings.queue_status_type},
entrypoint,
payload,
headers,
NOW() + COALESCE($3::interval, interval '0')
FROM log_entry
RETURNING id
),
updated AS (
UPDATE {self.settings.queue_table_log}
SET retried_as = inserted.id
FROM inserted
WHERE {self.settings.queue_table_log}.id = $1
)
SELECT id FROM inserted
"""

def build_aggregate_log_data_to_statistics_query(self) -> str:
"""
Generate SQL query to aggregate data from the log table and insert