2626
2727
2828class RunJobController (http .Controller ):
29- def _try_perform_job (self , env , job ):
30- """Try to perform the job."""
29+ @classmethod
30+ def _acquire_job (cls , env : api .Environment , job_uuid : str ) -> Job | None :
31+ """Acquire a job for execution.
32+
33+ - make sure it is in ENQUEUED state
34+ - mark it as STARTED and commit the state change
35+ - acquire the job lock
36+
37+ If successful, return the Job instance, otherwise return None. This
38+ function may fail if the job is not in the expected state or is
39+ already locked by another worker.
40+ """
41+ env .cr .execute (
42+ "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
43+ "FOR NO KEY UPDATE SKIP LOCKED" ,
44+ (job_uuid , ENQUEUED ),
45+ )
46+ if not env .cr .fetchone ():
47+ _logger .warning (
48+ "was requested to run job %s, but it does not exist, "
49+ "or is not in state %s, or is being handled by another worker" ,
50+ job_uuid ,
51+ ENQUEUED ,
52+ )
53+ return None
54+ job = Job .load (env , job_uuid )
55+ assert job and job .state == ENQUEUED
3156 job .set_started ()
3257 job .store ()
3358 env .cr .commit ()
34- job .lock ()
59+ if not job .lock ():
60+ _logger .warning (
61+ "was requested to run job %s, but it could not be locked" ,
62+ job_uuid ,
63+ )
64+ return None
65+ return job
3566
67+ @classmethod
68+ def _try_perform_job (cls , env , job ):
69+ """Try to perform the job, mark it done and commit if successful."""
3670 _logger .debug ("%s started" , job )
37-
3871 job .perform ()
3972 # Triggers any stored computed fields before calling 'set_done'
4073 # so that will be part of the 'exec_time'
@@ -45,18 +78,20 @@ def _try_perform_job(self, env, job):
4578 env .cr .commit ()
4679 _logger .debug ("%s done" , job )
4780
48- def _enqueue_dependent_jobs (self , env , job ):
81+ @classmethod
82+ def _enqueue_dependent_jobs (cls , env , job ):
4983 tries = 0
5084 while True :
5185 try :
52- job .enqueue_waiting ()
86+ with job .env .cr .savepoint ():
87+ job .enqueue_waiting ()
5388 except OperationalError as err :
5489 # Automatically retry the typical transaction serialization
5590 # errors
5691 if err .pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY :
5792 raise
5893 if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE :
59- _logger .info (
94+ _logger .error (
6095 "%s, maximum number of tries reached to update dependencies" ,
6196 errorcodes .lookup (err .pgcode ),
6297 )
@@ -74,17 +109,8 @@ def _enqueue_dependent_jobs(self, env, job):
74109 else :
75110 break
76111
77- @http .route (
78- "/queue_job/runjob" ,
79- type = "http" ,
80- auth = "none" ,
81- save_session = False ,
82- readonly = False ,
83- )
84- def runjob (self , db , job_uuid , ** kw ):
85- http .request .session .db = db
86- env = http .request .env (user = SUPERUSER_ID )
87-
112+ @classmethod
113+ def _runjob (cls , env : api .Environment , job : Job ) -> None :
88114 def retry_postpone (job , message , seconds = None ):
89115 job .env .clear ()
90116 with registry (job .env .cr .dbname ).cursor () as new_cr :
@@ -93,26 +119,9 @@ def retry_postpone(job, message, seconds=None):
93119 job .set_pending (reset_retry = False )
94120 job .store ()
95121
96- # ensure the job to run is in the correct state and lock the record
97- env .cr .execute (
98- "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE" ,
99- (job_uuid , ENQUEUED ),
100- )
101- if not env .cr .fetchone ():
102- _logger .warning (
103- "was requested to run job %s, but it does not exist, "
104- "or is not in state %s" ,
105- job_uuid ,
106- ENQUEUED ,
107- )
108- return ""
109-
110- job = Job .load (env , job_uuid )
111- assert job and job .state == ENQUEUED
112-
113122 try :
114123 try :
115- self ._try_perform_job (env , job )
124+ cls ._try_perform_job (env , job )
116125 except OperationalError as err :
117126 # Automatically retry the typical transaction serialization
118127 # errors
@@ -141,7 +150,6 @@ def retry_postpone(job, message, seconds=None):
141150 # traceback in the logs we should have the traceback when all
142151 # retries are exhausted
143152 env .cr .rollback ()
144- return ""
145153
146154 except (FailedJobError , Exception ) as orig_exception :
147155 buff = StringIO ()
@@ -151,19 +159,18 @@ def retry_postpone(job, message, seconds=None):
151159 job .env .clear ()
152160 with registry (job .env .cr .dbname ).cursor () as new_cr :
153161 job .env = job .env (cr = new_cr )
154- vals = self ._get_failure_values (job , traceback_txt , orig_exception )
162+ vals = cls ._get_failure_values (job , traceback_txt , orig_exception )
155163 job .set_failed (** vals )
156164 job .store ()
157165 buff .close ()
158166 raise
159167
160168 _logger .debug ("%s enqueue depends started" , job )
161- self ._enqueue_dependent_jobs (env , job )
169+ cls ._enqueue_dependent_jobs (env , job )
162170 _logger .debug ("%s enqueue depends done" , job )
163171
164- return ""
165-
166- def _get_failure_values (self , job , traceback_txt , orig_exception ):
172+ @classmethod
173+ def _get_failure_values (cls , job , traceback_txt , orig_exception ):
167174 """Collect relevant data from exception."""
168175 exception_name = orig_exception .__class__ .__name__
169176 if hasattr (orig_exception , "__module__" ):
@@ -177,6 +184,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
177184 "exc_message" : exc_message ,
178185 }
179186
187+ @http .route (
188+ "/queue_job/runjob" ,
189+ type = "http" ,
190+ auth = "none" ,
191+ save_session = False ,
192+ readonly = False ,
193+ )
194+ def runjob (self , db , job_uuid , ** kw ):
195+ http .request .session .db = db
196+ env = http .request .env (user = SUPERUSER_ID )
197+ job = self ._acquire_job (env , job_uuid )
198+ if not job :
199+ return ""
200+ self ._runjob (env , job )
201+ return ""
202+
180203 # flake8: noqa: C901
181204 @http .route ("/queue_job/create_test_job" , type = "http" , auth = "user" )
182205 def create_test_job (
@@ -187,6 +210,7 @@ def create_test_job(
187210 description = "Test job" ,
188211 size = 1 ,
189212 failure_rate = 0 ,
213+ job_duration = 0 ,
190214 ):
191215 """Create test jobs
192216
@@ -207,6 +231,12 @@ def create_test_job(
207231 except (ValueError , TypeError ):
208232 failure_rate = 0
209233
234+ if job_duration is not None :
235+ try :
236+ job_duration = float (job_duration )
237+ except (ValueError , TypeError ):
238+ job_duration = 0
239+
210240 if not (0 <= failure_rate <= 1 ):
211241 raise BadRequest ("failure_rate must be between 0 and 1" )
212242
@@ -235,6 +265,7 @@ def create_test_job(
235265 channel = channel ,
236266 description = description ,
237267 failure_rate = failure_rate ,
268+ job_duration = job_duration ,
238269 )
239270
240271 if size > 1 :
@@ -245,6 +276,7 @@ def create_test_job(
245276 channel = channel ,
246277 description = description ,
247278 failure_rate = failure_rate ,
279+ job_duration = job_duration ,
248280 )
249281 return ""
250282
@@ -256,6 +288,7 @@ def _create_single_test_job(
256288 description = "Test job" ,
257289 size = 1 ,
258290 failure_rate = 0 ,
291+ job_duration = 0 ,
259292 ):
260293 delayed = (
261294 http .request .env ["queue.job" ]
@@ -265,7 +298,7 @@ def _create_single_test_job(
265298 channel = channel ,
266299 description = description ,
267300 )
268- ._test_job (failure_rate = failure_rate )
301+ ._test_job (failure_rate = failure_rate , job_duration = job_duration )
269302 )
270303 return "job uuid: %s" % (delayed .db_record ().uuid ,)
271304
@@ -279,6 +312,7 @@ def _create_graph_test_jobs(
279312 channel = None ,
280313 description = "Test job" ,
281314 failure_rate = 0 ,
315+ job_duration = 0 ,
282316 ):
283317 model = http .request .env ["queue.job" ]
284318 current_count = 0
@@ -301,7 +335,7 @@ def _create_graph_test_jobs(
301335 max_retries = max_retries ,
302336 channel = channel ,
303337 description = "%s #%d" % (description , current_count ),
304- )._test_job (failure_rate = failure_rate )
338+ )._test_job (failure_rate = failure_rate , job_duration = job_duration )
305339 )
306340
307341 grouping = random .choice (possible_grouping_methods )
0 commit comments