Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bkflow/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class TaskOperationType(Enum):
# 任务节点操作
callback = _("回调")
retry = _("重试")
loop_retry = _("循环重试")
skip = _("跳过")
loop_skip = _("循环跳过")
skip_exg = _("跳过失败网关")
skip_cpg = _("跳过并行条件网关")
pause_subproc = _("暂停节点")
Expand Down
10 changes: 8 additions & 2 deletions bkflow/contrib/operation_record/decorators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available.
Expand Down Expand Up @@ -36,7 +35,14 @@ def wrapper(func):
def decorator(*args, **kwargs):
result = func(*args, **kwargs)
try:
recorder = OPERATION_RECORDER.recorders[recorder_type](operate_type, operate_source, extra_info)
adjusted_operate_type = operate_type
if kwargs.get("loop", False) and operate_type == "skip":
adjusted_operate_type = "loop_skip"
elif kwargs.get("loop", False) and operate_type == "retry":
adjusted_operate_type = "loop_retry"
recorder = OPERATION_RECORDER.recorders[recorder_type](
adjusted_operate_type, operate_source, extra_info
)
record_kwargs = {**kwargs, "func_result": result}
recorder.record(*args, **record_kwargs)
except Exception as e:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 异常处理过于宽泛:捕获所有异常可能掩盖严重错误。建议只捕获预期的异常类型(如 KeyError),让其他异常向上传播

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class PipelineTreeSubprocessConverter:
"timeout_config",
"auto_retry",
"template_node_id",
"loop_config",
}
DEFAULT_VALUES = {
"error_ignorable": False,
Expand All @@ -43,6 +44,7 @@ class PipelineTreeSubprocessConverter:
"data": {"subprocess": {"hook": False, "need_render": False, "value": {}}},
"version": "1.0.0",
},
"loop_config": {},
}

