Skip to content

Commit c6c9349

Browse files
committed
Merge PR #872 into 19.0
Signed-off-by sbidoul
2 parents 1f23acd + 16bb31b commit c6c9349

File tree

15 files changed

+275
-117
lines changed

15 files changed

+275
-117
lines changed

queue_job/__manifest__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
},
3030
"installable": True,
3131
"development_status": "Mature",
32-
"maintainers": ["guewen"],
32+
"maintainers": ["guewen", "sbidoul"],
3333
"post_init_hook": "post_init_hook",
3434
"post_load": "post_load",
3535
}

queue_job/controllers/main.py

Lines changed: 78 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,48 @@
2727

2828

2929
class RunJobController(http.Controller):
30-
def _try_perform_job(self, env, job):
31-
"""Try to perform the job."""
30+
@classmethod
31+
def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
32+
"""Acquire a job for execution.
33+
34+
- make sure it is in ENQUEUED state
35+
- mark it as STARTED and commit the state change
36+
- acquire the job lock
37+
38+
If successful, return the Job instance, otherwise return None. This
39+
function may fail to acquire the job if it is not in the expected state or is
40+
already locked by another worker.
41+
"""
42+
env.cr.execute(
43+
"SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
44+
"FOR NO KEY UPDATE SKIP LOCKED",
45+
(job_uuid, ENQUEUED),
46+
)
47+
if not env.cr.fetchone():
48+
_logger.warning(
49+
"was requested to run job %s, but it does not exist, "
50+
"or is not in state %s, or is being handled by another worker",
51+
job_uuid,
52+
ENQUEUED,
53+
)
54+
return None
55+
job = Job.load(env, job_uuid)
56+
assert job and job.state == ENQUEUED
3257
job.set_started()
3358
job.store()
3459
env.cr.commit()
35-
job.lock()
60+
if not job.lock():
61+
_logger.warning(
62+
"was requested to run job %s, but it could not be locked",
63+
job_uuid,
64+
)
65+
return None
66+
return job
3667

68+
@classmethod
69+
def _try_perform_job(cls, env, job):
70+
"""Try to perform the job, mark it done and commit if successful."""
3771
_logger.debug("%s started", job)
38-
3972
job.perform()
4073
# Triggers any stored computed fields before calling 'set_done'
4174
# so that will be part of the 'exec_time'
@@ -46,18 +79,20 @@ def _try_perform_job(self, env, job):
4679
env.cr.commit()
4780
_logger.debug("%s done", job)
4881

