Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions backend/python/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from logging.config import dictConfig

import firebase_admin
Expand All @@ -8,11 +9,14 @@

from app.dependencies.services import get_scheduler_service
from app.services.jobs import init_jobs
from app.workers.job_worker import JobWorker

from .config import settings
from .models import init_app as init_models
from .routers import init_app as init_routers

job_worker: JobWorker | None = None


def configure_logging() -> None:
"""Configure application logging based on environment"""
Expand Down Expand Up @@ -117,6 +121,8 @@ def initialize_firebase() -> None:
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
"""Application lifespan management"""
global job_worker

# Startup
configure_logging()
initialize_firebase()
Expand All @@ -127,11 +133,24 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
scheduler_service.start()
init_jobs(scheduler_service)

job_worker = JobWorker(
poll_interval=5, job_timeout_minutes=30, enable_orphan_recovery=True
)

worker_task = asyncio.create_task(job_worker.start())

yield

# Cleanup: stop the scheduler service during application shutdown
scheduler_service.stop()

if job_worker:
job_worker.stop()

worker_task.cancel()
with suppress(asyncio.CancelledError):
# Expected during shutdown - task was cancelled gracefully
await worker_task


def create_app() -> FastAPI:
"""Create and configure FastAPI application"""
Expand Down
1 change: 1 addition & 0 deletions backend/python/app/models/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class RoleEnum(str, Enum):

class ProgressEnum(str, Enum):
PENDING = "Pending"
QUEUED = "Queued"
RUNNING = "Running"
COMPLETED = "Completed"
FAILED = "Failed"
13 changes: 10 additions & 3 deletions backend/python/app/services/implementations/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,30 @@ async def get_job(self, job_id: UUID) -> Job | None:
return result.scalar_one_or_none()

async def update_progress(self, job_id: UUID, progress: ProgressEnum) -> None:
"""Update job progress and set timestamps appropriately"""
try:
job = await self.get_job(job_id)
if not job:
self.logger.error("No job with corresponding job ID")
return
job.progress = progress
job.updated_at = self.utc_now_naive()

if progress == ProgressEnum.RUNNING and job.started_at is None:
job.started_at = self.utc_now_naive()

if progress in (ProgressEnum.COMPLETED, ProgressEnum.FAILED):
job.finished_at = self.utc_now_naive()

self.session.add(job)
await self.session.commit()
except Exception as error:
self.logger.error("Error creating job")
self.logger.error("Error updating job progress")
await self.session.rollback()
raise error

async def enqueue(self, job_id: UUID) -> None:
"""Move job from PENDING to QUEUED (ready for worker to pick up)"""
try:
job = await self.get_job(job_id)

Expand All @@ -79,8 +86,8 @@ async def enqueue(self, job_id: UUID) -> None:
)
return

job.progress = ProgressEnum.RUNNING
job.started_at = self.utc_now_naive()
job.progress = ProgressEnum.QUEUED
job.updated_at = self.utc_now_naive()
self.session.add(job)
await self.session.commit()
except Exception:
Expand Down
Loading