Skip to content

Commit 78f98f3

Browse files
hoangtrannmaurochip
authored andcommitted
[IMP] queue_job: requeue orphaned jobs
1 parent c9e92aa commit 78f98f3

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

queue_job/jobrunner/runner.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,35 @@ def _query_requeue_dead_jobs(self):
386386
RETURNING uuid
387387
"""
388388

389+
def _query_requeue_orphaned_jobs(self):
390+
"""Query to requeue jobs stuck in 'enqueued' state without a lock.
391+
392+
This handles the edge case where the runner marks a job as 'enqueued'
393+
but the HTTP request to start the job never reaches the Odoo server
394+
(e.g., due to server shutdown/crash between setting enqueued and
395+
the controller receiving the request). These jobs have no lock record
396+
because set_started() was never called, so they are invisible to
397+
_query_requeue_dead_jobs().
398+
"""
399+
return """
400+
UPDATE
401+
queue_job
402+
SET
403+
state='pending'
404+
WHERE
405+
state = 'enqueued'
406+
AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
407+
AND NOT EXISTS (
408+
SELECT
409+
1
410+
FROM
411+
queue_job_lock
412+
WHERE
413+
queue_job_id = queue_job.id
414+
)
415+
RETURNING uuid
416+
"""
417+
389418
def requeue_dead_jobs(self):
390419
"""
391420
Set started and enqueued jobs but not locked to pending
@@ -414,6 +443,14 @@ def requeue_dead_jobs(self):
414443
for (uuid,) in cr.fetchall():
415444
_logger.warning("Re-queued dead job with uuid: %s", uuid)
416445

446+
# Requeue orphaned jobs (enqueued but never started, no lock)
447+
query = self._query_requeue_orphaned_jobs()
448+
cr.execute(query)
449+
for (uuid,) in cr.fetchall():
450+
_logger.warning(
451+
"Re-queued orphaned job (enqueued without lock) with uuid: %s", uuid
452+
)
453+
417454

418455
class QueueJobRunner:
419456
def __init__(

queue_job/tests/test_requeue_dead_job.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,34 @@ def test_requeue_dead_jobs(self):
131131
# because we committed the cursor, the savepoint of the test method is
132132
# gone, and this would break TransactionCase cleanups
133133
self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
134+
135+
def test_requeue_orphaned_jobs(self):
136+
uuid = "test_enqueued_job"
137+
queue_job = self._get_demo_job(uuid)
138+
job_obj = Job.load(self.env, queue_job.uuid)
139+
140+
# Only enqueued job, don't set it to started to simulate the scenario
141+
# that system shutdown before job is starting
142+
job_obj.set_enqueued()
143+
job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
144+
job_obj.store()
145+
146+
# job ins't actually picked up by the first requeue attempt
147+
query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
148+
self.env.cr.execute(query)
149+
uuids_requeued = self.env.cr.fetchall()
150+
self.assertFalse(uuids_requeued)
151+
152+
# job is picked up by the 2nd requeue attempt
153+
query = Database(self.env.cr.dbname)._query_requeue_orphaned_jobs()
154+
self.env.cr.execute(query)
155+
uuids_requeued = self.env.cr.fetchall()
156+
self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)
157+
158+
# clean up
159+
queue_job.unlink()
160+
self.env.cr.commit() # pylint: disable=E8102
161+
162+
# because we committed the cursor, the savepoint of the test method is
163+
# gone, and this would break TransactionCase cleanups
164+
self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)

0 commit comments

Comments
 (0)