feat: add concurrency control for flow-created tasks --story=128208486 #539
base: master
# Reviewed, transaction id: 68146
# Reviewed, transaction id: 68150
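🔍 Code Review Summary
This review found 7 issues that need attention, mainly in concurrency control, resource management, and error handling.
🚨 Critical Issues
- Race conditions: several Redis operations lack atomicity protection
- Resource leak risk: the queue-full case is not handled correctly
- Incomplete error handling: in several places execution continues after an exception is handled, which can leave state inconsistent
⚡ Performance Considerations
- get_running_task_count may become a performance bottleneck under high concurrency
- The caching mechanism needs to account for consistency across multiple instances
Suggested Priority
- Fix the race conditions (use a Lua script or a Redis transaction)
- Improve the error handling logic
- Optimize the performance bottlenecks
See the inline comments for details 👇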
    return task_instance

def get_running_task_count(space_id, template_id):

⚡ Performance bottleneck: every check queries all running tasks and calls get_task_states() on each one, which is expensive under high concurrency. Suggest maintaining a Redis counter that is updated incrementally.
    name = "concurrency_control"
    desc = _("流程并发控制")
    default_value = 0
    LEAST_NUMBER = 1

✨ Configuration semantics: LEAST_NUMBER = 1 means the minimum concurrency is 1, while default_value = 0 means concurrency control is off by default. Suggest stating these configuration semantics explicitly in the documentation.
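One way to make these semantics explicit is a small validator. The sketch below is illustrative only: `normalize_concurrency_limit` and the `None` convention are hypothetical names and choices, not part of the PR. It assumes 0 means "disabled" and that any enabled value must be at least LEAST_NUMBER.

```python
from typing import Optional

DEFAULT_VALUE = 0  # mirrors default_value in the snippet: control disabled
LEAST_NUMBER = 1   # mirrors LEAST_NUMBER: smallest limit once enabled


def normalize_concurrency_limit(value: int) -> Optional[int]:
    """Return None when control is disabled, otherwise the validated limit.

    Hypothetical helper: the name and the None convention are assumptions.
    """
    if value == DEFAULT_VALUE:
        return None  # 0 is read as "no concurrency control"
    if value < LEAST_NUMBER:
        raise ValueError(f"concurrency limit must be 0 or >= {LEAST_NUMBER}")
    return value
```

Documenting the rule in one function keeps callers from having to remember that 0 and "at least 1" are two different cases.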
    operation_result = operation_method(operator=operation, **task_data.get("node_data", {}))
    if operation_result.result:
        break

is_waiting=False has been set but execution fails, which can leave the state inconsistent. Suggest restoring is_waiting on failure or adding a failure state.
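The rollback suggested here could look roughly like the sketch below. `FakeTask`, `run_with_rollback`, and the `dequeue_failed` flag are hypothetical stand-ins for illustration; the real task model and retry loop in the PR may differ.

```python
from typing import Callable


class FakeTask:
    """Stand-in for the task model; only extra_info and save() matter here."""

    def __init__(self) -> None:
        self.extra_info = {}
        self.save_calls = 0

    def save(self) -> None:
        self.save_calls += 1


def run_with_rollback(task, operate: Callable[[], bool], max_retries: int = 3) -> bool:
    """Clear is_waiting, try the operation, and restore the flag if every attempt fails."""
    task.extra_info["is_waiting"] = False
    task.save()
    for _ in range(max_retries):
        if operate():
            return True
    # All attempts failed: put the task back into a recoverable state so it
    # is not stranded between "queued" and "running".
    task.extra_info.update({"is_waiting": True, "dequeue_failed": True})
    task.save()
    return False
```

Whether to restore `is_waiting` or to record a dedicated failure state is a design choice; the sketch does both so that a later sweep could find and requeue such tasks.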
    if not concurrency_control:
        return False

    redis_key = f"task_wait_{template_id}"

🚨 Concurrency check is not atomic: there is a time window between llen and get_running_task_count, so the decision may be inaccurate. Suggest combining them into a single atomic operation.
# Reviewed, transaction id: 68153
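🔍 Code Review Summary
This review found that the previously reported race condition has been fixed and that code quality has improved noticeably. 3 issues still need attention.
✅ Resolved
- Race condition fix: push_task_to_queue now uses a Lua script to ensure atomicity ✨
- Queue-full exception handling: an exception is now raised instead of silently returning False
- Cache consistency improvement: Redis is now used as the shared cache
🚨 Still Needs Attention
- Inconsistent state after retry failure: failed tasks in process_task_from_queue do not have their state rolled back correctly
- Concurrency check still has a time window: the check in task_concurrency_limit_reached is not atomic
- Performance bottleneck: get_running_task_count is expensive under high concurrency
See the inline comments for details 👇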
Code review complete
    operation_result = operation_method(operator=operation, **task_data.get("node_data", {}))
    if operation_result.result:
        break

is_waiting=False is not rolled back, so the task ends up neither in the queue nor running. Suggest restoring is_waiting=True on final failure or adding a dedicated failure state.
    if not concurrency_control:
        return False

    redis_key = f"task_wait_{template_id}"

🚨 Race condition in the concurrency check: there is a time window between the llen check and get_running_task_count, so a task may finish right after the check and invalidate the decision. Suggest using a Redis counter together with a Lua script to make the check atomic.
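A minimal sketch of the counter-plus-Lua idea follows. It is not the PR's implementation: the key name `task_running_{template_id}`, the function `try_acquire_slot`, and the script itself are assumptions. The only client call used is redis-py's `eval(script, numkeys, *keys_and_args)`; because Redis runs a Lua script atomically, the read and the increment cannot interleave with another client.

```python
# Check-and-reserve in one step: read the running counter and increment it
# only if it is still below the limit passed in ARGV[1].
ACQUIRE_SLOT_LUA = """
local running = tonumber(redis.call('GET', KEYS[1]) or '0')
if running < tonumber(ARGV[1]) then
    redis.call('INCR', KEYS[1])
    return 1
end
return 0
"""


def try_acquire_slot(client, template_id, limit):
    """Return True if a running slot was reserved for this template.

    `client` is assumed to be a redis-py style client. The counter key
    layout is hypothetical and must match whatever releases the slot.
    """
    counter_key = f"task_running_{template_id}"
    return client.eval(ACQUIRE_SLOT_LUA, 1, counter_key, limit) == 1
```

A caller that gets False would push the task onto the wait queue; a matching decrement is needed when a task finishes, otherwise slots leak.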
    return task_instance

def get_running_task_count(space_id, template_id):

⚡ Performance issue: every call iterates over all unfinished tasks and calls get_task_states() on each, so latency becomes significant when there are many tasks. Suggest maintaining a Redis counter when task state changes, instead of querying in real time.
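The counter approach might be sketched as below. All names (`running_key`, `on_task_started`, `on_task_finished`, `get_running_task_count_cached`) and the key layout are hypothetical; `InMemoryCounterClient` is only a stand-in that mimics the `incr`/`decr`/`get` subset of a Redis client so the example runs without a server.

```python
class InMemoryCounterClient:
    """Tiny stand-in exposing the incr/decr/get subset of a Redis client."""

    def __init__(self):
        self._data = {}

    def incr(self, name, amount=1):
        self._data[name] = self._data.get(name, 0) + amount
        return self._data[name]

    def decr(self, name, amount=1):
        return self.incr(name, -amount)

    def get(self, name):
        return self._data.get(name)


def running_key(space_id, template_id):
    # Hypothetical key layout, one counter per (space, template) pair.
    return f"task_running_{space_id}_{template_id}"


def on_task_started(client, space_id, template_id):
    """Call from the code path that moves a task into the running state."""
    return client.incr(running_key(space_id, template_id))


def on_task_finished(client, space_id, template_id):
    """Call from every terminal transition (finished, failed, revoked)."""
    return client.decr(running_key(space_id, template_id))


def get_running_task_count_cached(client, space_id, template_id):
    """O(1) read that replaces the per-task state lookups."""
    raw = client.get(running_key(space_id, template_id))
    return int(raw) if raw is not None else 0
```

The trade-off is that the counter can drift if a terminal transition is missed, so a periodic reconciliation against the database is probably still worthwhile.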
# Reviewed, transaction id: 68155
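🔍 Code Review Summary
This PR adds concurrency control for flow-created tasks, and the overall approach is clear. 5 high-severity issues have been identified and need to be fixed:
🚨 Critical Issues
- Race condition: after the Lua script check, push_task_to_queue can still fail before the save, leaving the task state inconsistent
- Database exception handling: process_task_from_queue has no exception handling for a task_id that does not exist, which can crash the task
- Performance bottleneck: get_running_task_count creates a TaskOperation object for every instance and calls the API, which performs poorly with many tasks
- Logic error: the semantics of the is_exemption parameter of task_concurrency_limit_reached are unclear in the subprocess callback scenario
- Subprocess callback error: callback.py incorrectly pushes parent_task onto the queue instead of continuing the callback to the subprocess node
✨ Suggestions
- Make the queue operation and the state flag atomic inside the Redis Lua script
- Add database exception handling and fallback logic
- Optimize the task count query with a single database query plus a batched state check
- Redesign the callback logic so that the subprocess correctly triggers the parent flow's node callback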
📝 Inline Code Review
5 critical issues identified; see the inline comments below
    logger.info(f"Task {task.id} added to queue for template {template_id}, new queue size: {result}")

    task.extra_info.update({"is_waiting": True})
    task.save()

🚨 Race condition risk: if a failure occurs between the Lua script succeeding and task.save(), the task is already in the queue but its flag has not been updated. Suggest moving the state flag into the Lua script or adding a compensation mechanism.
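A compensation mechanism could be sketched as follows. This is a simplification under stated assumptions: the enqueue is shown as a plain `rpush` rather than the PR's Lua script, `push_with_compensation` is a hypothetical name, and `client` is assumed to expose redis-py's `rpush(name, *values)` and `lrem(name, count, value)`.

```python
import json


def push_with_compensation(client, task, redis_key, payload):
    """Enqueue the task, then mark it as waiting; undo the enqueue if the save fails."""
    item = json.dumps(payload)
    client.rpush(redis_key, item)
    try:
        task.extra_info.update({"is_waiting": True})
        task.save()
    except Exception:
        # Remove the exact item just pushed so the queue and the task
        # record agree again, then let the caller see the failure.
        client.lrem(redis_key, 1, item)
        raise
```

The compensation itself can fail (for example if Redis becomes unreachable), so this narrows the window rather than closing it; a periodic consistency check would cover the remainder.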
    task_data = json.loads(task_json)
    operation = task_data.get("operation")
    task_instance = TaskInstance.objects.get(id=task_data.get("task_id"))

🚨 Missing exception handling: if the task_id does not exist, a DoesNotExist exception is raised and the celery task fails. Suggest adding try-except TaskInstance.DoesNotExist and logging the event.
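The guard might look like the sketch below. To keep it runnable without Django, `TaskNotFound` stands in for `TaskInstance.DoesNotExist` and `get_task` stands in for `TaskInstance.objects.get`; both, along with `load_task_or_none`, are hypothetical names.

```python
import json
import logging

logger = logging.getLogger(__name__)


class TaskNotFound(Exception):
    """Stand-in for TaskInstance.DoesNotExist in this sketch."""


def load_task_or_none(task_json, get_task):
    """Parse a queue item and load its task, returning None if the task is gone."""
    task_data = json.loads(task_json)
    task_id = task_data.get("task_id")
    try:
        return get_task(task_id)
    except TaskNotFound:
        # A stale queue entry should be skipped, not crash the worker.
        logger.warning("Task %s from queue no longer exists, skipping", task_id)
        return None
```

Returning None lets the caller move on to the next queue item instead of failing the whole celery task.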
        space_id=space_id, template_id=template_id, is_deleted=False, is_started=True, is_finished=False
    )

    task_operations = [

⚡ Performance bottleneck: a TaskOperation is created for every instance and the API is called to fetch its state, which becomes extremely slow with hundreds of tasks. Suggest querying states in batches or caching the running state in a database field.
        return True

    running_count = get_running_task_count(space_id, template_id)
    if is_exemption:

When is_exemption=True the comparison uses > instead of >=. The name implies an exemption, but the effect is to allow one task beyond the limit. Suggest renaming the parameter or adding a comment that spells out the business meaning of this special case.
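The off-by-one described here can be seen in a small function. This is reconstructed from the review comment only, not copied from the PR; `limit_reached` and its parameters are illustrative names.

```python
def limit_reached(running_count, limit, is_exemption=False):
    """Return True when a new task must wait.

    With is_exemption=True the comparison is strict, so a caller may still
    start a task when running_count == limit (one above the nominal limit).
    """
    if is_exemption:
        return running_count > limit
    return running_count >= limit
```

With a limit of 3 and 3 tasks running, a normal caller is blocked while an exempt caller is let through; only at 4 running tasks is the exempt caller blocked as well. A name such as `allow_one_over` would describe that behaviour more directly than `is_exemption`.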
        and self.extra_info["task_success"] is True
    ):
        try:
            push_task_to_queue(parent_task, "callback")

🚨 Callback logic error: the subprocess node should continue its callback here (bamboo_engine_api.callback) rather than pushing parent_task onto the queue. Pushing it onto the queue loses the subprocess's execution result, so the parent flow's node cannot advance normally.
No description provided.