def __init__(self, pipeline_tree, constants=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ def _render_parent_parameters(self, pipeline_tree, parent_task):
root_pipeline_inputs = {
key: inputs.value for key, inputs in self.runtime.get_data_inputs(self.top_pipeline_id).items()
}
context = Context(self.runtime, context_values, root_pipeline_inputs)
if self.runtime.get_node(self.id).loop_strategy:
context: Context = Context(self.runtime, context_values, root_pipeline_inputs, self.inner_loop)
else:
context = Context(self.runtime, context_values, root_pipeline_inputs)
hydrated_context = context.hydrate(deformat=True)
self.logger.info(f"subprocess parent hydrated context: {hydrated_context}")

Expand Down
14 changes: 14 additions & 0 deletions bkflow/pipeline_plugins/static/variables/loop.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

(function () {
$.atoms.loop = [
{
tag_code: "loop",
type: "input",
attrs: {
name: gettext("循环变量"),
hookable: true,
validation: []
}
},
]
})();
41 changes: 41 additions & 0 deletions bkflow/pipeline_plugins/variables/collections/loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available.
Copyright (C) 2024 THL A29 Limited,
a Tencent company. All rights reserved.
Licensed under the MIT License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the
specific language governing permissions and limitations under the License.
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""

from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from pipeline.core.data.var import LazyVariable
from pipeline.core.flow.io import StringItemSchema

from bkflow.pipeline_plugins.variables.base import SelfExplainVariable


class Loop(LazyVariable, SelfExplainVariable):
code = "loop"
name = _("循环变量")
type = "general"
tag = "loop.loop"
form = "{}variables/{}.js".format(settings.STATIC_URL, code)
schema = StringItemSchema(description=_("循环变量"))

def get_value(self):
# 循环节点因引用
if hasattr(self, "inner_loop") and self.inner_loop != -1:
return self.value.split(",")[self.inner_loop - 1]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 索引越界风险:当 inner_loop 超过数组长度时会抛出 IndexError。建议添加边界检查:

values = self.value.split(",")
if self.inner_loop > len(values):
    raise ValueError(f"inner_loop {self.inner_loop} exceeds values length {len(values)}")
return values[self.inner_loop - 1]

# 普通节点引用
return self.value
9 changes: 9 additions & 0 deletions bkflow/pipeline_web/parser/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ def validate_web_pipeline_tree(web_pipeline_tree):
# constants key pattern validate
key_validation_errors = []
context_values = []

for name, act in web_pipeline_tree["activities"].items():
loop_config = act.get("loop_config", {})
if not loop_config.get("enable", False):
continue
loop_params = loop_config.get("loop_params", [])
if loop_config["loop_times"] != min([len(param.split(",")) for key, param in loop_params.items()]):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 空列表异常:当 loop_params 为空字典时,min() 会抛出 ValueError。建议先检查是否为空:

if not loop_params:
    raise exceptions.ParserWebTreeException("loop_params is empty")
lengths = [len(param.split(",")) for key, param in loop_params.items()]
if loop_config["loop_times"] != min(lengths):
    raise exceptions.ParserWebTreeException("loop times not matched")

raise exceptions.ParserWebTreeException("loop times not matched")

classification = classify_constants(web_pipeline_tree["constants"], is_subprocess=False)
for key, const in web_pipeline_tree["constants"].items():
key_value = const.get("key")
Expand Down
26 changes: 21 additions & 5 deletions bkflow/task/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,16 @@ def retry(self, operator: str, *args, **kwargs) -> OperationResult:
api_result = bamboo_engine_api.get_data(runtime=self.runtime, node_id=self.node_id)
if not api_result.result:
return api_result
loop_retry = kwargs.get("loop", False)
return bamboo_engine_api.retry_node(
runtime=self.runtime, node_id=self.node_id, data=kwargs.get("inputs") or None
runtime=self.runtime, node_id=self.node_id, data=kwargs.get("inputs") or None, loop_retry=loop_retry
)

@record_operation(RecordType.task_node.name, TaskOperationType.skip.name, TaskOperationSource.app.name)
@uniform_task_operation_result
def skip(self, operator: str, *args, **kwargs) -> OperationResult:
return bamboo_engine_api.skip_node(runtime=self.runtime, node_id=self.node_id)
loop_skip = kwargs.get("loop", False)
return bamboo_engine_api.skip_node(runtime=self.runtime, node_id=self.node_id, loop_skip=loop_skip)

@record_operation(RecordType.task_node.name, TaskOperationType.callback.name, TaskOperationSource.api.name)
@uniform_task_operation_result
Expand Down Expand Up @@ -461,8 +463,9 @@ def get_node_detail(
detail = detail[self.node_id]
# 默认只请求最后一次循环结果
format_bamboo_engine_status(detail)
node_info = self.runtime.get_node(self.node_id)
if loop is None or int(loop) >= detail["loop"]:
loop = detail["loop"]
loop = detail["loop"] if not node_info.loop_strategy else -1
hist_result = bamboo_engine_api.get_node_histories(runtime=runtime, node_id=self.node_id, loop=loop)
if not hist_result:
logger.exception("bamboo_engine_api.get_node_histories fail")
Expand All @@ -482,8 +485,21 @@ def get_node_detail(
detail["version"] = hist_result.data[-1]["version"]

for hist in detail["histories"]:
# 重试记录必然是因为失败才重试
hist.setdefault("state", bamboo_engine_states.FAILED)
raw_inputs = hist["inputs"].get("subprocess")
if raw_inputs:
inputs = raw_inputs["constants"]
inputs = {key[2:-1]: value.get("value") for key, value in inputs.items()}
hist["inputs"] = inputs

# 重试记录必然是因为失败才重试,设置了循环策略的节点只有成功才能接着循环
if node_info.loop_strategy:
if hist["skip"] or hist["outputs"].get("_result"):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 状态判断可能不够健壮:仅通过 outputs.get("_result") 判断成功可能不准确,_result 可能为 False 但节点实际成功。建议明确检查:

if hist["skip"] or (hist["outputs"].get("_result") is True):

state = bamboo_engine_states.FINISHED
else:
state = bamboo_engine_states.FAILED
else:
state = bamboo_engine_states.FAILED
hist.setdefault("state", state)
hist["history_id"] = hist["id"]
format_bamboo_engine_status(hist)
# 节点未执行
Expand Down
Binary file not shown.
Binary file not shown.
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ bk-notice-sdk==1.3.0

# engine service
boto3==1.26.133
bamboo-pipeline==3.29.5
# bamboo-pipeline==3.29.5
./packages/bamboo_engine-2.11.1rc1-py3-none-any.whl
./packages/bamboo_pipeline-3.29.6rc1-py3-none-any.whl
pydantic==1.10.6
django-extensions==3.2.1

Expand Down
Loading