Skip to content

Commit 1196096

Browse files
committed
当工单不在执行态时,记录日志,不报错,避免q-task不断重试
1 parent eb99790 commit 1196096

File tree

2 files changed

+56
-4
lines changed

2 files changed

+56
-4
lines changed

sql/utils/execute_sql.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,42 @@
1717

1818
def execute(workflow_id, user=None):
1919
"""为延时或异步任务准备的execute, 传入工单ID和执行人信息"""
20+
audit_id = Audit.detail_by_workflow_id(
21+
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
22+
).audit_id
2023
# 使用当前读防止重复执行
2124
with transaction.atomic():
2225
workflow_detail = SqlWorkflow.objects.select_for_update().get(id=workflow_id)
2326
# 只有排队中和定时执行的数据才可以继续执行,否则直接抛错
2427
if workflow_detail.status not in ["workflow_queuing", "workflow_timingtask"]:
25-
raise Exception("工单状态不正确,禁止执行!")
28+
logger.error(f"工单号[{workflow_id}] 可能被任务调度器重试")
29+
Audit.add_log(
30+
audit_id=audit_id,
31+
operation_type=5,
32+
operation_type_desc="执行工单发生异常",
33+
operation_info="请检查工单执行情况",
34+
operator=user.username if user else "",
35+
operator_display=user.display if user else "系统",
36+
)
37+
result = ReviewSet(
38+
rows=[
39+
ReviewResult(
40+
id=1,
41+
errlevel=2,
42+
stagestatus="执行发生错误",
43+
errormessage=f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
44+
)
45+
],
46+
)
47+
result.error = (f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",)
48+
return result
49+
2650
# 将工单状态修改为执行中
2751
else:
2852
SqlWorkflow(id=workflow_id, status="workflow_executing").save(
2953
update_fields=["status"]
3054
)
3155
# 增加执行日志
32-
audit_id = Audit.detail_by_workflow_id(
33-
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
34-
).audit_id
3556
Audit.add_log(
3657
audit_id=audit_id,
3758
operation_type=5,

sql/utils/tests.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,19 @@ def setUp(self):
364364
db_name="some_db",
365365
syntax_type=1,
366366
)
367+
self.wf_executing = SqlWorkflow.objects.create(
368+
workflow_name="some_name",
369+
group_id=1,
370+
group_name="g1",
371+
engineer_display="",
372+
audit_auth_groups="some_group",
373+
create_time=datetime.datetime.now(),
374+
status="workflow_executing",
375+
is_backup=True,
376+
instance=self.ins,
377+
db_name="some_db",
378+
syntax_type=1,
379+
)
367380
SqlWorkflowContent.objects.create(
368381
workflow=self.wf,
369382
sql_content="some_sql",
@@ -409,6 +422,24 @@ def test_execute(self, _get_engine, _execute_workflow, _audit):
409422
operator_display="系统",
410423
)
411424

425+
@patch("sql.utils.execute_sql.Audit")
426+
@patch("sql.engines.mysql.MysqlEngine.execute_workflow")
427+
@patch("sql.engines.get_engine")
428+
def test_execute_in_executing(self, _get_engine, _execute_workflow, _audit):
429+
_audit.detail_by_workflow_id.return_value.audit_id = 1
430+
result = execute(self.wf_executing.id)
431+
_audit.add_log.assert_called_with(
432+
audit_id=1,
433+
operation_type=5,
434+
operation_type_desc="执行工单发生异常",
435+
operation_info="请检查工单执行情况",
436+
operator="",
437+
operator_display="系统",
438+
)
439+
assert result.error == (
440+
f"任务[{self.wf_executing.id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
441+
)
442+
412443
@patch("sql.utils.execute_sql.notify_for_execute")
413444
@patch("sql.utils.execute_sql.Audit")
414445
def test_execute_callback_success(self, _audit, _notify):

0 commit comments

Comments
 (0)