diff --git a/backend/python/app/__init__.py b/backend/python/app/__init__.py
index a22f9561..778d1351 100644
--- a/backend/python/app/__init__.py
+++ b/backend/python/app/__init__.py
@@ -1,5 +1,6 @@
+import asyncio
 from collections.abc import AsyncGenerator
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager, suppress
 from logging.config import dictConfig

 import firebase_admin
@@ -8,11 +9,14 @@
 from app.dependencies.services import get_scheduler_service
 from app.services.jobs import init_jobs
+from app.workers.job_worker import JobWorker

 from .config import settings
 from .models import init_app as init_models
 from .routers import init_app as init_routers

+job_worker: JobWorker | None = None
+

 def configure_logging() -> None:
     """Configure application logging based on environment"""
@@ -117,6 +121,8 @@ def initialize_firebase() -> None:
 @asynccontextmanager
 async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
     """Application lifespan management"""
+    global job_worker
+
     # Startup
     configure_logging()
     initialize_firebase()
@@ -127,11 +133,24 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
     scheduler_service.start()
     init_jobs(scheduler_service)

+    job_worker = JobWorker(
+        poll_interval=5, job_timeout_minutes=30, enable_orphan_recovery=True
+    )
+
+    worker_task = asyncio.create_task(job_worker.start())
+
     yield

-    # Cleanup: stop the scheduler service during application shutdown
     scheduler_service.stop()

+    if job_worker:
+        job_worker.stop()
+
+    worker_task.cancel()
+    with suppress(asyncio.CancelledError):
+        # Expected during shutdown - task was cancelled gracefully
+        await worker_task
+

 def create_app() -> FastAPI:
     """Create and configure FastAPI application"""
diff --git a/backend/python/app/models/enum.py b/backend/python/app/models/enum.py
index 3fadfa2e..c45b1779 100644
--- a/backend/python/app/models/enum.py
+++ b/backend/python/app/models/enum.py
@@ -28,6 +28,7 @@ class RoleEnum(str, Enum):

 class ProgressEnum(str, Enum):
     PENDING = "Pending"
+    QUEUED = "Queued"
     RUNNING = "Running"
     COMPLETED = "Completed"
     FAILED = "Failed"
diff --git a/backend/python/app/services/implementations/job_service.py b/backend/python/app/services/implementations/job_service.py
index ffd21682..c5f83f83 100644
--- a/backend/python/app/services/implementations/job_service.py
+++ b/backend/python/app/services/implementations/job_service.py
@@ -47,6 +47,7 @@ async def get_job(self, job_id: UUID) -> Job | None:
         return result.scalar_one_or_none()

     async def update_progress(self, job_id: UUID, progress: ProgressEnum) -> None:
+        """Update job progress and set timestamps appropriately"""
         try:
             job = await self.get_job(job_id)
             if not job:
@@ -54,16 +55,22 @@ async def update_progress(self, job_id: UUID, progress: ProgressEnum) -> None:
                 return
             job.progress = progress
             job.updated_at = self.utc_now_naive()
+
+            if progress == ProgressEnum.RUNNING and job.started_at is None:
+                job.started_at = self.utc_now_naive()
+
             if progress in (ProgressEnum.COMPLETED, ProgressEnum.FAILED):
                 job.finished_at = self.utc_now_naive()
+
             self.session.add(job)
             await self.session.commit()
         except Exception as error:
-            self.logger.error("Error creating job")
+            self.logger.error("Error updating job progress")
             await self.session.rollback()
             raise error

     async def enqueue(self, job_id: UUID) -> None:
+        """Move job from PENDING to QUEUED (ready for worker to pick up)"""
         try:
             job = await self.get_job(job_id)

@@ -79,8 +86,8 @@ async def enqueue(self, job_id: UUID) -> None:
                 )
                 return

-            job.progress = ProgressEnum.RUNNING
-            job.started_at = self.utc_now_naive()
+            job.progress = ProgressEnum.QUEUED
+            job.updated_at = self.utc_now_naive()
             self.session.add(job)
             await self.session.commit()
         except Exception:
diff --git a/backend/python/app/workers/README.md b/backend/python/app/workers/README.md
new file mode 100644
index 00000000..8bef0ec8
--- /dev/null
+++ b/backend/python/app/workers/README.md
@@ -0,0 +1,322 @@
+# Job Worker Documentation
+
+## Overview
+
+The Job Worker is a background process that processes jobs from a database queue. Unlike traditional queue systems (Redis/Celery), **the database table itself acts as the queue**. Jobs are stored as rows in the `jobs` table, and the `progress` column tracks their state.
+
+## Key Features
+
+- **Database as Queue**: No separate queue system is needed; jobs live in PostgreSQL
+- **Survives Restarts**: Jobs persist in the database, so the worker can resume after crashes
+- **Orphan Recovery**: Automatically resets stuck jobs on startup
+- **Timeout Protection**: Jobs that run too long are automatically marked as FAILED
+- **Race Condition Prevention**: Uses `SELECT FOR UPDATE SKIP LOCKED` to prevent duplicate processing
+- **Resilient**: The worker keeps running even if individual jobs fail
+
+## Architecture
+
+### The Loop
+
+The worker continuously polls the database for `QUEUED` jobs and processes them:
+
+```
+Client → API → Database (PENDING → QUEUED) → Worker Loop → Database (RUNNING → COMPLETED)
+```
+
+### Job States
+
+Jobs flow through these states:
+
+1. **PENDING**: Job created by the API, not yet ready for processing
+2. **QUEUED**: Job is ready for the worker to pick up (moved here by `enqueue()`)
+3. **RUNNING**: The worker is actively processing the job
+4. **COMPLETED**: Job finished successfully
+5. **FAILED**: Job encountered an error or timed out
+
+### Database Schema
+
+The `jobs` table tracks:
+- `job_id`: Unique identifier (UUID)
+- `progress`: Current state (enum: PENDING, QUEUED, RUNNING, COMPLETED, FAILED)
+- `created_at`: When the job was created
+- `started_at`: When the job started processing (set when moving to RUNNING)
+- `finished_at`: When the job completed or failed
+- `updated_at`: Last update timestamp
+- `route_group_id`: Optional reference to a route group
+
+## How It Works
+
+### 1. Worker Startup
+
+When the application starts, the worker:
+- Initializes with configurable settings (poll interval, timeout, etc.)
+- Runs orphan recovery to reset any jobs stuck in the RUNNING state
+- Starts the main polling loop
+
+### 2. Main Loop
+
+The worker continuously:
+1. Checks for stuck jobs (jobs running longer than the timeout)
+2. Polls the database for the next QUEUED job (oldest first, FIFO)
+3. Processes the job if found
+4. Waits `poll_interval` seconds before checking again
+
+### 3. Processing a Job
+
+When a job is found:
+
+1. **Lock Job**: Uses `SELECT FOR UPDATE SKIP LOCKED` to prevent race conditions
+2. **Mark as RUNNING**: Updates job status and sets the `started_at` timestamp
+3. **Execute Work**: Calls `generate_routes()` with timeout protection
+4. **Mark as COMPLETED**: Updates job status and sets the `finished_at` timestamp
+5. **Error Handling**: If any step fails, the job is marked as FAILED
+
+### 4. Orphan Recovery
+
+On startup, the worker finds all jobs in the `RUNNING` state and resets them to `QUEUED`; a sketch of this reset is shown after the list below. This handles cases where:
+- The application crashed while processing a job
+- The worker was stopped mid-processing
+- The database connection was lost
+
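+In the current PR, `recover_orphaned_jobs()` in `job_worker.py` is still a stub (its body is `pass`), so the reset described above is not implemented yet. Below is a minimal sketch of what it could look like, reusing `get_session`, `Job`, and `ProgressEnum` from this PR; the bulk reset shown is an assumption, not the final implementation:
+
+```python
+async def recover_orphaned_jobs(self) -> None:
+    """Sketch: reset jobs left in RUNNING back to QUEUED on startup."""
+    async for session in get_session():
+        # Find every job that was mid-flight when the previous process died
+        result = await session.execute(
+            select(Job).where(Job.progress == ProgressEnum.RUNNING)
+        )
+        orphans = result.scalars().all()
+        for job in orphans:
+            # Return the job to the queue and clear its start time
+            job.progress = ProgressEnum.QUEUED
+            job.started_at = None
+            session.add(job)
+        await session.commit()
+        self.logger.info(f"Reset {len(orphans)} orphaned job(s) to QUEUED")
+```
+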
+### 5. Stuck Job Detection
+
+Periodically, the worker checks for jobs that have been `RUNNING` longer than the timeout (default: 30 minutes). These are automatically marked as `FAILED` to prevent infinite processing.
+
+## Configuration
+
+The worker is configured in `app/__init__.py`:
+
+```python
+job_worker = JobWorker(
+    poll_interval=5,
+    job_timeout_minutes=30,
+    enable_orphan_recovery=True
+)
+```
+
+### Parameters
+
+- **poll_interval** (int): Seconds to wait between checks for new jobs (default: 5)
+- **job_timeout_minutes** (int): Maximum time a job can run before being marked as FAILED (default: 30)
+- **enable_orphan_recovery** (bool): Whether to reset orphaned RUNNING jobs on startup (default: True)
+
+## Testing
+
+### Manual Testing via Database
+
+1. **Create a test job**:
+```bash
+docker-compose exec db psql -U postgres -d f4k -c \
+  "INSERT INTO jobs (job_id, progress, created_at) \
+   VALUES (gen_random_uuid(), 'QUEUED', NOW()) \
+   RETURNING job_id;"
+```
+
+2. **Monitor the worker logs**:
+```bash
+docker-compose logs -f backend | grep -i "worker\|job"
+```
+
+3. **Check job status**:
+```bash
+# Check all jobs
+docker-compose exec db psql -U postgres -d f4k -c \
+  "SELECT job_id, progress, started_at, finished_at \
+   FROM jobs ORDER BY created_at DESC LIMIT 5;"
+
+# Check specific job
+docker-compose exec db psql -U postgres -d f4k -c \
+  "SELECT * FROM jobs WHERE job_id = 'YOUR_JOB_ID';"
+```
+
+4. **Verify completion**:
+```bash
+# Wait ~15 seconds (10s sleep in generate_routes + processing time)
+docker-compose exec db psql -U postgres -d f4k -c \
+  "SELECT job_id, progress, started_at, finished_at, \
+   EXTRACT(EPOCH FROM (finished_at - started_at)) as duration_seconds \
+   FROM jobs WHERE progress = 'COMPLETED' \
+   ORDER BY finished_at DESC LIMIT 1;"
+```
+
+### Testing via API
+
+1. **Create and enqueue a job**:
+```bash
+curl -X POST "http://localhost:8080/jobs/generate" \
+  -H "Content-Type: application/json" \
+  -d '{"location_group": {...}, "settings": {...}}'
+```
+
+2. **Check job status**:
+```bash
+curl "http://localhost:8080/jobs/{job_id}"
+```
+
+### Expected Behavior
+
+A successful job should:
+- Start as `QUEUED`
+- Move to `RUNNING` within ~5 seconds (the poll interval)
+- Execute `generate_routes()` (currently simulates 10 seconds of work)
+- Complete as `COMPLETED` with proper timestamps
+- Show logs: "Found job...", "Starting job...", "Generating routes...", "completed successfully"
+
+## Monitoring
+
+### Check Worker Status
+
+```bash
+# See if worker is running and polling
+docker-compose logs backend --tail=50 | grep "Worker loop\|QUEUED"
+
+# Check for errors
+docker-compose logs backend --tail=100 | grep -i "error\|exception\|failed"
+```
+
+### Check Job Statistics
+
+```bash
+# Count jobs by status
+docker-compose exec db psql -U postgres -d f4k -c \
+  "SELECT progress, COUNT(*) as count \
+   FROM jobs GROUP BY progress;"
+
+# Find stuck jobs
+docker-compose exec db psql -U postgres -d f4k -c \
+  "SELECT job_id, progress, started_at, \
+   EXTRACT(EPOCH FROM (NOW() - started_at))/60 as minutes_running \
+   FROM jobs \
+   WHERE progress = 'RUNNING' \
+   AND started_at < NOW() - INTERVAL '30 minutes';"
+```
+
+## Implementation Details
+
+### Session Handling
+
+The worker uses separate database sessions for:
+- Finding jobs (`process_next_job`)
+- Processing jobs (`process_job`)
+
+This ensures clean transaction boundaries and prevents session conflicts; the pattern is sketched below.
+
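+Condensed from `job_worker.py` (method bodies elided), the two-session pattern looks like this:
+
+```python
+async def process_next_job(self) -> None:
+    job_id = None
+    # Session 1: lock and read the next QUEUED job, then release the session
+    async for session in get_session():
+        ...  # SELECT ... FOR UPDATE SKIP LOCKED sets job_id
+    if job_id:
+        await self.process_job(job_id)
+
+async def process_job(self, job_id: UUID) -> None:
+    # Session 2: a fresh session (and JobService) used only for this job
+    async for session in get_session():
+        job_service = JobService(logger=self.logger, session=session)
+        ...  # RUNNING → generate_routes() → COMPLETED / FAILED
+```
+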
+### Race Condition Prevention
+
+The worker uses PostgreSQL's `SELECT FOR UPDATE SKIP LOCKED`:
+- Locks the row when selecting
+- Skips rows already locked by other workers
+- Prevents multiple workers from processing the same job
+
+### Error Handling
+
+- Individual job failures don't crash the worker
+- Failed jobs are logged and marked as `FAILED`
+- The worker continues processing other jobs
+- Exceptions are caught and logged at each level
+
+## Extending the Worker
+
+### Implementing Route Generation
+
+The `generate_routes()` method in `job_worker.py` currently simulates work. To implement actual route generation:
+
+```python
+async def generate_routes(self, job: Job) -> None:
+    """Execute the actual route generation algorithm."""
+    self.logger.info(f"Job {job.job_id}: Starting route generation...")
+
+    # 1. Fetch location group data
+    location_group = await fetch_location_group(job.location_group_id)
+
+    # 2. Run optimization algorithm
+    routes = await optimize_routes(
+        locations=location_group.locations,
+        settings=job.settings
+    )
+
+    # 3. Save results
+    await save_route_results(job.job_id, routes)
+
+    self.logger.info(f"Job {job.job_id}: Route generation complete")
+```
+
+### Adding New Job Types
+
+To support different job types:
+1. Add a `job_type` field to the `Job` model
+2. Add a switch in `generate_routes()` to handle different types (see the sketch below)
+3. Or create separate worker classes for different job types
+
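+A sketch of option 2, assuming a hypothetical `job_type` column on `Job` and hypothetical handler methods; none of these exist in this PR yet:
+
+```python
+async def generate_routes(self, job: Job) -> None:
+    # Hypothetical dispatch table: job_type value -> handler coroutine
+    handlers = {
+        "route_generation": self.run_route_generation,
+        "report_export": self.run_report_export,
+    }
+    handler = handlers.get(job.job_type)
+    if handler is None:
+        raise ValueError(f"Unknown job type: {job.job_type}")
+    await handler(job)
+```
+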
+## Troubleshooting
+
+### Jobs Not Processing
+
+1. **Check if worker is running**:
+   ```bash
+   docker-compose logs backend | grep "Job worker starting"
+   ```
+
+2. **Check for QUEUED jobs**:
+   ```bash
+   docker-compose exec db psql -U postgres -d f4k -c \
+     "SELECT COUNT(*) FROM jobs WHERE progress = 'QUEUED';"
+   ```
+
+3. **Check worker logs for errors**:
+   ```bash
+   docker-compose logs backend --tail=100 | grep -i "error\|exception"
+   ```
+
+### Jobs Stuck in RUNNING
+
+1. **Check for orphaned jobs**:
+   ```bash
+   docker-compose exec db psql -U postgres -d f4k -c \
+     "SELECT job_id, started_at FROM jobs WHERE progress = 'RUNNING';"
+   ```
+
+2. **Manually reset if needed**:
+   ```bash
+   docker-compose exec db psql -U postgres -d f4k -c \
+     "UPDATE jobs SET progress = 'QUEUED', started_at = NULL \
+      WHERE progress = 'RUNNING' AND job_id = 'YOUR_JOB_ID';"
+   ```
+
+3. **Worker will auto-recover on next restart** (if `enable_orphan_recovery=True`)
+
+### Jobs Failing Immediately
+
+1. **Check logs for error details**:
+   ```bash
+   docker-compose logs backend | grep -A 10 "Job.*failed"
+   ```
+
+2. **Verify enum values match**:
+   ```bash
+   docker-compose exec db psql -U postgres -d f4k -c \
+     "SELECT unnest(enum_range(NULL::progressenum));"
+   ```
+
+3. **Check database connection**:
+   ```bash
+   docker-compose exec db psql -U postgres -d f4k -c "SELECT 1;"
+   ```
+
+## Related Files
+
+- **Worker Implementation**: `app/workers/job_worker.py`
+- **Job Service**: `app/services/implementations/job_service.py`
+- **Job Model**: `app/models/job.py`
+- **Progress Enum**: `app/models/enum.py`
+- **Worker Integration**: `app/__init__.py` (lifespan function)
+- **Job Routes**: `app/routers/job_routes.py`
+
+## Notes
+
+- The worker runs as a background task in the FastAPI application
+- It starts automatically when the application starts
+- It shuts down gracefully when the application stops
+- Jobs persist across application restarts
+- The database acts as both storage and queue
+
diff --git a/backend/python/app/workers/__init__.py b/backend/python/app/workers/__init__.py
new file mode 100644
index 00000000..af3634c3
--- /dev/null
+++ b/backend/python/app/workers/__init__.py
@@ -0,0 +1,7 @@
+"""
+Background job workers for processing async tasks.
+"""
+
+from app.workers.job_worker import JobWorker
+
+__all__ = ["JobWorker"]
diff --git a/backend/python/app/workers/job_worker.py b/backend/python/app/workers/job_worker.py
new file mode 100644
index 00000000..be057e7b
--- /dev/null
+++ b/backend/python/app/workers/job_worker.py
@@ -0,0 +1,199 @@
+"""
+Job Worker - Processes jobs from database queue.
+
+The database table itself acts as the queue. This worker:
+- Polls the database for QUEUED jobs
+- Processes them through: QUEUED → RUNNING → COMPLETED/FAILED
+- Handles orphaned jobs on startup (jobs stuck in RUNNING state)
+- Survives restarts - jobs persist in database
+"""
+
+import asyncio
+import logging
+from uuid import UUID
+
+from sqlmodel import select
+
+from app.models import get_session
+from app.models.enum import ProgressEnum
+from app.models.job import Job
+from app.services.implementations.job_service import JobService
+
+logger = logging.getLogger(__name__)
+
+
+class JobWorker:
+    """
+    Worker that processes jobs from database queue.
+
+    Flow:
+    1. Poll database for QUEUED jobs
+    2. Mark job as RUNNING
+    3. Execute route generation
+    4. Mark job as COMPLETED or FAILED
+    5. Repeat
+    """
+
+    def __init__(
+        self,
+        poll_interval: int = 5,
+        job_timeout_minutes: int = 30,
+        enable_orphan_recovery: bool = True,
+    ):
+        """
+        Initialize the job worker.
+
+        Args:
+            poll_interval: Seconds to wait between checking for new jobs
+            job_timeout_minutes: Max time a job can run before considered stuck
+            enable_orphan_recovery: Auto-reset orphaned RUNNING jobs on startup
+        """
+        self.poll_interval = poll_interval
+        self.job_timeout_minutes = job_timeout_minutes
+        self.enable_orphan_recovery = enable_orphan_recovery
+
+        self.running = False
+        self.logger = logging.getLogger(__name__)
+
+    async def start(self) -> None:
+        """
+        Start the worker.
+        This is the main entry point that runs the worker loop.
+        """
+        self.running = True
+        self.logger.info("Job worker starting...")
+
+        if self.enable_orphan_recovery:
+            await self.recover_orphaned_jobs()
+
+        await self.worker_loop()
+
+    def stop(self) -> None:
+        """Stop the worker gracefully"""
+        self.logger.info("Stopping job worker...")
+        self.running = False
+
+ """ + self.logger.info("Worker loop started - polling for QUEUED jobs") + + while self.running: + try: + await self.check_for_stuck_jobs() + + await self.process_next_job() + + except asyncio.CancelledError: + self.logger.info("Worker loop cancelled") + break + except Exception as e: + self.logger.exception(f"Error in worker loop: {e}") + await asyncio.sleep(self.poll_interval) + + self.logger.info("Worker loop stopped") + + async def process_next_job(self) -> None: + """ + Find the next QUEUED job and process it. + Uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions. + """ + job_id: UUID | None = None + + async for session in get_session(): + try: + result = await session.execute( + select(Job) + .where(Job.progress == ProgressEnum.QUEUED) + .where(Job.created_at.isnot(None)) # type: ignore[union-attr] + .order_by(Job.created_at) # type: ignore[arg-type] + .limit(1) + .with_for_update(skip_locked=True) + ) + job = result.scalar_one_or_none() + + if not job: + self.logger.debug("No queued jobs found") + await asyncio.sleep(self.poll_interval) + return + + job_id = job.job_id + self.logger.info(f"Found job {job_id}, processing...") + + except Exception as e: + self.logger.exception(f"Error finding next job: {e}") + await asyncio.sleep(self.poll_interval) + return + + if job_id: + await self.process_job(job_id) + + async def process_job(self, job_id: UUID) -> None: + """ + Process a single job. + Flow: QUEUED → RUNNING → COMPLETED/FAILED + """ + async for session in get_session(): + job_service = JobService(logger=self.logger, session=session) + + try: + self.logger.info(f"Starting job {job_id}") + await job_service.update_progress(job_id, ProgressEnum.RUNNING) + + job = await job_service.get_job(job_id) + if not job: + self.logger.error(f"Job {job_id} not found, skipping") + return + + self.logger.info(f"Generating routes for job {job_id}...") + + try: + await asyncio.wait_for( + self.generate_routes(job), timeout=self.job_timeout_minutes * 60 + ) + except asyncio.TimeoutError: + raise Exception( + f"Job timed out after {self.job_timeout_minutes} minutes" + ) from None + + await job_service.update_progress(job_id, ProgressEnum.COMPLETED) + self.logger.info(f"Job {job_id} completed successfully") + + except Exception as e: + self.logger.exception(f"Job {job_id} failed: {e}") + + try: + await job_service.update_progress(job_id, ProgressEnum.FAILED) + except Exception as update_error: + self.logger.exception( + f"Failed to mark job {job_id} as FAILED: {update_error}" + ) + + async def generate_routes(self, job: Job) -> None: + """ + Execute the actual route generation algorithm. + + TODO: Replace this with your actual implementation. + """ + self.logger.info(f"Job {job.job_id}: Starting route generation...") + + await asyncio.sleep(10) + + # TODO: Implement actual route generation + + async def recover_orphaned_jobs(self) -> None: + """ + On startup, find jobs stuck in RUNNING state and reset them to QUEUED. + This handles jobs that were being processed when the app crashed. + + Jobs persist in database, so when app restarts, we can resume processing. + """ + pass + + async def check_for_stuck_jobs(self) -> None: + """ + Periodically check for jobs that have been RUNNING too long. + Mark them as FAILED if they exceed the timeout. 
+ """ + pass diff --git a/backend/python/migrations/versions/c967b946e4f9_add_queued_to_progress_enum.py b/backend/python/migrations/versions/c967b946e4f9_add_queued_to_progress_enum.py new file mode 100644 index 00000000..2b3f6c2f --- /dev/null +++ b/backend/python/migrations/versions/c967b946e4f9_add_queued_to_progress_enum.py @@ -0,0 +1,27 @@ +"""add_queued_to_progress_enum + +Revision ID: c967b946e4f9 +Revises: 7af7d4689b08 +Create Date: 2025-11-28 00:44:35.958724 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c967b946e4f9' +down_revision = '7af7d4689b08' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + try: + op.execute("ALTER TYPE progressenum ADD VALUE 'QUEUED'") + except Exception: + pass + + +def downgrade() -> None: + pass