Skip to content

Commit 151dc60

Browse files
committed
Merge PR #910 into 16.0
Signed-off-by guewen
2 parents ced1d77 + a6459c4 commit 151dc60

File tree

8 files changed

+90
-23
lines changed

8 files changed

+90
-23
lines changed

queue_job/controllers/main.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
from psycopg2 import OperationalError, errorcodes
1414
from werkzeug.exceptions import BadRequest, Forbidden
1515

16-
from odoo import SUPERUSER_ID, _, api, http, registry, tools
16+
from odoo import SUPERUSER_ID, _, api, http, tools
1717
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
18+
from odoo.tools import config
1819

1920
from ..delay import chain, group
2021
from ..exception import FailedJobError, NothingToDoJob, RetryableJobError
@@ -38,8 +39,10 @@ def _prevent_commit(cr):
3839
def forbidden_commit(*args, **kwargs):
3940
raise RuntimeError(
4041
"Commit is forbidden in queue jobs. "
41-
"If the current job is a cron running as queue job, "
42-
"modify it to run as a normal cron."
42+
'You may want to enable the "Allow Commit" option on the Job '
43+
"Function. Alternatively, if the current job is a cron running as "
44+
"queue job, you can modify it to run as a normal cron. More details on: "
45+
"https://github.com/OCA/queue/wiki/Upgrade-warning:-commits-inside-jobs"
4346
)
4447

4548
original_commit = cr.commit
@@ -103,7 +106,8 @@ def _try_perform_job(cls, env, job):
103106
job.set_done()
104107
job.store()
105108
env.flush_all()
106-
env.cr.commit()
109+
if not config["test_enable"]:
110+
env.cr.commit()
107111
_logger.debug("%s done", job)
108112

109113
@classmethod
@@ -141,8 +145,7 @@ def _enqueue_dependent_jobs(cls, env, job):
141145
def _runjob(cls, env: api.Environment, job: Job) -> None:
142146
def retry_postpone(job, message, seconds=None):
143147
job.env.clear()
144-
with registry(job.env.cr.dbname).cursor() as new_cr:
145-
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
148+
with job.in_temporary_env():
146149
job.postpone(result=message, seconds=seconds)
147150
job.set_pending(reset_retry=False)
148151
job.store()
@@ -178,15 +181,15 @@ def retry_postpone(job, message, seconds=None):
178181
# traceback in the logs we should have the traceback when all
179182
# retries are exhausted
180183
env.cr.rollback()
184+
return
181185

