Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions base_bg/models/base_bg.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def _get_name(batch_key: str, queue_order: int) -> str:
}
job_kwargs = kwargs.copy()
job_kwargs["_record_ids"] = list(chunk_ids) if chunk_ids else []
job_vals["args_json"] = list(args) if args else []
job_vals["kwargs_json"] = job_kwargs
job_vals["args_json"] = self.check_serializable(list(args)) if args else []
job_vals["kwargs_json"] = self.check_serializable(job_kwargs)
job = self.env["bg.job"].create(job_vals)
jobs |= job
# Link previous job to current so sequence is established in one pass
Expand Down Expand Up @@ -105,8 +105,7 @@ def bg_enqueue(self, method: str, threshold: int | None = None, *args, **kwargs)
...
records.bg_enqueue('method_name', threshold=..., ...)

Delegates to the model API `bg_enqueue_records` using the calling
recordset as the `records` parameter.
Delegates to the model API `bg_enqueue_records` using the calling recordset as the `records` parameter.
"""
return self.bg_enqueue_records(self, method, threshold, *args, **kwargs)

Expand All @@ -127,8 +126,24 @@ def is_serializable(self, value: Any) -> bool:
:param value: The value to check
:return: True if serializable, False otherwise
"""
if isinstance(value, dict):
return all(self.is_serializable(k) and self.is_serializable(v) for k, v in value.items())
if isinstance(value, (list, tuple)):
return all(self.is_serializable(item) for item in value)
try:
json.dumps(value)
return True
except Exception:
return False

@api.model
def check_serializable(self, value: Any) -> Any:
    """Validate that *value* survives JSON serialization.

    Delegates the actual check to :meth:`is_serializable` and acts as a
    pass-through on success, so it can be used inline when building vals.

    :param value: the value to validate
    :return: *value* itself, unchanged, when it is serializable
    :raises ValueError: if the value cannot be JSON serialized
    """
    if self.is_serializable(value):
        return value
    raise ValueError(_("Value %s is not JSON serializable") % repr(value))
122 changes: 75 additions & 47 deletions base_bg/models/bg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,8 @@ def action_cancel(self):
self.ensure_one()
if self.state != "enqueued":
raise UserError(_("Only enqueued jobs can be canceled"))
self.write(
{
"state": "canceled",
"cancel_time": fields.Datetime.now(),
}
)

(self | self._get_next_jobs()).cancel()

def action_retry(self):
    """Retry a failed job.

    Re-enqueues the job with its retry counter and error message reset,
    and puts every chained follow-up job back into the waiting state so
    the batch sequence resumes from this job.

    :raises UserError: if the job is not in the ``failed`` state
    """
    self.ensure_one()
    if self.state != "failed":
        raise UserError(_("Only failed jobs can be retried"))

    self.enqueue(retry=True)
    self._get_next_jobs().wait()

