manual retry API #507

Draft

janbjorge wants to merge 1 commit into main from consolidate-manual-retry-docs

Conversation

@janbjorge (Owner) commented Dec 13, 2025

Adds a Manual Retry API that allows retrieving failed jobs from the log table and re-enqueueing them on demand. This is useful when jobs should execute once and the client decides whether to retry.

Closes #506

Example

from datetime import timedelta

from pgqueuer.models import Job

# `pgq` (the PgQueuer instance) and `queries` (the Queries instance) are
# assumed to be set up elsewhere, as in the rest of the docs.

# Jobs run once with no automatic retries
@pgq.entrypoint("process_order", retry_timer=timedelta(0))
async def process_order(job: Job) -> None:
    await do_order_processing(job.payload)

# Later, review and retry failed jobs
failed_jobs = await queries.get_failed_jobs(entrypoint="process_order")

for log_entry in failed_jobs:
    if should_retry(log_entry):
        new_job_id = await queries.retry_failed_job(log_entry.id)
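
The PR also adds a bulk variant. A minimal sketch of retry_failed_jobs usage, assuming the signature shown in the review excerpt further down (log ids plus optional priority and execute_after):

# Re-enqueue several failed jobs at once; only the ids of jobs that were
# actually re-enqueued are returned.
new_job_ids = await queries.retry_failed_jobs(
    [log_entry.id for log_entry in failed_jobs],
)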

- Expand Manual Retry API section in pgqueuer.md with full API reference
- Include all methods: get_failed_jobs, get_log_entry, retry_failed_job, retry_failed_jobs
- Add error details access and best practices (see the sketch after this list)
- Remove separate manual-retry.md page
- Update index.rst toctree
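
As a sketch of the error-details workflow mentioned above: get_log_entry is one of the methods this PR adds, and the status/retried_as attributes follow the log columns discussed in this PR, but the "exception" status string and the exact call shape are assumptions, not confirmed API:

# Inspect a single failed run before deciding whether to retry.
entry = await queries.get_log_entry(log_entry.id)
if entry.status == "exception" and entry.retried_as is None:
    # Not yet retried; enqueue a fresh attempt.
    await queries.retry_failed_job(entry.id)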
@janbjorge janbjorge changed the title from "docs: consolidate manual retry API into main documentation" to "manual retry API" Dec 13, 2025
@janbjorge janbjorge requested a review from Copilot December 14, 2025 10:29
@janbjorge janbjorge self-assigned this Dec 14, 2025
@janbjorge janbjorge added the enhancement New feature or request label Dec 14, 2025
Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces a Manual Retry API that enables retrieving failed jobs from the log table and re-enqueueing them on demand. This is designed for scenarios where jobs should execute once without automatic retries, with the client deciding whether to retry based on business logic or human review.

Key Changes:

  • Added database schema columns (payload, headers, retried_as) to the log table to support retry functionality
  • Implemented new query methods: get_failed_jobs(), get_log_entry(), retry_failed_job(), and retry_failed_jobs()
  • Added comprehensive test coverage for the manual retry functionality including pagination, filtering, and error handling

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Summary per file:

test/test_queries.py: Updated test fixture to include the new log table columns (id, payload, headers, retried_as)
test/test_manual_retry.py: Added a comprehensive test suite for the manual retry API
pgqueuer/queries.py: Implemented four new methods for retrieving and retrying failed jobs
pgqueuer/qb.py: Added SQL queries for manual retry operations and updated the schema with the new columns
pgqueuer/models.py: Extended the Log model with id, payload, headers, and retried_as fields
docs/pgqueuer.md: Added a documentation section explaining manual retry API usage and best practices
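
For orientation, a minimal sketch of what the extended Log model might look like. The field names follow the columns named in this PR and the explicit column list suggested in a review comment below; the types and the pydantic base are assumptions, not the PR's actual code:

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

class Log(BaseModel):
    # New fields from this PR (types assumed for illustration):
    id: int
    payload: Optional[bytes]
    headers: Optional[dict]
    retried_as: Optional[int]  # id of the retry job, if one was enqueued
    # Pre-existing fields, inferred from the column list suggested below:
    entrypoint: str
    status: str
    priority: int
    created_at: datetime
    updated_at: datetime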


Comment on lines +813 to +822
result = []
for log_id in log_ids:
    job_id = await self.retry_failed_job(
        log_id,
        priority=priority,
        execute_after=execute_after,
    )
    if job_id is not None:
        result.append(job_id)
return result
Copilot AI Dec 14, 2025

The retry_failed_jobs method performs sequential database operations in a loop. For bulk retry operations, this could lead to poor performance. Consider using asyncio.gather() or a single SQL query with UNNEST() to process all retries in parallel or as a single batch operation.

Suggested change

# Before: sequential awaits
result = []
for log_id in log_ids:
    job_id = await self.retry_failed_job(
        log_id,
        priority=priority,
        execute_after=execute_after,
    )
    if job_id is not None:
        result.append(job_id)
return result

# After: concurrent retries (requires `import asyncio` at module level)
tasks = [
    self.retry_failed_job(
        log_id,
        priority=priority,
        execute_after=execute_after,
    )
    for log_id in log_ids
]
results = await asyncio.gather(*tasks)
return [job_id for job_id in results if job_id is not None]
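
The comment also floats a single-statement alternative. A rough sketch of the UNNEST() approach in the f-string style qb.py uses; the queue_table setting, the copied columns, and the builder name are assumptions based on this PR, not the actual implementation:

def build_batch_retry_query(self) -> str:
    # Hypothetical builder: copy each failed log row back into the queue
    # table in one statement, skipping rows that were already retried.
    return f"""
    INSERT INTO {self.settings.queue_table} (entrypoint, priority, payload, headers)
    SELECT l.entrypoint, l.priority, l.payload, l.headers
    FROM {self.settings.queue_table_log} l
    JOIN UNNEST($1::bigint[]) AS ids(id) ON l.id = ids.id
    WHERE l.retried_as IS NULL
    RETURNING id
    """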

            str: The SQL query string for fetching failed jobs.
        """
        return f"""
        SELECT * FROM {self.settings.queue_table_log}
Copilot AI Dec 14, 2025

Using SELECT * in queries can lead to maintenance issues if columns are added to the table. Consider explicitly listing the columns needed to avoid potential issues with column order changes or unexpected data being returned.

Comment on lines +1051 to +1052

        return f"""
        SELECT * FROM {self.settings.queue_table_log}
Copilot AI Dec 14, 2025

Using SELECT * in queries can lead to maintenance issues if columns are added to the table. Consider explicitly listing the columns needed to avoid potential issues with column order changes or unexpected data being returned.

Suggested change

# Before:
        return f"""
        SELECT * FROM {self.settings.queue_table_log}

# After: explicitly list columns to avoid SELECT *
        return f"""
        SELECT id, entrypoint, status, retried_as, priority, payload, headers, created_at, updated_at
        FROM {self.settings.queue_table_log}

@janbjorge janbjorge closed this Feb 16, 2026
@janbjorge janbjorge reopened this Feb 16, 2026