Skip to content

Commit f038a2a

Browse files
committed
[IMP] queue_job: run jobs in cron workers
1 parent 178e98a commit f038a2a

File tree

5 files changed

+91
-5
lines changed

5 files changed

+91
-5
lines changed

queue_job/__manifest__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"wizards/queue_requeue_job_views.xml",
2121
"views/queue_job_menus.xml",
2222
"data/queue_data.xml",
23+
"data/queue_job_executor_cron.xml",
2324
"data/queue_job_function_data.xml",
2425
],
2526
"assets": {

queue_job/controllers/main.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def retry_postpone(job, message, seconds=None):
161161

162162
_logger.debug("%s enqueue depends started", job)
163163
cls._enqueue_dependent_jobs(env, job)
164+
env.cr.commit()
164165
_logger.debug("%s enqueue depends done", job)
165166

166167
@classmethod
@@ -185,13 +186,26 @@ def _get_failure_values(cls, job, traceback_txt, orig_exception):
185186
save_session=False,
186187
readonly=False,
187188
)
188-
def runjob(self, db, job_uuid, **kw):
189+
def runjob(self, db: str, job_uuid: str | None, **kw):
189190
http.request.session.db = db
190191
env = http.request.env(user=SUPERUSER_ID)
191-
job = self._acquire_job(env, job_uuid)
192-
if not job:
193-
return ""
194-
self._runjob(env, job)
192+
run_as = env["ir.config_parameter"].get_param("queue_job.run_as")
193+
if run_as == "cron":
194+
crons = env["ir.cron"].search(
195+
env["queue.job.executor"]._executor_cron_domain()
196+
)
197+
assert crons, "No queue_job executor cron found"
198+
for cron in crons:
199+
# TODO Awaking all of them is a bit wasteful although not very
200+
# costly. Ideally we should awaken only one that is not already
201+
# running.
202+
cron._trigger()
203+
else:
204+
# Run in this http worker
205+
job = self._acquire_job(env, job_uuid)
206+
if not job:
207+
return ""
208+
self._runjob(env, job)
195209
return ""
196210

197211
# flake8: noqa: C901
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<odoo>
3+
<record id="queue_job_executor_cron" model="ir.cron">
4+
<field name="name">Queue Job Executor</field>
5+
<field name="model_id" ref="model_queue_job_executor"/>
6+
<field name="state">code</field>
7+
<field name="code">model._execute_ready_jobs()</field>
8+
<field name='interval_number'>1</field>
9+
<field name='interval_type'>hours</field>
10+
</record>
11+
</odoo>

queue_job/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
from . import ir_model_fields
33
from . import queue_job
44
from . import queue_job_channel
5+
from . import queue_job_executor
56
from . import queue_job_function
67
from . import queue_job_lock
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright (c) 2026 ACSONE SA/NV (<http://acsone.eu>)
2+
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
3+
4+
import logging
5+
6+
from odoo import api, models
7+
8+
from ..controllers.main import RunJobController
9+
from ..job import Job
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
class QueueJobExecutor(models.AbstractModel):
14+
_name = "queue.job.executor"
15+
_description = "Queue Job Executor"
16+
17+
@api.model
18+
def _execute_job(self, job: Job) -> None:
19+
RunJobController._runjob(self.env, job)
20+
21+
@api.model
22+
def _executor_cron_domain(self) -> list:
23+
model_id = self.env["ir.model"]._get("queue.job.executor").id
24+
return [
25+
("model_id", "=", model_id),
26+
("state", "=", "code"),
27+
("code", "=", "model._execute_ready_jobs()"),
28+
]
29+
30+
@api.model
31+
def _ensure_executor_crons(self, capacity: int) -> None:
32+
"""Since Odoo cron can't run cron jobs in parallel, we create several.
33+
34+
`capacity` should be equal to the root channel capacity. If it's more,
35+
it's wasteful. If it's less, job will stay in ENQUEUED state longer than
36+
needed and loop back to PENDING due to the dead jobs requeuer.
37+
"""
38+
if capacity < 1:
39+
return
40+
ref_cron = self.env.ref("queue_job.queue_job_executor_cron")
41+
ref_cron.active = True
42+
# remove clones
43+
self.env["ir.cron"].with_context(active_test=False).search(
44+
self._executor_cron_domain() + [("id", "!=", ref_cron.id)]
45+
).unlink()
46+
# re-create desired number of clones
47+
for _i in range(1, capacity):
48+
ref_cron.copy()
49+
50+
@api.model
51+
def _enable_executor_cron(self, capacity: int) -> None:
52+
self._ensure_executor_crons(capacity)
53+
self.env["ir.config_parameter"].set_param("queue_job.run_as", "cron")
54+
55+
@api.model
56+
def _execute_ready_jobs(self) -> None:
57+
while job := RunJobController._acquire_job(self.env):
58+
_logger.debug("executor cron running queue job %s", job.uuid)
59+
self._execute_job(job)

0 commit comments

Comments
 (0)