from datetime import datetime, timedelta, timezone

from arq import ArqRedis
+ from arq.jobs import Job as ArqJob
+ from arq.jobs import JobStatus as ArqJobStatus
from sqlalchemy import select
from sqlalchemy.orm import Session

# Timeout thresholds for detecting stalled jobs (in minutes).
# RUNNING_TIMEOUT_MINUTES must stay below ArqWorkerSettings.job_timeout (currently 2 hours)
# to avoid marking legitimately running jobs as stalled.
- QUEUED_TIMEOUT_MINUTES = 10  # QUEUED jobs should start within 10 min
RUNNING_TIMEOUT_MINUTES = 90  # RUNNING jobs should complete within 90 min (30 min buffer under ARQ timeout)
- PENDING_TIMEOUT_MINUTES = 30  # PENDING jobs in pipelines should be enqueued within 30 minutes
+ PENDING_TIMEOUT_MINUTES = 5  # PENDING jobs which are actionable within pipelines should be enqueued within 5 minutes


async def _handle_stalled_job_retry(
@@ -170,9 +171,11 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
    and handles them appropriately.

    Stalled job detection criteria:
-   - QUEUED: Created > 10 minutes ago but never started (stuck between prepare_queue and ARQ pickup)
+   - QUEUED: Present in DB as QUEUED but absent from ARQ's Redis queue
+     (process crashed between prepare_queue and redis.enqueue_job)
    - RUNNING: Started > 90 minutes ago but not finished (worker likely crashed)
-   - PENDING: Created > 30 minutes ago in a pipeline (coordination failure)
+   - PENDING: Created > 5 minutes ago in a pipeline and currently runnable
+     (coordination failure)

    Actions taken:
    - If job has retries remaining: Mark PENDING for retry (will be re-enqueued by pipeline)
@@ -195,15 +198,13 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
    - Worker started job, marked it RUNNING, then crashed
    - After 90 minutes (30 min under the ARQ timeout), janitor detects and retries
197200 """
198- # Setup initial context and progress
199201 job_manager .save_to_context (
200202 {
201203 "application" : "mavedb-worker" ,
202204 "function" : "cleanup_stalled_jobs" ,
203205 "resource" : "stalled_jobs" ,
204206 "correlation_id" : None ,
205207 "thresholds" : {
206- "queued_timeout_minutes" : QUEUED_TIMEOUT_MINUTES ,
207208 "running_timeout_minutes" : RUNNING_TIMEOUT_MINUTES ,
208209 "pending_timeout_minutes" : PENDING_TIMEOUT_MINUTES ,
209210 },
@@ -222,14 +223,14 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
222223 "pending" : [],
223224 }
224225
225- # Find QUEUED jobs that have been waiting too long
226- # These likely got stuck during enqueue (state marked QUEUED but never reached ARQ)
227- queued_threshold = now - timedelta (minutes = QUEUED_TIMEOUT_MINUTES )
226+ # Find all QUEUED jobs that have never started. The Redis presence check below
227+ # is the definitive stall gate: a job is only acted on if it is absent from
228+ # ARQ's queue, meaning the process crashed after writing QUEUED to the DB but
229+ # before calling redis.enqueue_job(). No time threshold is needed here.
228230 queued_jobs = job_manager .db .scalars (
229231 select (JobRun ).where (
230232 JobRun .status == JobStatus .QUEUED ,
231233 JobRun .started_at .is_ (None ), # Never started
232- JobRun .created_at < queued_threshold , # Created long ago
233234 )
234235 ).all ()
235236
@@ -241,9 +242,22 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
        manager = JobManager(job_manager.db, job_manager.redis, job.id)
        elapsed_minutes = (now - job.created_at).total_seconds() / 60

+       # Confirm the job is genuinely missing from ARQ's Redis queue before acting.
+       # A healthy job waiting for a worker slot appears QUEUED in the DB and is also
+       # present in Redis; only a crashed-enqueue job has the DB state without the
+       # corresponding Redis entry.
+       arq_status = await ArqJob(arq_job_id(job), job_manager.redis).status()
+       if arq_status in (ArqJobStatus.queued, ArqJobStatus.in_progress, ArqJobStatus.deferred):
+           logger.debug(
+               f"QUEUED job {job.urn} is present in ARQ Redis (status={arq_status.value}); skipping cleanup",
+               extra=manager.logging_context(),
+           )
+           continue
+
        logger.warning(
            f"Detected stalled QUEUED job {job.urn} "
-           f"(created {job.created_at}, queued for {elapsed_minutes:.1f} minutes)",
+           f"(created {job.created_at}, queued for {elapsed_minutes:.1f} minutes, "
+           f"absent from ARQ Redis)",
            extra=manager.logging_context(),
        )
@@ -263,9 +277,8 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
    running_jobs = job_manager.db.scalars(
        select(JobRun).where(
            JobRun.status == JobStatus.RUNNING,
-           (JobRun.started_at < running_threshold)
-           | (JobRun.started_at.is_(None)),  # Started long ago or missing timestamp
-           JobRun.finished_at.is_(None),  # Not finished
+           (JobRun.started_at < running_threshold) | (JobRun.started_at.is_(None)),
+           JobRun.finished_at.is_(None),
        )
    ).all()

@@ -293,7 +306,6 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
            extra=manager.logging_context(),
        )

-       # Use unified retry handler
        stall_reason = f"Job stalled in RUNNING state for {elapsed_minutes:.1f} minutes (likely worker crash)"
        await _handle_stalled_job_retry(job, manager, job_manager.redis, stall_reason, job_manager.db)

@@ -311,7 +323,7 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
    pending_jobs = job_manager.db.scalars(
        select(JobRun).where(
            JobRun.status == JobStatus.PENDING,
-           JobRun.created_at < pending_threshold,  # Created long ago
+           JobRun.created_at < pending_threshold,
        )
    ).all()

@@ -340,7 +352,6 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
            extra=manager.logging_context(),
        )

-       # Use unified retry handler
        stall_reason = f"Job stalled in PENDING state for {elapsed_minutes:.1f} minutes"
        await _handle_stalled_job_retry(job, manager, job_manager.redis, stall_reason, job_manager.db)

@@ -372,7 +383,6 @@ async def cleanup_stalled_jobs(ctx: dict, job_id: int, job_manager: JobManager)
372383 "pending_jobs" : cleaned_jobs ["pending" ],
373384 "timestamp" : now .isoformat (),
374385 "thresholds" : {
375- "queued_timeout_minutes" : QUEUED_TIMEOUT_MINUTES ,
376386 "running_timeout_minutes" : RUNNING_TIMEOUT_MINUTES ,
377387 "pending_timeout_minutes" : PENDING_TIMEOUT_MINUTES ,
378388 },
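As a quick reference for the remaining time-based checks, the RUNNING and PENDING cutoffs are plain timezone-aware datetime arithmetic. A minimal sketch of the same calculation, restating the module constants shown at the top of this diff (the example timestamp is made up):

from datetime import datetime, timedelta, timezone

RUNNING_TIMEOUT_MINUTES = 90
PENDING_TIMEOUT_MINUTES = 5

now = datetime.now(timezone.utc)

# A RUNNING job whose started_at is older than this cutoff is considered stalled;
# the same pattern applies to PENDING jobs via created_at.
running_threshold = now - timedelta(minutes=RUNNING_TIMEOUT_MINUTES)
pending_threshold = now - timedelta(minutes=PENDING_TIMEOUT_MINUTES)

started_at = now - timedelta(hours=2)  # example timestamp; must be timezone-aware to compare
print(started_at < running_threshold)  # True: two hours ago is past the 90-minute cutoff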