
Conversation

@Kerushani
Contributor

JIRA ticket link

Ticket Name

Implementation description

Steps to test

What should reviewers focus on?

Checklist

  • My PR name is descriptive and in imperative tense
  • My commit messages are descriptive and in imperative tense. My commits are atomic and trivial commits are squashed or fixup'd into non-trivial commits
  • I have requested a review from the PL, as well as other devs who have background knowledge on this PR or who will be building on top of this PR

@Kerushani Kerushani changed the title James kerushani/add job queue Add job queue Nov 30, 2025
@Kerushani Kerushani closed this Nov 30, 2025
@Kerushani Kerushani reopened this Dec 4, 2025
@Kerushani Kerushani requested a review from Jam-Cai February 4, 2026 01:26
@Jam-Cai Jam-Cai force-pushed the james-kerushani/add-job-queue branch from 7cad7d7 to 9bc88a5 February 4, 2026 01:51

"""
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
try:
op.execute("ALTER TYPE progressenum ADD VALUE 'QUEUED'")
except Exception:
worker_task.cancel()
with suppress(asyncio.CancelledError):
# Expected during shutdown - task was cancelled gracefully
await worker_task
@Jam-Cai
Contributor

Jam-Cai commented Feb 4, 2026

@claude review

@claude

claude bot commented Feb 4, 2026

Pull Request Review: Add Job Queue

Overview

This PR implements a database-backed job queue system for processing route generation jobs asynchronously. The implementation includes a worker process, job state management, and comprehensive documentation.

✅ Strengths

  1. Excellent Documentation: The README.md is comprehensive, well-structured, and provides clear guidance on architecture, testing, monitoring, and troubleshooting.

  2. Robust State Management: The job state flow (PENDING → QUEUED → RUNNING → COMPLETED/FAILED) is well-designed and properly tracked with timestamps.

  3. Good Error Handling: Proper try/except blocks with session rollback and detailed logging throughout.

  4. Database as Queue: Using PostgreSQL with SELECT FOR UPDATE SKIP LOCKED is a solid pattern for this use case - no external dependencies and survives restarts.

  5. Graceful Shutdown: The lifespan handler properly cancels the worker task and suppresses expected CancelledError.

🐛 Issues Found

Critical

  1. Incomplete Core Functionality (job_worker.py:173-199)

    • generate_routes() only has a 10-second sleep stub
    • recover_orphaned_jobs() is completely empty (documented as critical feature)
    • check_for_stuck_jobs() is completely empty (documented as critical feature)

    Impact: The two safety mechanisms (orphan recovery and timeout handling) are non-functional, making the system vulnerable to stuck jobs.

  2. Missing Test Coverage

    • No tests for JobWorker class
    • No tests for the new QUEUED state
    • No tests for the updated enqueue() and update_progress() methods
    • Tests only verify the Job model exists, not the worker behavior

    Impact: Cannot verify core functionality works as designed.

High Priority

  1. Race Condition in Worker Loop (job_worker.py:96-128)

    # Session ends here after finding job_id
    async for session in get_session():
        # ...find job...
        job_id = job.job_id
    
    # Then process_job uses a NEW session
    if job_id:
        await self.process_job(job_id)  # Different session!

    Issue: The job lock from SELECT FOR UPDATE SKIP LOCKED is released when the first session ends. Another worker could theoretically pick up the same job between sessions.

    Fix: Either:

    • Move the entire process into one session, or
    • Immediately update job status to RUNNING within the locked session before releasing
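The second option can be sketched as a single claim statement: the row lock and the status change commit together, so another worker can never see the job as still QUEUED, and processing can safely continue in a later session. The identifiers below are assumptions based on names mentioned in this review, not the actual schema:

```python
# Hypothetical claim query: atomically select the oldest QUEUED job and
# mark it RUNNING in one statement. SKIP LOCKED makes concurrent workers
# pass over rows another worker has already locked. Table/column names
# (jobs, job_id, progress, created_at, started_at) are assumptions.
CLAIM_JOB_SQL = """
UPDATE jobs
SET progress = 'RUNNING', started_at = now()
WHERE job_id = (
    SELECT job_id
    FROM jobs
    WHERE progress = 'QUEUED'
    ORDER BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING job_id
"""
```

The worker would execute this in one short session, commit, and then open a fresh session for the long-running processing step.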
  2. Inconsistent Error Handling (job_service.py:93-98)

    except Exception:
        self.logger.exception("Enqueue failed for job %s", job_id)
        try:
            await self.update_progress(job_id, ProgressEnum.FAILED)
        except Exception:
            self.logger.exception("Failed to mark job %s as FAILED", job_id)

    Issue: If enqueue fails, we mark the job as FAILED. But the job should stay PENDING so it can be retried. FAILED should be reserved for jobs that actually ran and failed.
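A minimal sketch of the suggested behavior: on enqueue failure, log and report the failure but leave the job's state untouched (PENDING) so it can be retried. `service.enqueue` here stands in for the real method; this is not the codebase's actual API:

```python
import logging

logger = logging.getLogger("job_service")


async def safe_enqueue(service, job_id) -> bool:
    """Sketch only: `service` is any object with an async enqueue method."""
    try:
        await service.enqueue(job_id)
        return True
    except Exception:
        # Leave the job PENDING so it can be retried; FAILED is reserved
        # for jobs that actually ran and failed.
        logger.exception("Enqueue failed for job %s; leaving it PENDING", job_id)
        return False
```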

  3. Missing Migration Downgrade (c967b946e4f9_add_queued_to_progress_enum.py:26)

    def downgrade() -> None:
        pass

    Issue: Removing an enum value in PostgreSQL is complex, but this should at least document why downgrade is a no-op or provide manual steps.
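One way to make the no-op explicit is a docstring in the migration itself; the wording below is a sketch, not the project's actual text:

```python
def downgrade() -> None:
    """Intentionally a no-op.

    PostgreSQL has no ALTER TYPE ... DROP VALUE. Removing 'QUEUED' would
    require creating a replacement enum type without the value, casting
    the column over, and dropping the old type - and it would fail if any
    row still holds 'QUEUED'. Leaving the value in place is harmless.
    """
```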

Medium Priority

  1. Timeout Not Configurable Per Job (job_worker.py:151)

    • All jobs use the same 30-minute timeout
    • Some route generation tasks might need different timeouts based on problem size

    Suggestion: Consider adding an optional timeout_minutes field to the Job model.

  2. No Job Priority System

    • Jobs are processed strictly FIFO based on created_at
    • No way to prioritize urgent jobs

    Suggestion: Consider adding a priority field for future enhancement.

  3. Worker Health/Status Endpoint Missing

    • No way to check if worker is running via API
    • No metrics on queue depth, processing rate, etc.

    Suggestion: Add a /health/worker endpoint or include worker status in existing health check.
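The endpoint could return a payload along these lines; `worker` and `queue_depth` are hypothetical inputs, since this review does not show the worker's actual attributes:

```python
import time


def worker_health(worker, queue_depth: int) -> dict:
    """Sketch of a /health/worker response body.

    Assumes the worker object tracks `task` (the asyncio.Task running the
    loop) and `last_poll_at` (epoch seconds of the last queue poll);
    both attribute names are assumptions.
    """
    running = worker.task is not None and not worker.task.done()
    return {
        "worker_running": running,
        "queue_depth": queue_depth,
        "seconds_since_last_poll": (
            time.time() - worker.last_poll_at if worker.last_poll_at else None
        ),
    }
```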

  4. No Job Cancellation Mechanism

    • Once queued, jobs can't be cancelled by users
    • Long-running jobs can't be manually stopped

    Suggestion: Add a CANCELLED state and cancellation logic.
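A minimal sketch of the proposed state addition; the existing values come from the state flow described above, and CANCELLED plus the guard helper are the suggested additions, not current code:

```python
from enum import Enum


class ProgressEnum(str, Enum):
    # Existing states from the review's state flow...
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    # ...plus the proposed terminal state for user-initiated cancellation.
    CANCELLED = "CANCELLED"


def can_cancel(progress: ProgressEnum) -> bool:
    # Only jobs that have not reached a terminal state can be cancelled;
    # cancelling a RUNNING job additionally requires the worker to
    # observe the new state and stop cooperatively.
    return progress in (
        ProgressEnum.PENDING,
        ProgressEnum.QUEUED,
        ProgressEnum.RUNNING,
    )
```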

  5. Polling Interval May Be Too Aggressive (app/__init__.py:125)

    • 5-second poll interval could create unnecessary database load at scale
    • Consider exponential backoff when queue is empty
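Exponential backoff when the queue is empty can be sketched with a small helper; the 5-second base and 60-second cap are illustrative values, not the project's configuration:

```python
def next_poll_interval(
    current: float,
    queue_was_empty: bool,
    base: float = 5.0,
    cap: float = 60.0,
) -> float:
    """Return the next sleep interval for the worker loop.

    Doubles the interval while the queue stays empty, up to `cap`, and
    resets to `base` as soon as a job was found.
    """
    if not queue_was_empty:
        return base
    return min(current * 2, cap)
```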

Low Priority

  1. Logging Inconsistencies

    • Some logs use f-strings: f"Job {job_id} failed: {e}" (job_worker.py:163)
    • Some use %-formatting: "Job %s not found during enqueue" (job_service.py:78)

    Suggestion: Pick one style for consistency. For logging calls, lazy %-formatting is generally preferred (the message is only built when the record is actually emitted), and ruff's flake8-logging-format rule G004 flags f-strings in logging statements.

  2. Documentation/Code Mismatch

    • README says "Jobs persist across application restarts" (true)
    • But doesn't mention that jobs in RUNNING state will be lost unless orphan recovery is implemented

    Suggestion: Update docs to clarify current limitations.

  3. Missing Global Worker Reference Check (app/__init__.py:121)

    global job_worker
    • Global is declared but only used for shutdown
    • Could use a local variable instead since it's only referenced in this scope
  4. Type Hint Could Be More Specific (job_worker.py:122)

    job_id: UUID | None = None
    • job_id is only None initially, then always UUID when used
    • Could restructure to avoid the Optional type

🔒 Security Considerations

Good: No obvious security vulnerabilities

  • Input validation happens via Pydantic models
  • No SQL injection risks (using SQLModel/SQLAlchemy)
  • No direct user input to worker (jobs are validated at API level)

⚠️ Consider:

  • Add rate limiting to prevent job creation spam
  • Add job ownership/ACL if multiple tenants use the system

🚀 Performance Considerations

  1. Database Connection Pool: Each worker loop iteration creates a new session. At scale with multiple workers, monitor connection pool usage.

  2. Index Recommendations: Ensure indexes exist on:

    • jobs.progress (for filtering QUEUED jobs)
    • jobs.created_at (for ordering)
    • Composite index on (progress, created_at) would be optimal
  3. Lock Contention: SELECT FOR UPDATE SKIP LOCKED handles this well, but monitor at scale with multiple workers.
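The composite index could be added in a follow-up migration; the index and table/column names below are assumptions for illustration:

```python
# Hypothetical follow-up migration body: a composite index matching the
# worker's claim query (filter on progress, order by created_at).
CREATE_QUEUE_INDEX_SQL = """
CREATE INDEX IF NOT EXISTS ix_jobs_progress_created_at
ON jobs (progress, created_at)
"""
```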

📝 Code Quality

Positive:

  • ✅ Follows ruff formatting standards
  • ✅ Type hints used throughout
  • ✅ Docstrings present
  • ✅ Good separation of concerns (service layer, worker, models)
  • ✅ Async/await used correctly

Suggestions:

  • Add __all__ exports where appropriate
  • Consider adding more detailed type hints for complex return types
  • Some methods could benefit from more detailed docstrings (e.g., what exceptions are raised)

🧪 Testing Recommendations

Before merging, add tests for:

  1. Worker behavior:

    • Job is picked up from QUEUED state
    • Job transitions through states correctly
    • Multiple workers don't process the same job
    • Worker handles job failures gracefully
  2. Orphan recovery:

    • RUNNING jobs are reset to QUEUED on startup
    • Completed jobs are not affected
  3. Timeout handling:

    • Long-running jobs are marked FAILED
    • Timeout is configurable
  4. Edge cases:

    • Empty queue behavior
    • Database connection failures during processing
    • Worker shutdown during job processing
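The "multiple workers don't process the same job" property can be exercised even without a database by simulating the claim step. This is a stdlib-only sketch, not a test against the real JobWorker: the asyncio lock stands in for the row lock that SELECT FOR UPDATE SKIP LOCKED provides in PostgreSQL.

```python
import asyncio


async def run_competing_workers(job_ids):
    """Two simulated workers claim jobs from a shared set.

    Each job must be claimed exactly once; claiming happens under a lock,
    mirroring the atomic claim a locked SELECT gives a real worker.
    """
    pending = set(job_ids)
    claimed = []
    lock = asyncio.Lock()

    async def worker():
        while True:
            async with lock:  # atomic claim, like the locked SELECT
                if not pending:
                    return
                job = pending.pop()
            claimed.append(job)
            await asyncio.sleep(0)  # simulate processing

    await asyncio.gather(worker(), worker())
    return claimed
```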

🎯 Recommendations

Must Fix Before Merge:

  1. Implement recover_orphaned_jobs() - critical safety feature
  2. Implement check_for_stuck_jobs() - critical safety feature
  3. Fix race condition in worker loop (see High Priority issue 1)
  4. Add basic worker tests
  5. Fix enqueue error handling (don't mark as FAILED on enqueue failure)

Should Fix Before Merge:

  1. Add database indexes for performance
  2. Add worker health check endpoint
  3. Update documentation to reflect current limitations
  4. Add migration downgrade documentation

Nice to Have:

  1. Job cancellation support
  2. Configurable per-job timeouts
  3. Priority queue support
  4. Worker metrics/observability

Summary

This is a solid foundation for a job queue system with excellent documentation and good architectural decisions. However, critical safety features are incomplete (orphan recovery, timeout handling) and test coverage is missing. The race condition in the worker loop should be addressed before production use.

Recommendation: Request changes to implement the core safety features and add test coverage before merging.


Review generated with focus on code quality, security, performance, and best practices.
