Skip to content

Commit d602539

Browse files
committed
refactor(backend): flow 重构 #6423
# Reviewed, transaction id: 17152
1 parent e3dfa20 commit d602539

File tree

6 files changed

+633
-0
lines changed

6 files changed

+633
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
4+
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
5+
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at https://opensource.org/licenses/MIT
7+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
specific language governing permissions and limitations under the License.
10+
"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
4+
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
5+
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at https://opensource.org/licenses/MIT
7+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
specific language governing permissions and limitations under the License.
10+
"""
11+
import copy
12+
import json
13+
import logging
14+
import re
15+
from abc import ABCMeta
16+
from typing import Dict, Optional
17+
18+
from django.utils.translation import ugettext as _
19+
from pipeline.core.flow.activity import StaticIntervalGenerator
20+
21+
from backend import env
22+
from backend.components import JobApi
23+
from backend.flow.consts import SUCCESS_LIST, WriteContextOpType
24+
from backend.flow.plugins.components.collections.base.base_service import BaseService
25+
26+
logger = logging.getLogger("flow")
# Non-greedy match: capture only the first <ctx>...</ctx> custom tag in the log output.
ACTUATOR_CONTEXT_RE = re.compile("<ctx>(?P<context>.+?)</ctx>")
29+
30+
class BkJobService(BaseService, metaclass=ABCMeta):
    """
    Base service for polling BlueKing JOB (bk-job) task execution.

    Subclasses launch the job elsewhere (their execute step writes
    ``ext_result``); this base class supplies the scheduling loop:
    ``_schedule`` is invoked on a fixed interval, checks the job instance
    status, relays per-IP logs on failure, and can capture actuator output
    (a ``<ctx>...</ctx>`` JSON section in the log) back into the flow's
    ``trans_data`` context.
    """

    # Tell the pipeline engine this component requires periodic scheduling.
    __need_schedule__ = True
    # Poll the job status every 5 seconds.
    interval = StaticIntervalGenerator(5)

    @staticmethod
    def __status__(instance_id: str) -> Optional[Dict]:
        """
        Fetch the current status of a bk-job job instance.

        :param instance_id: the bk-job job_instance_id to query
        :return: raw response dict from JobApi.get_job_instance_status
        """
        payload = {
            "bk_biz_id": env.JOB_BLUEKING_BIZ_ID,
            "job_instance_id": instance_id,
            # Include per-IP results so callers can inspect individual hosts.
            "return_ip_result": True,
        }
        resp = JobApi.get_job_instance_status(payload, raw=True)
        return resp

    def get_job_log(
        self,
        job_instance_id: int,
        step_instance_id: int,
        ip_dict: dict,
    ):
        """
        Fetch the execution log of one job step for a single host.

        :param job_instance_id: the bk-job job instance id
        :param step_instance_id: the step instance id within the job
        :param ip_dict: host selector, e.g. {"bk_cloud_id": ..., "ip": ...}
        :return: raw response dict from JobApi.get_job_instance_ip_log
        """
        payload = {
            "bk_biz_id": env.JOB_BLUEKING_BIZ_ID,
            "job_instance_id": job_instance_id,
            "step_instance_id": step_instance_id,
        }
        return JobApi.get_job_instance_ip_log({**payload, **ip_dict}, raw=True)

    def __get_target_ip_context(
        self,
        job_instance_id: int,
        step_instance_id: int,
        ip_dict: dict,
        data,
        trans_data,
        write_payload_var: str,
        write_op: str,
    ):
        """
        Extract the actuator output for one host from its job log and store
        it on the flow context attribute ``trans_data.<write_payload_var>``.

        The log is expected to contain a ``<ctx>...</ctx>`` section holding
        JSON. ``write_op`` controls how the value is written: REWRITE (the
        default) overwrites the attribute; APPEND merges this host's result
        in as ``{ip: result}``.

        :return: True on success, False if the log fetch or parsing failed
        """
        resp = self.get_job_log(job_instance_id, step_instance_id, ip_dict)
        if not resp["result"]:
            # The log API itself reported failure -- abort this host.
            return False
        try:
            # Pull the JSON payload out of the first <ctx>...</ctx> tag.
            # NOTE(review): re.search returns None when the tag is absent; the
            # resulting AttributeError is swallowed by the except clause below.
            result = json.loads(re.search(ACTUATOR_CONTEXT_RE, resp["data"]["log_content"]).group("context"))
            if write_op == WriteContextOpType.APPEND.value:
                # Append mode: merge this host's result under its IP key.
                context = copy.deepcopy(getattr(trans_data, write_payload_var))
                ip = ip_dict["ip"]
                if context:
                    context[ip] = copy.deepcopy(result)
                else:
                    context = {ip: copy.deepcopy(result)}

                setattr(trans_data, write_payload_var, copy.deepcopy(context))

            else:
                # Default (REWRITE): overwrite the attribute with this host's result.
                setattr(trans_data, write_payload_var, copy.deepcopy(result))
            data.outputs["trans_data"] = trans_data

            return True

        except Exception as e:
            self.log_error(_("[写入上下文结果失败] failed: {}").format(e))
            return False

    def _schedule(self, data, parent_data, callback_data=None) -> bool:
        """
        Poll the launched bk-job task until it finishes.

        Returns True while the job is still running (keep polling) or after a
        successful finish; returns False on launch failure, job failure, or
        when capturing the execution log into the flow context fails.
        """
        ext_result = data.get_one_of_outputs("ext_result")
        exec_ips = data.get_one_of_outputs("exec_ips")
        kwargs = data.get_one_of_inputs("kwargs")
        write_payload_var = data.get_one_of_inputs("write_payload_var")
        trans_data = data.get_one_of_inputs("trans_data")

        node_name = kwargs["node_name"]

        # ext_result never changes between polls, so log it only once per node
        # to keep the output compact.
        if not kwargs.get(f"{node_name}_ext_result_cached"):
            kwargs[f"{node_name}_ext_result_cached"] = True
            self.log_info(f"[{node_name}] ext_result: {ext_result}")

        if isinstance(ext_result, bool):
            # A boolean ext_result means the task ran synchronously; there is
            # no job instance to poll, so finish immediately with that result.
            self.finish_schedule()
            return ext_result

        if not ext_result["result"]:
            # The job launch call itself reported a failure.
            self.log_error(f"[{node_name}] schedule status failed: {ext_result['error']}")
            return False

        job_instance_id = ext_result["data"]["job_instance_id"]
        resp = self.__status__(job_instance_id)

        # bk-job status codes:
        # 1. not started; 2. running; 3. succeeded; 4. failed; 5. skipped;
        # 6. error ignored; 7. waiting for user; 8. manually terminated;
        # 9. abnormal state; 10. force-stop in progress;
        # 11. force-stop succeeded; 12. force-stop failed
        if not (resp["result"] and resp["data"]["finished"]):
            self.log_info(_("[{}] 任务正在执行🤔").format(node_name))
            return True

        # Overall job status once the job has finished.
        job_status = resp["data"]["job_instance"]["status"]

        # DBM always submits single-step jobs, so take the first step's id.
        step_instance_id = resp["data"]["step_instance_list"][0]["step_instance_id"]

        # Collect the hosts this execution targeted.
        ip_dicts = []
        if exec_ips:
            for i in exec_ips:
                if isinstance(i, dict) and i.get("ip") is not None and i.get("bk_cloud_id") is not None:
                    ip_dicts.append(i)
                else:
                    # Backward compatibility: plain IP entries fall back to the
                    # node-level bk_cloud_id from kwargs.
                    ip_dicts.append({"bk_cloud_id": kwargs["bk_cloud_id"], "ip": i})

        # Did the job end in a non-success state?
        if job_status not in SUCCESS_LIST:
            self.log_info("{} job status: {}".format(node_name, resp))
            self.log_info(_("[{}] 任务调度失败😱").format(node_name))

            # Relay each host's script error log so multi-IP failures stay visible.
            if ip_dicts:
                for ip_dict in ip_dicts:
                    resp = self.get_job_log(job_instance_id, step_instance_id, ip_dict)
                    if resp.get("result"):
                        self.log_error(f"{ip_dict}:{resp['data']['log_content']}")

            self.finish_schedule()
            return False

        self.log_info(_("[{}]任务调度成功🥳︎").format(node_name))
        if not write_payload_var:
            self.finish_schedule()
            return True

        # Capture post-execution logs into the flow context; supports multiple IPs.
        # The value lands on trans_data.<write_payload_var>, written either by
        # overwrite (default, WriteContextOpType.REWRITE) or by append
        # (WriteContextOpType.APPEND), where each host's result is merged in as
        # {"ip": result}.
        self.log_info(_("[{}]该节点需要获取执行后日志,赋值到流程上下文").format(node_name))

        is_false = False
        for ip_dict in ip_dicts:
            if not self.__get_target_ip_context(
                job_instance_id=job_instance_id,
                step_instance_id=step_instance_id,
                ip_dict=ip_dict,
                data=data,
                trans_data=trans_data,
                write_payload_var=write_payload_var,
                write_op=kwargs.get("write_op", WriteContextOpType.REWRITE.value),
            ):
                self.log_error(_("[{}] 获取执行后写入流程上下文失败,ip:[{}]").format(node_name, ip_dict["ip"]))
                is_false = True

        if is_false:
            self.finish_schedule()
            return False

        self.finish_schedule()
        return True

0 commit comments

Comments
 (0)