182186
except (FailedJobError, Exception) as orig_exception:
183187
buff = StringIO()
184188
traceback.print_exc(file=buff)
185189
traceback_txt = buff.getvalue()
186190
_logger.error(traceback_txt)
187191
job.env.clear()
188-
with registry(job.env.cr.dbname).cursor() as new_cr:
189-
job.env = job.env(cr=new_cr)
192+
with job.in_temporary_env():
190193
vals = cls._get_failure_values(job, traceback_txt, orig_exception)
191194
job.set_failed(**vals)
192195
job.store()
@@ -240,6 +243,7 @@ def create_test_job(
240243
failure_rate=0,
241244
job_duration=0,
242245
commit_within_job=False,
246+
failure_retry_seconds=0,
243247
):
244248
"""Create test jobs
245249
@@ -287,6 +291,12 @@ def create_test_job(
287291
except ValueError:
288292
max_retries = None
289293

294+
if failure_retry_seconds is not None:
295+
try:
296+
failure_retry_seconds = int(failure_retry_seconds)
297+
except ValueError:
298+
failure_retry_seconds = 0
299+
290300
if size == 1:
291301
return self._create_single_test_job(
292302
priority=priority,
@@ -296,6 +306,7 @@ def create_test_job(
296306
failure_rate=failure_rate,
297307
job_duration=job_duration,
298308
commit_within_job=commit_within_job,
309+
failure_retry_seconds=failure_retry_seconds,
299310
)
300311

301312
if size > 1:
@@ -308,6 +319,7 @@ def create_test_job(
308319
failure_rate=failure_rate,
309320
job_duration=job_duration,
310321
commit_within_job=commit_within_job,
322+
failure_retry_seconds=failure_retry_seconds,
311323
)
312324
return ""
313325

@@ -321,6 +333,7 @@ def _create_single_test_job(
321333
failure_rate=0,
322334
job_duration=0,
323335
commit_within_job=False,
336+
failure_retry_seconds=0,
324337
):
325338
delayed = (
326339
http.request.env["queue.job"]
@@ -334,6 +347,7 @@ def _create_single_test_job(
334347
failure_rate=failure_rate,
335348
job_duration=job_duration,
336349
commit_within_job=commit_within_job,
350+
failure_retry_seconds=failure_retry_seconds,
337351
)
338352
)
339353
return "job uuid: %s" % (delayed.db_record().uuid,)
@@ -350,6 +364,7 @@ def _create_graph_test_jobs(
350364
failure_rate=0,
351365
job_duration=0,
352366
commit_within_job=False,
367+
failure_retry_seconds=0,
353368
):
354369
model = http.request.env["queue.job"]
355370
current_count = 0
@@ -376,6 +391,7 @@ def _create_graph_test_jobs(
376391
failure_rate=failure_rate,
377392
job_duration=job_duration,
378393
commit_within_job=commit_within_job,
394+
failure_retry_seconds=failure_retry_seconds,
379395
)
380396
)
381397

queue_job/job.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
import uuid
1010
import weakref
11+
from contextlib import contextmanager, nullcontext
1112
from datetime import datetime, timedelta
1213
from random import randint
1314

@@ -423,14 +424,9 @@ def __init__(
423424
raise TypeError("Job accepts only methods of Models")
424425

425426
recordset = func.__self__
426-
env = recordset.env
427427
self.method_name = func.__name__
428428
self.recordset = recordset
429429

430-
self.env = env
431-
self.job_model = self.env["queue.job"]
432-
self.job_model_name = "queue.job"
433-
434430
self.job_config = (
435431
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
436432
)
@@ -480,10 +476,10 @@ def __init__(
480476
self.exc_message = None
481477
self.exc_info = None
482478

483-
if "company_id" in env.context:
484-
company_id = env.context["company_id"]
479+
if "company_id" in self.env.context:
480+
company_id = self.env.context["company_id"]
485481
else:
486-
company_id = env.company.id
482+
company_id = self.env.company.id
487483
self.company_id = company_id
488484
self._eta = None
489485
self.eta = eta
@@ -508,7 +504,12 @@ def perform(self):
508504
"""
509505
self.retry += 1
510506
try:
511-
self.result = self.func(*tuple(self.args), **self.kwargs)
507+
if self.job_config.allow_commit:
508+
env_context_manager = self.in_temporary_env()
509+
else:
510+
env_context_manager = nullcontext()
511+
with env_context_manager:
512+
self.result = self.func(*tuple(self.args), **self.kwargs)
512513
except RetryableJobError as err:
513514
if err.ignore_retry:
514515
self.retry -= 1
@@ -528,6 +529,16 @@ def perform(self):
528529

529530
return self.result
530531

