diff --git a/dbm-ui/backend/ticket/handler.py b/dbm-ui/backend/ticket/handler.py index 61152eeb40..7b17ca4dc6 100644 --- a/dbm-ui/backend/ticket/handler.py +++ b/dbm-ui/backend/ticket/handler.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import asyncio import itertools import json import logging @@ -43,6 +44,7 @@ from backend.ticket.serializers import TodoSerializer from backend.ticket.todos import BaseTodoContext, TodoActorFactory from backend.ticket.todos.itsm_todo import ItsmTodoContext +from backend.utils.batch_request import async_func logger = logging.getLogger("root") @@ -287,13 +289,23 @@ def batch_process_todo(cls, user, action, operations): @param action 动作 @param operations: todo列表,每个item包含todo id和params """ - - results = [] + # 批量处理待办操作 + operation_contents = [] for operation in operations: - todo_id, params = operation["todo_id"], operation["params"] - todo = Todo.objects.get(id=todo_id) - TodoActorFactory.actor(todo).process(user, action, params) - results.append(todo) + operation_contents.append( + ( + operation["todo_id"], + operation["params"], + action, + user, + ) + ) + + # 执行异步处理 + asyncio.run(async_func(TicketHandler.process_single_todo, operation_contents)) + + # 获取处理后的待办事项 + results = [Todo.objects.get(id=operation["todo_id"]) for operation in operations] return TodoSerializer(results, many=True).data @classmethod @@ -458,3 +470,11 @@ def ticket_status_standardization(cls): context=ItsmTodoContext(itsm_flow.id, ticket.id).to_dict(), ) print(f"ticket[{ticket.id}] add a itsm todo") + + @classmethod + def process_single_todo(cls, todo_id, params, act, username): + """ + 处理单个待办的辅助函数 + """ + todo = Todo.objects.select_related("ticket").prefetch_related("ticket__flows").get(id=todo_id) + TodoActorFactory.actor(todo).process(username, act, params) diff --git a/dbm-ui/backend/ticket/models/ticket.py b/dbm-ui/backend/ticket/models/ticket.py index 0df2ee00a1..cfd1f76f44 100644 --- a/dbm-ui/backend/ticket/models/ticket.py +++ b/dbm-ui/backend/ticket/models/ticket.py @@ -146,8 +146,10 @@ def current_flow(self) -> Flow: 1. 取 TicketFlow 中最后一个 flow_obj_id 非空的流程 2. 若 TicketFlow 中都流程都为空,则代表整个单据未开始,取第一个流程 """ - if Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).exists(): - return Flow.objects.filter(ticket=self).exclude(status=TicketFlowStatus.PENDING).last() + non_pending_flows = [flow for flow in self.flows.all() if flow.status != TicketFlowStatus.PENDING] + if non_pending_flows: + # 返回最后一个符合条件的 Flow 对象 + return non_pending_flows[-1] # 初始化时,当前节点和下一个节点为同一个 return self.next_flow() @@ -155,13 +157,10 @@ def next_flow(self) -> Flow: """ 下一个流程,即 TicketFlow 中第一个为PENDING的流程 """ - next_flows = Flow.objects.filter(ticket=self, status=TicketFlowStatus.PENDING) + next_flows = [flow for flow in self.flows.all() if flow.status == TicketFlowStatus.PENDING] - # 支持跳过人工审批和确认环节 - if env.ITSM_FLOW_SKIP: - next_flows = next_flows.exclude(flow_type__in=[FlowType.BK_ITSM, FlowType.PAUSE]) - - return next_flows.first() + # 返回第一个符合条件的 Flow 对象 + return next_flows[0] if next_flows else None @classmethod def create_ticket( diff --git a/dbm-ui/backend/ticket/views.py b/dbm-ui/backend/ticket/views.py index da5564f73c..83fddbd8bb 100644 --- a/dbm-ui/backend/ticket/views.py +++ b/dbm-ui/backend/ticket/views.py @@ -82,7 +82,6 @@ TodoSerializer, UpdateTicketFlowConfigSerializer, ) -from backend.ticket.todos import TodoActorFactory TICKET_TAG = "ticket" @@ -240,6 +239,7 @@ def perform_create(self, serializer): builder.patch_ticket_detail() builder.init_ticket_flows() + ticket = Ticket.objects.prefetch_related("flows").get(pk=ticket.pk) TicketFlowManager(ticket=ticket).run_next_flow() @swagger_auto_schema( @@ -426,10 +426,9 @@ def process_todo(self, request, *args, **kwargs): ticket = self.get_object() validated_data = self.params_validate(self.get_serializer_class()) - - todo = ticket.todo_of_ticket.get(id=validated_data["todo_id"]) - TodoActorFactory.actor(todo).process(request.user.username, validated_data["action"], validated_data["params"]) - + TicketHandler.process_single_todo( + validated_data["todo_id"], validated_data["params"], validated_data["action"], request.user.username + ) return Response(TodoSerializer(ticket.todo_of_ticket.all(), many=True).data) @common_swagger_auto_schema( diff --git a/dbm-ui/backend/utils/batch_request.py b/dbm-ui/backend/utils/batch_request.py index fba5e955c4..2fd065034f 100644 --- a/dbm-ui/backend/utils/batch_request.py +++ b/dbm-ui/backend/utils/batch_request.py @@ -8,6 +8,8 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import asyncio +import concurrent.futures from concurrent.futures import ThreadPoolExecutor, as_completed from copy import deepcopy from multiprocessing.pool import ThreadPool @@ -226,3 +228,20 @@ def wrapper(wrapped, instance, args, kwargs): return {"data": data, "total": len(data)} return wrapper + + +async def async_func(func, params_list): + """ + 通用异步函数 + @param func: 需要并发执行的函数 + @param params_list: 参数列表,每个元素是一个元组,包含传递给 func 的参数 + """ + + loop = asyncio.get_running_loop() + + # 自定义线程池 + with concurrent.futures.ThreadPoolExecutor() as executor: + # 将所有的任务提交到线程池 + tasks = [loop.run_in_executor(executor, func, *params) for params in params_list] + # 等待所有任务完成 + await asyncio.gather(*tasks)