- Handles orphaned jobs on startup (jobs stuck in RUNNING state)
- Survives restarts - jobs persist in database
"""

import asyncio
import logging
from uuid import UUID

from sqlmodel import select

from app.models import get_session
from app.models.enum import ProgressEnum
from app.models.job import Job
from app.services.implementations.job_service import JobService

logger = logging.getLogger(__name__)


class JobWorker:
    """
    Worker that processes jobs from database queue.

    Flow:
    1. Poll database for QUEUED jobs
    2. Mark job as RUNNING
    3. Execute route generation
    4. Mark job as COMPLETED or FAILED
    5. Repeat
    """

    def __init__(
        self,
        poll_interval: int = 5,
        job_timeout_minutes: int = 30,
        enable_orphan_recovery: bool = True,
    ):
        """
        Initialize the job worker.

        Args:
            poll_interval: Seconds to wait between checking for new jobs
            job_timeout_minutes: Max time a job can run before considered stuck
            enable_orphan_recovery: Whether to recover orphaned jobs on startup
        """
        self.poll_interval = poll_interval
        self.job_timeout_minutes = job_timeout_minutes
        self.enable_orphan_recovery = enable_orphan_recovery

        self.running = False
        self.logger = logging.getLogger(__name__)

    async def start(self) -> None:
        """
        Start the worker.
        This is the main entry point that runs the worker loop.
        """
        self.running = True
        self.logger.info("Job worker starting...")

        if self.enable_orphan_recovery:
            await self.recover_orphaned_jobs()

        await self.worker_loop()

    def stop(self) -> None:
        """Stop the worker gracefully"""
        self.logger.info("Stopping job worker...")
        self.running = False

    async def worker_loop(self) -> None:
        """
        Main worker loop - continuously polls database for QUEUED jobs.
        """
        self.logger.info("Worker loop started - polling for QUEUED jobs")

        while self.running:
            try:
                await self.check_for_stuck_jobs()

                await self.process_next_job()

            except asyncio.CancelledError:
                self.logger.info("Worker loop cancelled")
                break
            except Exception as e:
                self.logger.exception(f"Error in worker loop: {e}")
                await asyncio.sleep(self.poll_interval)

        self.logger.info("Worker loop stopped")

    async def process_next_job(self) -> None:
        """
        Find the next QUEUED job and process it.
        Uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions.
        """
        job_id: UUID | None = None

        async for session in get_session():
            try:
                result = await session.execute(
                    select(Job)
                    .where(Job.progress == ProgressEnum.QUEUED)
                    .where(Job.created_at.isnot(None))  # type: ignore[union-attr]
                    .order_by(Job.created_at)  # type: ignore[arg-type]
                    .limit(1)
                    .with_for_update(skip_locked=True)
                )
                job = result.scalar_one_or_none()

                if not job:
                    self.logger.debug("No queued jobs found")
                    await asyncio.sleep(self.poll_interval)
                    return

                job_id = job.job_id
                self.logger.info(f"Found job {job_id}, processing...")

            except Exception as e:
                self.logger.exception(f"Error finding next job: {e}")
                await asyncio.sleep(self.poll_interval)
                return

        if job_id:
            await self.process_job(job_id)

    async def process_job(self, job_id: UUID) -> None:
        """
        Process a single job.
        Flow: QUEUED → RUNNING → COMPLETED/FAILED
        """
        async for session in get_session():
            job_service = JobService(logger=self.logger, session=session)

            try:
                self.logger.info(f"Starting job {job_id}")
                await job_service.update_progress(job_id, ProgressEnum.RUNNING)

                job = await job_service.get_job(job_id)
                if not job:
                    self.logger.error(f"Job {job_id} not found, skipping")
                    return

                self.logger.info(f"Generating routes for job {job_id}...")

                try:
                    await asyncio.wait_for(
                        self.generate_routes(job), timeout=self.job_timeout_minutes * 60
                    )
                except asyncio.TimeoutError:
                    raise Exception(
                        f"Job timed out after {self.job_timeout_minutes} minutes"
                    ) from None

                await job_service.update_progress(job_id, ProgressEnum.COMPLETED)
                self.logger.info(f"Job {job_id} completed successfully")

            except Exception as e:
                self.logger.exception(f"Job {job_id} failed: {e}")

                try:
                    await job_service.update_progress(job_id, ProgressEnum.FAILED)
                except Exception as update_error:
                    self.logger.exception(
                        f"Failed to mark job {job_id} as FAILED: {update_error}"
                    )

    async def generate_routes(self, job: Job) -> None:
        """
        Execute the actual route generation algorithm.

        TODO: Replace this with your actual implementation.
        """
        self.logger.info(f"Job {job.job_id}: Starting route generation...")

        # Placeholder: simulate work until the real algorithm is plugged in
        await asyncio.sleep(10)

        # TODO: Implement actual route generation

    async def recover_orphaned_jobs(self) -> None:
        """
        On startup, find jobs stuck in RUNNING state and reset them to QUEUED.
        This handles jobs that were being processed when the app crashed.

        Jobs persist in database, so when app restarts, we can resume processing.
        """
        pass
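        # Sketch of one possible implementation (an assumption, not the original
        # code): reuse the query pattern from process_next_job to find jobs left
        # in RUNNING state and push them back to QUEUED via JobService, so the
        # worker loop can pick them up again.
        async for session in get_session():
            try:
                result = await session.execute(
                    select(Job).where(Job.progress == ProgressEnum.RUNNING)
                )
                orphaned_jobs = result.scalars().all()
                if not orphaned_jobs:
                    self.logger.info("No orphaned jobs found")
                    return

                job_service = JobService(logger=self.logger, session=session)
                for job in orphaned_jobs:
                    self.logger.warning(
                        f"Recovering orphaned job {job.job_id}: RUNNING -> QUEUED"
                    )
                    await job_service.update_progress(job.job_id, ProgressEnum.QUEUED)
            except Exception as e:
                self.logger.exception(f"Error recovering orphaned jobs: {e}")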
    async def check_for_stuck_jobs(self) -> None:
        """
        Periodically check for jobs that have been RUNNING too long.
        Mark them as FAILED if they exceed the timeout.
        """
        pass
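        # Sketch of one possible implementation (an assumption, not the original
        # code). It assumes the Job model tracks a last-update timestamp named
        # updated_at (hypothetical field name); jobs RUNNING past the timeout are
        # marked FAILED through the same JobService API used in process_job.
        from datetime import datetime, timedelta, timezone  # local to the sketch

        cutoff = datetime.now(timezone.utc) - timedelta(minutes=self.job_timeout_minutes)
        async for session in get_session():
            try:
                result = await session.execute(
                    select(Job)
                    .where(Job.progress == ProgressEnum.RUNNING)
                    .where(Job.updated_at < cutoff)  # updated_at is an assumed field
                )
                stuck_jobs = result.scalars().all()
                if not stuck_jobs:
                    return

                job_service = JobService(logger=self.logger, session=session)
                for job in stuck_jobs:
                    self.logger.error(
                        f"Job {job.job_id} ran longer than "
                        f"{self.job_timeout_minutes} minutes, marking FAILED"
                    )
                    await job_service.update_progress(job.job_id, ProgressEnum.FAILED)
            except Exception as e:
                self.logger.exception(f"Error checking for stuck jobs: {e}")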
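
if __name__ == "__main__":
    # Illustrative way to run the worker standalone (an assumption - the real
    # application more likely starts it from a framework startup hook, e.g. as
    # asyncio.create_task(JobWorker().start())).
    logging.basicConfig(level=logging.INFO)
    asyncio.run(JobWorker().start())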