49-
def _enqueue_dependent_jobs(self, env, job):
82+
@classmethod
83+
def _enqueue_dependent_jobs(cls, env, job):
5084
tries = 0
5185
while True:
5286
try:
53-
job.enqueue_waiting()
87+
with job.env.cr.savepoint():
88+
job.enqueue_waiting()
5489
except OperationalError as err:
5590
# Automatically retry the typical transaction serialization
5691
# errors
5792
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
5893
raise
5994
if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
60-
_logger.info(
95+
_logger.error(
6196
"%s, maximum number of tries reached to update dependencies",
6297
errorcodes.lookup(err.pgcode),
6398
)
@@ -75,17 +110,8 @@ def _enqueue_dependent_jobs(self, env, job):
75110
else:
76111
break
77112

78-
@http.route(
79-
"/queue_job/runjob",
80-
type="http",
81-
auth="none",
82-
save_session=False,
83-
readonly=False,
84-
)
85-
def runjob(self, db, job_uuid, **kw):
86-
http.request.session.db = db
87-
env = http.request.env(user=SUPERUSER_ID)
88-
113+
@classmethod
114+
def _runjob(cls, env: api.Environment, job: Job) -> None:
89115
def retry_postpone(job, message, seconds=None):
90116
job.env.clear()
91117
with Registry(job.env.cr.dbname).cursor() as new_cr:
@@ -94,26 +120,9 @@ def retry_postpone(job, message, seconds=None):
94120
job.set_pending(reset_retry=False)
95121
job.store()
96122

97-
# ensure the job to run is in the correct state and lock the record
98-
env.cr.execute(
99-
"SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
100-
(job_uuid, ENQUEUED),
101-
)
102-
if not env.cr.fetchone():
103-
_logger.warning(
104-
"was requested to run job %s, but it does not exist, "
105-
"or is not in state %s",
106-
job_uuid,
107-
ENQUEUED,
108-
)
109-
return ""
110-
111-
job = Job.load(env, job_uuid)
112-
assert job and job.state == ENQUEUED
113-
114123
try:
115124
try:
116-
self._try_perform_job(env, job)
125+
cls._try_perform_job(env, job)
117126
except OperationalError as err:
118127
# Automatically retry the typical transaction serialization
119128
# errors
@@ -131,7 +140,6 @@ def retry_postpone(job, message, seconds=None):
131140
# traceback in the logs we should have the traceback when all
132141
# retries are exhausted
133142
env.cr.rollback()
134-
return ""
135143

136144
except (FailedJobError, Exception) as orig_exception:
137145
buff = StringIO()
@@ -141,19 +149,18 @@ def retry_postpone(job, message, seconds=None):
141149
job.env.clear()
142150
with Registry(job.env.cr.dbname).cursor() as new_cr:
143151
job.env = job.env(cr=new_cr)
144-
vals = self._get_failure_values(job, traceback_txt, orig_exception)
152+
vals = cls._get_failure_values(job, traceback_txt, orig_exception)
145153
job.set_failed(**vals)
146154
job.store()
147155
buff.close()
148156
raise
149157

150158
_logger.debug("%s enqueue depends started", job)
151-
self._enqueue_dependent_jobs(env, job)
159+
cls._enqueue_dependent_jobs(env, job)
152160
_logger.debug("%s enqueue depends done", job)
153161

154-
return ""
155-
156-
def _get_failure_values(self, job, traceback_txt, orig_exception):
162+
@classmethod
163+
def _get_failure_values(cls, job, traceback_txt, orig_exception):
157164
"""Collect relevant data from exception."""
158165
exception_name = orig_exception.__class__.__name__
159166
if hasattr(orig_exception, "__module__"):
@@ -167,6 +174,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
167174
"exc_message": exc_message,
168175
}
169176

177+
@http.route(
178+
"/queue_job/runjob",
179+
type="http",
180+
auth="none",
181+
save_session=False,
182+
readonly=False,
183+
)
184+
def runjob(self, db, job_uuid, **kw):
185+
http.request.session.db = db
186+
env = http.request.env(user=SUPERUSER_ID)
187+
job = self._acquire_job(env, job_uuid)
188+
if not job:
189+
return ""
190+
self._runjob(env, job)
191+
return ""
192+
170193
# flake8: noqa: C901
171194
@http.route("/queue_job/create_test_job", type="http", auth="user")
172195
def create_test_job(
@@ -177,6 +200,7 @@ def create_test_job(
177200
description="Test job",
178201
size=1,
179202
failure_rate=0,
203+
job_duration=0,
180204
):
181205
if not http.request.env.user.has_group("base.group_erp_manager"):
182206
raise Forbidden(http.request.env._("Access Denied"))
@@ -187,6 +211,12 @@ def create_test_job(
187211
except (ValueError, TypeError):
188212
failure_rate = 0
189213

214+
if job_duration is not None:
215+
try:
216+
job_duration = float(job_duration)
217+
except (ValueError, TypeError):
218+
job_duration = 0
219+
190220
if not (0 <= failure_rate <= 1):
191221
raise BadRequest("failure_rate must be between 0 and 1")
192222

@@ -215,6 +245,7 @@ def create_test_job(
215245
channel=channel,
216246
description=description,
217247
failure_rate=failure_rate,
248+
job_duration=job_duration,
218249
)
219250

220251
if size > 1:
@@ -225,6 +256,7 @@ def create_test_job(
225256
channel=channel,
226257
description=description,
227258
failure_rate=failure_rate,
259+
job_duration=job_duration,
228260
)
229261
return ""
230262

@@ -236,6 +268,7 @@ def _create_single_test_job(
236268
description="Test job",
237269
size=1,
238270
failure_rate=0,
271+
job_duration=0,
239272
):
240273
delayed = (
241274
http.request.env["queue.job"]
@@ -245,7 +278,7 @@ def _create_single_test_job(
245278
channel=channel,
246279
description=description,
247280
)
248-
._test_job(failure_rate=failure_rate)
281+
._test_job(failure_rate=failure_rate, job_duration=job_duration)
249282
)
250283
return f"job uuid: {delayed.db_record().uuid}"
251284

@@ -259,6 +292,7 @@ def _create_graph_test_jobs(
259292
channel=None,
260293
description="Test job",
261294
failure_rate=0,
295+
job_duration=0,
262296
):
263297
model = http.request.env["queue.job"]
264298
current_count = 0
@@ -281,7 +315,7 @@ def _create_graph_test_jobs(
281315
max_retries=max_retries,
282316
channel=channel,
283317
description=f"{description} #{current_count}",
284-
)._test_job(failure_rate=failure_rate)
318+
)._test_job(failure_rate=failure_rate, job_duration=job_duration)
285319
)
286320

