Skip to content

Commit 11d4941

Browse files
committed
add job queue :)
1 parent e4abac5 commit 11d4941

File tree

7 files changed

+595
-4
lines changed

7 files changed

+595
-4
lines changed

backend/python/app/__init__.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
from collections.abc import AsyncGenerator
22
from contextlib import asynccontextmanager
33
from logging.config import dictConfig
4+
import asyncio
45

56
import firebase_admin
67
from fastapi import FastAPI
78
from fastapi.middleware.cors import CORSMiddleware
89

910
from app.dependencies.services import get_scheduler_service
1011
from app.services.jobs import init_jobs
12+
from app.workers.job_worker import JobWorker
1113

1214
from .config import settings
1315
from .models import init_app as init_models
1416
from .routers import init_app as init_routers
1517

18+
job_worker: JobWorker | None = None
19+
1620

1721
def configure_logging() -> None:
1822
"""Configure application logging based on environment"""
@@ -117,6 +121,8 @@ def initialize_firebase() -> None:
117121
@asynccontextmanager
118122
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
119123
"""Application lifespan management"""
124+
global job_worker
125+
120126
# Startup
121127
configure_logging()
122128
initialize_firebase()
@@ -126,11 +132,28 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
126132
scheduler_service = get_scheduler_service()
127133
scheduler_service.start()
128134
init_jobs(scheduler_service)
135+
136+
job_worker = JobWorker(
137+
poll_interval=5,
138+
job_timeout_minutes=30,
139+
enable_orphan_recovery=True
140+
)
141+
142+
worker_task = asyncio.create_task(job_worker.start())
129143

130144
yield
131145

132-
# Cleanup: stop the scheduler service during application shutdown
133146
scheduler_service.stop()
147+
148+
if job_worker:
149+
job_worker.stop()
150+
151+
worker_task.cancel()
152+
try:
153+
await worker_task
154+
except asyncio.CancelledError:
155+
# Expected during shutdown - task was cancelled gracefully
156+
pass
134157

135158

136159
def create_app() -> FastAPI:

backend/python/app/models/enum.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class RoleEnum(str, Enum):
2828

2929
class ProgressEnum(str, Enum):
3030
PENDING = "Pending"
31+
QUEUED = "Queued"
3132
RUNNING = "Running"
3233
COMPLETED = "Completed"
3334
FAILED = "Failed"

backend/python/app/services/implementations/job_service.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,23 +47,30 @@ async def get_job(self, job_id: UUID) -> Job | None:
4747
return result.scalar_one_or_none()
4848

4949
async def update_progress(self, job_id: UUID, progress: ProgressEnum) -> None:
50+
"""Update job progress and set timestamps appropriately"""
5051
try:
5152
job = await self.get_job(job_id)
5253
if not job:
5354
self.logger.error("No job with corresponding job ID")
5455
return
5556
job.progress = progress
5657
job.updated_at = self.utc_now_naive()
58+
59+
if progress == ProgressEnum.RUNNING and job.started_at is None:
60+
job.started_at = self.utc_now_naive()
61+
5762
if progress in (ProgressEnum.COMPLETED, ProgressEnum.FAILED):
5863
job.finished_at = self.utc_now_naive()
64+
5965
self.session.add(job)
6066
await self.session.commit()
6167
except Exception as error:
62-
self.logger.error("Error creating job")
68+
self.logger.error("Error updating job progress")
6369
await self.session.rollback()
6470
raise error
6571

6672
async def enqueue(self, job_id: UUID) -> None:
73+
"""Move job from PENDING to QUEUED (ready for worker to pick up)"""
6774
try:
6875
job = await self.get_job(job_id)
6976

@@ -79,8 +86,8 @@ async def enqueue(self, job_id: UUID) -> None:
7986
)
8087
return
8188

82-
job.progress = ProgressEnum.RUNNING
83-
job.started_at = self.utc_now_naive()
89+
job.progress = ProgressEnum.QUEUED
90+
job.updated_at = self.utc_now_naive()
8491
self.session.add(job)
8592
await self.session.commit()
8693
except Exception:

0 commit comments

Comments
 (0)