Conversation

@guohelu guohelu (Collaborator) commented Nov 21, 2025

Reviewed, transaction id: 65429

@github-actions github-actions bot left a comment

🔍 Code review summary

This PR implements concurrency control for flows. A few key issues need fixing:

🚨 Critical issues:

  • count_running_tasks() has an N+1 query and a race condition, which can make the concurrency count inaccurate
  • Redis operations lack error handling; a failed lpop/rpush can lose tasks
  • Assigning to extra_info directly overwrites existing data (use update instead)

⚡ Performance issues:

  • Every operation calls get_space_infos to fetch configuration; add a caching layer

⚠️ Logic errors:

  • callback.py:83 uses > instead of >= for the concurrency limit check, which is inconsistent with the other call sites

✨ Suggested improvements:

  • Use select_for_update or an atomic counter in count_running_tasks
  • Add a local cache for the space config (TTL 60s)
  • Consistently use task.extra_info.update() instead of direct assignment (a sketch follows this summary)

Fix the race condition and the data-loss risk before merging.
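A minimal sketch of the extra_info fix, assuming extra_info is a dict/JSON field on the task model; the is_waiting key is only illustrative:

```python
# Direct assignment drops any keys other code has already stored:
#   task.extra_info = {"is_waiting": True}
# Merging preserves them:
task.extra_info = task.extra_info or {}
task.extra_info.update({"is_waiting": True})
task.save(update_fields=["extra_info"])
```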

Reviewed, transaction id: 65437
@github-actions github-actions bot left a comment

⚠️ The concurrency check uses > while other call sites use >=, which can allow N+1 tasks to run. Use >= everywhere for consistency.

📍 bkflow/task/domains/callback.py:83
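A minimal sketch of the intended comparison, using the names from the surrounding diff; with >= the limit-th running task blocks the next one, while > lets one extra task through:

```python
running = count_running_tasks(task_instance)
if concurrency_control and running >= concurrency_control:
    ...  # limit reached: enqueue the task instead of starting it
```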

@github-actions github-actions bot left a comment

⚡ get_space_infos is called repeatedly in several places to fetch configuration. Add a cache layer with a 60s TTL, or fetch the config once at the function entry.

📍 bkflow/task/views.py:200
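One hedged way to add the cache, assuming the project can use Django's cache framework and that the get_space_infos call takes the parameters shown in the diff; the helper name and cache key are illustrative:

```python
from django.core.cache import cache

SPACE_CONFIG_CACHE_TTL = 60  # seconds; space config rarely changes

def get_cached_concurrency_control(interface_client, space_id):
    """Fetch the space's concurrency_control config, caching it for 60s."""
    cache_key = f"space_concurrency_control_{space_id}"
    value = cache.get(cache_key)
    if value is None:
        result = interface_client.get_space_infos(
            {"space_id": space_id, "config_names": "concurrency_control"}
        )
        configs = result.get("data", {}).get("configs", {})
        value = configs.get("concurrency_control", 0)
        cache.set(cache_key, value, SPACE_CONFIG_CACHE_TTL)
    return value
```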

@github-actions github-actions bot left a comment

🚨 count_running_tasks() has a race condition: multiple concurrent requests can run the count at the same time and exceed the limit. Use select_for_update() on the query, or switch to an atomic Redis counter.

📍 bkflow/task/utils.py:215
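A hedged sketch of the atomic-counter alternative (COUNT combined with FOR UPDATE is awkward on some databases); the key name and helper names are assumptions, and the counter must also be decremented wherever a task finishes:

```python
def try_acquire_run_slot(redis_cli, template_id, limit):
    """Atomically claim a run slot for the template via a Redis counter.

    INCR is atomic, so two concurrent callers can never both land on the
    same slot; the loser gives the slot back and should queue the task.
    """
    key = f"task_running_count_{template_id}"
    current = redis_cli.incr(key)
    # `current` already includes this task, so strictly-greater means over the cap
    if limit and current > limit:
        redis_cli.decr(key)  # over the limit: release the claimed slot
        return False
    return True


def release_run_slot(redis_cli, template_id):
    """Release a slot when a task finishes (or fails to start)."""
    redis_cli.decr(f"task_running_count_{template_id}")
```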

@github-actions github-actions bot left a comment

🔒 Redis operations lack exception handling: a failed lpop can lose the task. Add a try-except and log the error.

📍 bkflow/task/utils.py:195
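A hedged sketch of guarding the pop, reusing the task_wait_{template_id} key from the diff; the broad except is deliberate because the concrete client wrapper isn't visible here:

```python
import json
import logging

logger = logging.getLogger(__name__)

def pop_waiting_task(redis_cli, template_id):
    """Pop the next waiting task, logging instead of raising on Redis errors
    so the caller can decide how to recover."""
    redis_key = f"task_wait_{template_id}"
    try:
        task_json = redis_cli.lpop(redis_key)
    except Exception:
        logger.exception("failed to lpop waiting task from %s", redis_key)
        return None
    return json.loads(task_json) if task_json else None
```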

@github-actions github-actions bot left a comment

🔒 If rpush fails, the task state has already been updated but the task was never enqueued, leaving the data inconsistent. Update extra_info only after the Redis operation succeeds.

📍 bkflow/task/utils.py:183
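A hedged sketch of the safer ordering, keeping the push_task_to_queue signature from the diff; the payload fields and the is_waiting flag in extra_info are assumptions:

```python
import json
import logging

logger = logging.getLogger(__name__)

def push_task_to_queue_safely(redis_cli, task, operation, node_id=None, data=None):
    """Enqueue first, then record the waiting state, so a failed rpush never
    leaves a task marked as queued without actually being in the queue."""
    redis_key = f"task_wait_{task.template_id}"
    payload = json.dumps(
        {"task_id": task.id, "operation": operation, "node_id": node_id, "node_data": data}
    )
    try:
        redis_cli.rpush(redis_key, payload)
    except Exception:
        logger.exception("failed to enqueue task %s to %s", task.id, redis_key)
        raise  # surface the failure instead of silently updating state
    # only after the enqueue succeeded do we persist the waiting flag
    task.extra_info = task.extra_info or {}
    task.extra_info.update({"is_waiting": True})
    task.save(update_fields=["extra_info"])
```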

@github-actions github-actions bot left a comment

🔍 Code review update

✅ Improvements since the last review:

  • extra_info now uses .update() to avoid overwriting existing data
  • TaskCallBacker passes parameters through to reduce queries

🚨 Key issues still to fix:

  1. Race condition: count_running_tasks() can still produce inaccurate counts under concurrency (utils.py:215)
  2. Data-loss risk: Redis operations still lack exception handling (utils.py:183, utils.py:195)
  3. Inconsistent concurrency check: callback.py:83 uses > while other call sites use >=
  4. Performance: get_space_infos is still called repeatedly without caching (views.py:200, tasks.py:201, callback.py:76, v1_0_0.py:215)

Fix the race condition and error handling before merging.

{"space_id": task_instance.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)
Collaborator:

More and more scheduling-control logic now needs to read the space config. It would be better to abstract the config-fetching logic here and reuse it as a process-wide singleton in the background worker. Space config does not change often, so a short cache (1 minute) is appropriate to keep high task concurrency from hammering the interface service with frequent API calls.
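A hedged sketch of such a worker-wide singleton, assuming the InterfaceModuleClient.get_space_infos call shape shown in the diff; class and attribute names are illustrative:

```python
import threading
import time

class SpaceConfigCache:
    """Process-wide cache of space configs for background workers.

    Entries expire after `ttl` seconds so config changes still propagate
    within a minute; the lock keeps concurrent worker threads from firing
    duplicate interface calls for the same key.
    """

    def __init__(self, interface_client, ttl=60):
        self._client = interface_client
        self._ttl = ttl
        self._lock = threading.Lock()
        self._entries = {}  # (space_id, config_names) -> (expires_at, configs)

    def get_configs(self, space_id, config_names):
        key = (space_id, config_names)
        now = time.monotonic()
        with self._lock:
            cached = self._entries.get(key)
            if cached and cached[0] > now:
                return cached[1]
        result = self._client.get_space_infos(
            {"space_id": space_id, "config_names": config_names}
        )
        configs = result.get("data", {}).get("configs", {})
        with self._lock:
            self._entries[key] = (now + self._ttl, configs)
        return configs


# constructed once at worker startup, e.g.:
# space_config_cache = SpaceConfigCache(InterfaceModuleClient())
```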

return task_instance


def count_running_tasks(task_instance):
Collaborator:

This implementation is still too heavy: once concurrency grows, every task has to count the current task_instances rows, and the DB will not keep up.
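If the atomic-counter sketch above is adopted, the per-task COUNT over task_instances disappears; the only bookkeeping left is releasing the slot when a task reaches a terminal state. A hedged fragment (the hook point is an assumption):

```python
def on_task_finished(redis_cli, task_instance):
    """Decrement the per-template running counter when a task ends, so
    waiting tasks can start without any COUNT query against the DB."""
    redis_cli.decr(f"task_running_count_{task_instance.template_id}")
```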

create_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
start_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
finish_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
is_wait = serializers.SerializerMethodField()
Collaborator:

Rename this to is_waiting.

node_operation = TaskNodeOperation(task_instance, task_data.get("node_id"))
operation_method = getattr(node_operation, operation, None)

operation_method(operator=operation, **task_data.get("node_data", {}))
Collaborator:

Please confirm whether this is a synchronous operation. If it is, popping the task from the queue and executing it should not happen inside bamboo_engine_eri_post_set_state_handler, which is meant for setting state (it also pins the dequeued task to the same worker as an unrelated previous task, so work is not spread out). The task should be re-dispatched for execution through celery instead.
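A hedged sketch of re-dispatching through celery rather than executing inline in the state handler; the import paths, task lookup field, and payload keys mirror the diff but are assumptions:

```python
import logging

from celery import shared_task

from bkflow.task.models import TaskInstance  # import path assumed
from bkflow.task.operations import TaskNodeOperation  # import path assumed

logger = logging.getLogger(__name__)


@shared_task(ignore_result=True)
def execute_queued_task(task_id, operation, node_id=None, node_data=None):
    """Run a dequeued task operation on whichever worker picks this up,
    instead of inline inside bamboo_engine_eri_post_set_state_handler."""
    task_instance = TaskInstance.objects.get(id=task_id)  # lookup field assumed
    node_operation = TaskNodeOperation(task_instance, node_id)
    operation_method = getattr(node_operation, operation, None)
    if operation_method is None:
        logger.warning("unknown operation %s for task %s", operation, task_id)
        return
    operation_method(operator=operation, **(node_data or {}))


# in the state handler, after popping task_data from the queue:
# execute_queued_task.delay(
#     task_data["task_id"], task_data["operation"],
#     node_id=task_data.get("node_id"), node_data=task_data.get("node_data"),
# )
```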

Collaborator:

When dispatching the task out for execution, make sure the trace context is propagated correctly; otherwise tasks that switch from API- or page-triggered to background-triggered will lose their trace information.
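One hedged way to carry the trace across the hand-off, using the OpenTelemetry propagation API; if the project wraps tracing in its own start_trace helper, the same inject/extract pattern applies there:

```python
from opentelemetry import propagate, trace

def inject_trace(payload: dict) -> dict:
    """Attach the current trace context (traceparent/tracestate) to the
    queued payload before dispatching it to a worker."""
    carrier = {}
    propagate.inject(carrier)
    payload["trace_carrier"] = carrier
    return payload

def run_with_trace(payload: dict):
    """Restore the producer's trace context before executing the task, so
    background execution stays on the original API/page-triggered trace."""
    ctx = propagate.extract(payload.get("trace_carrier", {}))
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("execute_queued_task", context=ctx):
        ...  # hand off to the actual operation here
```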


template_id = TaskInstance.objects.get(instance_id=instance_id).template_id
redis_key = f"task_wait_{template_id}"
task_json = redis_cli.lpop(redis_key)
Collaborator:

Once the task has been popped here, it is lost if anything goes wrong with scheduling. Add a safeguard mechanism.
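A hedged sketch of one safeguard: move the entry to a per-template processing list instead of a plain LPOP, then remove it only after dispatch succeeds. LMOVE needs Redis 6.2+; on older servers RPOPLPUSH is the classic equivalent. Key names are illustrative:

```python
def pop_task_reliably(redis_cli, template_id):
    """Atomically move the next waiting task to a processing list, so a crash
    between pop and dispatch does not lose it."""
    wait_key = f"task_wait_{template_id}"
    processing_key = f"task_processing_{template_id}"
    return redis_cli.lmove(wait_key, processing_key, "LEFT", "RIGHT")


def ack_task(redis_cli, template_id, task_json):
    """Drop the entry from the processing list once dispatch has succeeded."""
    redis_cli.lrem(f"task_processing_{template_id}", 1, task_json)


# a periodic job can requeue entries that sit in task_processing_* for too long
```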



@redis_inst_check
def push_task_to_queue(redis_cli, task, operation, node_id=None, data=None):
Collaborator:

Add a maximum limit so the Redis queue cannot grow without bound. If the limit is exceeded, return an error to the user at the point where task execution is triggered, telling them the current flow has reached its maximum waiting limit.
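A hedged sketch of the cap, reusing the push_task_to_queue signature and the ValidationError import from the diff; the limit value is illustrative, and the llen check is a soft cap rather than an atomic one:

```python
import json

from bkflow.exceptions import ValidationError

MAX_WAITING_TASKS = 1000  # illustrative per-template cap

def push_task_with_limit(redis_cli, task, operation, node_id=None, data=None):
    """Refuse to enqueue once the per-template waiting queue is full, so the
    trigger endpoint can return an explicit error to the user."""
    redis_key = f"task_wait_{task.template_id}"
    if redis_cli.llen(redis_key) >= MAX_WAITING_TASKS:
        raise ValidationError("the flow has reached its maximum waiting limit")
    payload = json.dumps(
        {"task_id": task.id, "operation": operation, "node_id": node_id, "node_data": data}
    )
    redis_cli.rpush(redis_key, payload)
```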

from bkflow.contrib.api.collections.interface import InterfaceModuleClient
from bkflow.exceptions import ValidationError
from bkflow.pipeline_plugins.components.collections.base import BKFlowBaseService
from bkflow.task.utils import push_task_to_queue
Collaborator:

The overall design still needs some observability metrics so we can tell when a flow's queue keeps growing and is at risk of hitting the cap. This could be done by injecting the flow's current queue length inside `with start_trace`.
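A hedged sketch of the metric, attaching the queue depth to the active span via the OpenTelemetry API; if start_trace wraps its own span object, the same set_attribute call belongs inside that context manager:

```python
from opentelemetry import trace

def record_queue_depth(redis_cli, template_id):
    """Expose the waiting-queue length on the current span so monitoring can
    spot flows whose queues keep growing toward the cap."""
    depth = redis_cli.llen(f"task_wait_{template_id}")
    trace.get_current_span().set_attribute("bkflow.task_wait_queue.length", depth)
    return depth
```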