287321
grouping = random.choice(possible_grouping_methods)

queue_job/job.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def load_many(cls, env, job_uuids):
222222
recordset = cls.db_records_from_uuids(env, job_uuids)
223223
return {cls._load_from_db_record(record) for record in recordset}
224224

225-
def add_lock_record(self):
225+
def add_lock_record(self) -> None:
226226
"""
227227
Create row in db to be locked while the job is being performed.
228228
"""
@@ -242,13 +242,11 @@ def add_lock_record(self):
242242
[self.uuid],
243243
)
244244

245-
def lock(self):
246-
"""
247-
Lock row of job that is being performed
245+
def lock(self) -> bool:
246+
"""Lock row of job that is being performed.
248247
249-
If a job cannot be locked,
250-
it means that the job wasn't started,
251-
a RetryableJobError is thrown.
248+
Return False if a job cannot be locked: it means that the job is not in
249+
STARTED state or is already locked by another worker.
252250
"""
253251
self.env.cr.execute(
254252
"""
@@ -264,18 +262,15 @@ def lock(self):
264262
queue_job
265263
WHERE
266264
uuid = %s
267-
AND state='started'
265+
AND state = %s
268266
)
269-
FOR UPDATE;
267+
FOR NO KEY UPDATE SKIP LOCKED;
270268
""",
271-
[self.uuid],
269+
[self.uuid, STARTED],
272270
)
273271

274272
# 1 job should be locked
275-
if 1 != len(self.env.cr.fetchall()):
276-
raise RetryableJobError(
277-
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
278-
)
273+
return bool(self.env.cr.fetchall())
279274

280275
@classmethod
281276
def _load_from_db_record(cls, job_db_record):

queue_job/jobrunner/runner.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -357,23 +357,26 @@ def _query_requeue_dead_jobs(self):
357357
ELSE exc_info
358358
END)
359359
WHERE
360-
id in (
361-
SELECT
362-
queue_job_id
363-
FROM
364-
queue_job_lock
365-
WHERE
366-
queue_job_id in (
367-
SELECT
368-
id
369-
FROM
370-
queue_job
371-
WHERE
372-
state IN ('enqueued','started')
373-
AND date_enqueued <
374-
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
375-
)
376-
FOR UPDATE SKIP LOCKED
360+
state IN ('enqueued','started')
361+
AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
362+
AND (
363+
id in (
364+
SELECT
365+
queue_job_id
366+
FROM
367+
queue_job_lock
368+
WHERE
369+
queue_job_lock.queue_job_id = queue_job.id
370+
FOR NO KEY UPDATE SKIP LOCKED
371+
)
372+
OR NOT EXISTS (
373+
SELECT
374+
1
375+
FROM
376+
queue_job_lock
377+
WHERE
378+
queue_job_lock.queue_job_id = queue_job.id
379+
)
377380
)
378381
RETURNING uuid
379382
"""
@@ -396,6 +399,12 @@ def requeue_dead_jobs(self):
396399
However, when the Odoo server crashes or is otherwise force-stopped,
397400
running jobs are interrupted while the runner has no chance to know
398401
they have been aborted.
402+
403+
This also handles orphaned jobs (enqueued but never started, no lock).
404+
This edge case occurs when the runner marks a job as 'enqueued'
405+
but the HTTP request to start the job never reaches the Odoo server
406+
(e.g., due to server shutdown/crash between setting enqueued and
407+
the controller receiving the request).
399408
"""
400409

401410
with closing(self.conn.cursor()) as cr:

0 commit comments

Comments
 (0)