def action_open_records(self) -> dict:
"""
Expand Down Expand Up @@ -181,12 +173,7 @@ def run(self):
if self.state != "enqueued":
raise UserError(_("Only enqueued jobs can be executed"))

self.write(
{
"state": "running",
"start_time": fields.Datetime.now(),
}
)
self.start()
self.env.cr.commit() # pylint: disable=invalid-commit

try:
Expand All @@ -202,15 +189,7 @@ def run(self):

# Execute the method and capture the result
result = getattr(records, self.method)(*args, **kwargs)
self.write(
{
"state": "done",
"end_time": fields.Datetime.now(),
}
)
if self.next_job_id:
self.next_job_id.state = "enqueued"
self.env["base.bg"].sudo()._trigger_crons()
self.finish()
if result:
self._notify_user(result)
self.env.cr.commit() # pylint: disable=invalid-commit
Expand All @@ -219,6 +198,71 @@ def run(self):
self._handle_job_error(e)
raise

def enqueue(self, retry: bool = False):
    """Move the jobs back into the ``enqueued`` state.

    :param retry: when True, also reset the retry counter and clear any
        stored error message so the job restarts with a clean slate.
    """
    vals = (
        {"state": "enqueued", "retry_count": 0, "error_message": False}
        if retry
        else {"state": "enqueued"}
    )
    self.write(vals)

def start(self):
    """Flag the jobs as running and stamp the execution start time."""
    running_vals = {
        "state": "running",
        "start_time": fields.Datetime.now(),
    }
    self.write(running_vals)

def finish(self):
    """Mark the jobs as done and stamp the end time.

    Any chained follow-up job (via ``next_job_id``) is enqueued so the
    batch sequence keeps advancing, and the background crons are
    retriggered to pick it up.
    """
    done_vals = {
        "state": "done",
        "end_time": fields.Datetime.now(),
    }
    self.write(done_vals)
    followers = self.filtered("next_job_id").mapped("next_job_id")
    followers.enqueue()
    self.env["base.bg"].sudo()._trigger_crons()

def wait(self):
    """Put the jobs in the ``waiting`` state until their predecessor completes."""
    self.write({"state": "waiting"})

def fail(self, error_message: str):
    """Mark the jobs as failed, stamping the end time and the error text.

    :param error_message: human-readable description of the failure
    """
    failed_vals = {
        "state": "failed",
        "end_time": fields.Datetime.now(),
        "error_message": error_message,
    }
    self.write(failed_vals)

def cancel(self, message: str | None = None):
    """Cancel every job in the recordset, optionally recording a reason.

    :param message: explanatory text stored in ``error_message``; left
        empty (None) when no reason is given.
    """
    canceled_vals = {
        "state": "canceled",
        "cancel_time": fields.Datetime.now(),
        "error_message": message,
    }
    self.write(canceled_vals)

def _handle_job_error(self, error: Exception | str):
    """Handle a job execution error.

    Increments the retry counter and re-enqueues the job while retries
    remain; once ``max_retries`` is reached, marks the job as failed and
    cancels every chained follow-up job in the batch.

    :param error: the exception (or message) raised by the job's method
    """
    error_msg = str(error)
    self.retry_count += 1
    if self.retry_count < self.max_retries:
        # Retries left: put the job back in the queue for another attempt.
        self.enqueue()
        _logger.warning("Job %s failed, scheduling retry #%d: %s", self.name, self.retry_count, error_msg)
    else:
        # Max retries reached: fail permanently and cancel the chained jobs.
        self.fail(error_msg)
        self._get_next_jobs().cancel(message=_("Previous job in batch failed"))
        _logger.error("Job %s failed permanently: %s", self.name, error_msg)

def _notify_user(self, result: str):
Expand Down
43 changes: 39 additions & 4 deletions base_bg/tests/test_bg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,29 @@ def test_create_bg_job(self):

def test_job_cancel(self):
    """Basic test for job cancellation: canceling the head of a chain
    must cancel every chained follow-up job as well."""
    # Create a chain of jobs
    job1 = self._create_job(name="Cancel Test Job 1")
    job2 = self._create_job(name="Cancel Test Job 2", state="waiting")
    job3 = self._create_job(name="Cancel Test Job 3", state="waiting")
    job1.next_job_id = job2
    job2.next_job_id = job3

    # Cancel the first job
    job1.action_cancel()

    # Refresh from DB
    job1 = self.BgJob.browse(job1.id)
    job2 = self.BgJob.browse(job2.id)
    job3 = self.BgJob.browse(job3.id)

    # All jobs in the chain should be canceled
    self.assertEqual(job1.state, "canceled")
    self.assertEqual(job2.state, "canceled")
    self.assertEqual(job3.state, "canceled")
    # Canceled jobs should have cancel_time set
    self.assertIsNotNone(job1.cancel_time)
    self.assertIsNotNone(job2.cancel_time)
    self.assertIsNotNone(job3.cancel_time)

def test_job_retry(self):
"""Basic test for job retry."""
Expand Down Expand Up @@ -309,7 +328,6 @@ def test_fail_first_job_cancels_following_batch_jobs(self):
self.assertEqual(job3.state, "canceled")
# Canceled jobs must have a cancel_time and an explanatory error_message
self.assertIsNotNone(job2.cancel_time)
self.assertIn("Canceled due to previous job failure", job2.error_message)

def test_bg_enqueue_helper_delegates_to_bg_enqueue_records(self):
"""bg_enqueue helper must delegate to bg_enqueue_records with self as records."""
Expand Down Expand Up @@ -365,3 +383,20 @@ def test_job_completion_enqueues_next_job(self):

job2.invalidate_recordset()
self.assertEqual(job2.state, "enqueued")

def test_check_serializable(self):
    """check_serializable must raise ValueError for unserializable objects."""
    base_bg = self.env["base.bg"]
    # env is not serializable, neither nested in a dict nor in a list
    dict_data = {
        "serializable": "ok",
        "unserializable": self.env,
    }
    list_data = ["ok", self.env, 123]

    # A plain function is never JSON serializable (was a lambda; PEP 8 E731)
    def function_data(x):
        return x

    with self.assertRaises(ValueError):
        base_bg.check_serializable(dict_data)
    with self.assertRaises(ValueError):
        base_bg.check_serializable(list_data)
    with self.assertRaises(ValueError):
        base_bg.check_serializable(function_data)