532+
@contextmanager
533+
def in_temporary_env(self):
534+
with self.env.registry.cursor() as new_cr:
535+
env = self.env
536+
self._env = env(cr=new_cr)
537+
try:
538+
yield
539+
finally:
540+
self._env = env
541+
531542
def _get_common_dependent_jobs_query(self):
532543
return """
533544
UPDATE queue_job
@@ -686,6 +697,14 @@ def __hash__(self):
686697
def db_record(self):
687698
return self.db_records_from_uuids(self.env, [self.uuid])
688699

700+
@property
701+
def env(self):
702+
return self.recordset.env
703+
704+
@env.setter
705+
def _env(self, env):
706+
self.recordset = self.recordset.with_env(env)
707+
689708
@property
690709
def func(self):
691710
recordset = self.recordset.with_context(job_uuid=self.uuid)
@@ -750,7 +769,7 @@ def model_name(self):
750769

751770
@property
752771
def user_id(self):
753-
return self.recordset.env.uid
772+
return self.env.uid
754773

755774
@property
756775
def eta(self):

queue_job/models/queue_job.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from odoo.addons.base_sparse_field.models.fields import Serialized
1313

1414
from ..delay import Graph
15-
from ..exception import JobError
15+
from ..exception import JobError, RetryableJobError
1616
from ..fields import JobSerialized
1717
from ..job import (
1818
CANCELLED,
@@ -459,10 +459,23 @@ def related_action_open_record(self):
459459
)
460460
return action
461461

462-
def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False):
462+
def _test_job(
463+
self,
464+
failure_rate=0,
465+
job_duration=0,
466+
commit_within_job=False,
467+
failure_retry_seconds=0,
468+
):
463469
_logger.info("Running test job.")
464470
if random.random() <= failure_rate:
465-
raise JobError("Job failed")
471+
if failure_retry_seconds:
472+
raise RetryableJobError(
473+
f"Retryable job failed, will be retried in "
474+
f"{failure_retry_seconds} seconds",
475+
seconds=failure_retry_seconds,
476+
)
477+
else:
478+
raise JobError("Job failed")
466479
if job_duration:
467480
time.sleep(job_duration)
468481
if commit_within_job:

queue_job/models/queue_job_function.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class QueueJobFunction(models.Model):
2828
"related_action_enable "
2929
"related_action_func_name "
3030
"related_action_kwargs "
31-
"job_function_id ",
31+
"job_function_id "
32+
"allow_commit",
3233
)
3334

3435
def _default_channel(self):
@@ -79,6 +80,12 @@ def _default_channel(self):
7980
"enable, func_name, kwargs.\n"
8081
"See the module description for details.",
8182
)
83+
allow_commit = fields.Boolean(
84+
help="Allows the job to commit transactions during execution. "
85+
"Under the hood, this executes the job in a new database cursor, "
86+
"which incurs an overhead as it requires an extra connection to "
87+
"the database. "
88+
)
8289

8390
@api.depends("model_id.model", "method")
8491
def _compute_name(self):
@@ -149,6 +156,7 @@ def job_default_config(self):
149156
related_action_func_name=None,
150157
related_action_kwargs={},
151158
job_function_id=None,
159+
allow_commit=False,
152160
)
153161

154162
def _parse_retry_pattern(self):
@@ -184,6 +192,7 @@ def job_config(self, name):
184192
related_action_func_name=config.related_action.get("func_name"),
185193
related_action_kwargs=config.related_action.get("kwargs", {}),
186194
job_function_id=config.id,
195+
allow_commit=config.allow_commit,
187196
)
188197

189198
def _retry_pattern_format_error_message(self):

queue_job/tests/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ def _add_job(self, *args, **kwargs):
276276

277277
def _prepare_context(self, job):
278278
# pylint: disable=context-overridden
279-
job_model = job.job_model.with_context({})
279+
job_model = job.env["queue.job"].with_context({})
280280
field_records = job_model._fields["records"]
281281
# Filter the context to simulate store/load of the job
282282
job.recordset = field_records.convert_to_write(job.recordset, job_model)

queue_job/tests/test_model_job_function.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def test_function_job_config(self):
4242
' "func_name": "related_action_foo",'
4343
' "kwargs": {"b": 1}}'
4444
),
45+
"allow_commit": True,
4546
}
4647
)
4748
self.assertEqual(
@@ -53,5 +54,6 @@ def test_function_job_config(self):
5354
related_action_func_name="related_action_foo",
5455
related_action_kwargs={"b": 1},
5556
job_function_id=job_function.id,
57+
allow_commit=True,
5658
),
5759
)

queue_job/tests/test_run_rob_controller.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ def test_get_failure_values(self):
1515
self.assertEqual(
1616
rslt, {"exc_info": "info", "exc_name": "Exception", "exc_message": "zero"}
1717
)
18+
19+
def test_runjob_success(self):
20+
job = self.env["queue.job"].with_delay()._test_job()
21+
RunJobController._runjob(self.env, job)
22+
self.assertEqual(job.state, "done")
23+
self.assertEqual(job.db_record().state, "done")

queue_job/views/queue_job_function_views.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<field name="model_id" required="1" />
1212
<field name="method" required="1" />
1313
<field name="channel_id" />
14+
<field name="allow_commit" />
1415
<field name="edit_retry_pattern" widget="ace" />
1516
<field name="edit_related_action" widget="ace" />
1617
</group>
@@ -25,6 +26,7 @@
2526
<tree>
2627
<field name="name" />
2728
<field name="channel_id" />
29+
<field name="allow_commit" />
2830
</tree>
2931
</field>
3032
</record>

0 commit comments

Comments
 (0)