1919from deepagents .backends import LocalShellBackend
2020from agent .deepagent .tools .tool_call_manager import get_tool_call_manager
2121from langgraph .checkpoint .memory import InMemorySaver
22+ from langgraph .errors import GraphInterrupt
23+ from langgraph .types import Command
2224from common .llm_util import get_llm
2325from common .minio_util import MinioUtils
2426from constants .code_enum import DataTypeEnum , IntentEnum
@@ -194,19 +196,29 @@ async def _create_agent(
194196 system_prompt : Optional [str ] = None ,
195197 mcp_tools : Optional [list ] = None ,
196198 session_workdir : Optional [Path ] = None ,
199+ selected_skills : Optional [list ] = None ,
197200 ):
198201 """创建 deep agent 实例
199202
200203 Args:
201204 mcp_tools: 预获取的 MCP 工具列表,为 None 时内部获取
202205 session_workdir: session 级工作目录,优先于全局 agent_workspace_dir
206+ selected_skills: 用户选中的技能名称列表,为 None 时使用所有已启用技能
203207 """
204208 if mcp_tools is None :
205209 mcp_tools = await self ._get_mcp_tools ()
206210
211+ from agent .common .tools .ask_user_tool import ask_user
207212 from services .skill_service import SkillService
208213
209- skill_paths = str (current_dir / "skills" )
214+ # 根据用户选择加载技能
215+ if selected_skills :
216+ skill_paths_list = SkillService .get_enabled_skill_paths (selected_skills , scope = "common" )
217+ else :
218+ skill_paths_list = [str (current_dir / "skills" )]
219+
220+ # 注入 ask_user 工具,让 Agent 可以向用户提问
221+ all_tools = (mcp_tools or []) + [ask_user ]
210222
211223 model = get_llm (timeout = self .DEFAULT_LLM_TIMEOUT )
212224 workdir = session_workdir or agent_workspace_dir
@@ -218,10 +230,10 @@ async def _create_agent(
218230
219231 return create_deep_agent (
220232 model = model ,
221- tools = mcp_tools ,
233+ tools = all_tools ,
222234 system_prompt = system_prompt ,
223235 memory = memory ,
224- skills = [ skill_paths ] ,
236+ skills = skill_paths_list ,
225237 backend = LocalShellBackend (
226238 root_dir = str (workdir ),
227239 inherit_env = True ,
@@ -369,6 +381,7 @@ async def enter_execution():
369381 return
370382 tracker .current_phase = Phase .EXECUTION
371383 tracker .has_tool_called = True
384+ tracker .has_sent_content = False
372385 await self ._send_phase_progress (
373386 response , Phase .EXECUTION , "start" , progress_id
374387 )
@@ -449,11 +462,16 @@ async def close_execution_enter_reporting():
449462
450463 # 工具切换时输出标签
451464 if tool_name != tracker .current_tool_name :
465+ # 同一个工具连续调用时,输出之间加换行分隔
466+ if tracker .current_tool_name and tracker .has_sent_content :
467+ await write_and_collect ("\n \n " )
452468 tracker .current_tool_name = tool_name
469+ tracker .has_sent_content = False
453470 await write_and_collect (_format_tool_label (tool_name ))
454471
455472 if text :
456473 await write_and_collect (text )
474+ tracker .has_sent_content = True
457475 continue
458476
459477 # ===== 非工具节点的 AI 文本 =====
@@ -473,6 +491,37 @@ async def close_execution_enter_reporting():
473491 await write_and_collect (SECTION_CLOSE )
474492 tracker .execution_opened = False
475493
494+ except GraphInterrupt as e :
495+ # Agent 调用了 ask_user 工具,暂停执行等待用户输入
496+ if tracker .execution_opened :
497+ await write_and_collect (SECTION_CLOSE )
498+ tracker .execution_opened = False
499+
500+ # 从中断值提取问题
501+ question = "请提供更多信息"
502+ if e .interrupts :
503+ interrupt_value = e .interrupts [0 ].value
504+ if isinstance (interrupt_value , dict ):
505+ question = interrupt_value .get ("question" , question )
506+ elif isinstance (interrupt_value , str ):
507+ question = interrupt_value
508+
509+ thread_id = config .get ("configurable" , {}).get ("thread_id" , "" )
510+ # 发送 t15 用户输入请求
511+ interrupt_data = {
512+ "data" : {
513+ "type" : "user_input_required" ,
514+ "question" : question ,
515+ "thread_id" : thread_id ,
516+ },
517+ "dataType" : "t15" ,
518+ }
519+ await response .write (
520+ "data:" + json .dumps (interrupt_data , ensure_ascii = False ) + "\n \n "
521+ )
522+ # 不发送 t99 流结束标记,对话处于暂停状态
523+ logger .info (f"Agent 暂停等待用户输入: thread_id={ thread_id } , question={ question } " )
524+ return
476525 except asyncio .CancelledError :
477526 await self ._safe_write (response , "\n > 这条消息已停止" , "info" )
478527 await self ._safe_write (response , "" , "end" )
@@ -494,6 +543,7 @@ async def run_agent(
494543 uuid_str : str = None ,
495544 user_token = None ,
496545 file_list : dict = None ,
546+ selected_skills : list = None ,
497547 ):
498548 """
499549 运行增强智能体
@@ -556,6 +606,7 @@ async def run_agent(
556606 system_prompt = None ,
557607 mcp_tools = mcp_tools ,
558608 session_workdir = session_workdir ,
609+ selected_skills = selected_skills ,
559610 )
560611
561612 # 带超时保护的任务
@@ -669,6 +720,178 @@ async def run_agent(
669720 if task_id in self .running_tasks :
670721 del self .running_tasks [task_id ]
671722
723+ async def resume_agent (
724+ self ,
725+ response ,
726+ thread_id : str ,
727+ user_input : str ,
728+ user_token : str = None ,
729+ ):
730+ """恢复暂停的 Agent,将用户回答注入并继续执行"""
731+ # JWT 解码获取用户信息
732+ user_dict = await decode_jwt_token (user_token )
733+ task_id = user_dict ["id" ]
734+ task_context = {"cancelled" : False }
735+ self .running_tasks [task_id ] = task_context
736+
737+ try :
738+ t02_answer_data = []
739+
740+ config = {
741+ "configurable" : {
742+ "thread_id" : thread_id ,
743+ "user_id" : str (task_id ),
744+ },
745+ "recursion_limit" : self .DEFAULT_RECURSION_LIMIT ,
746+ }
747+
748+ mcp_tools = await self ._get_mcp_tools ()
749+ agent = await self ._create_agent (
750+ system_prompt = None ,
751+ mcp_tools = mcp_tools ,
752+ )
753+
754+ # 使用 Command(resume=...) 恢复 LangGraph 执行
755+ # 传入 None 作为 input,通过 Command 提供恢复值
756+ task = asyncio .create_task (
757+ self ._stream_resume_response (
758+ agent , config , user_input , response , task_id , t02_answer_data
759+ )
760+ )
761+
762+ await asyncio .wait_for (task , timeout = self .TASK_TIMEOUT )
763+
764+ # 发送结束标记
765+ if not task_context .get ("cancelled" ):
766+ await response .write (
767+ "data:"
768+ + json .dumps (
769+ {"data" : "DONE" , "dataType" : DataTypeEnum .STREAM_END .value [0 ]},
770+ ensure_ascii = False ,
771+ )
772+ + "\n \n "
773+ )
774+
775+ except asyncio .TimeoutError :
776+ await self ._safe_write (response , "\n > 任务超时,已自动停止" , "info" )
777+ await self ._safe_write (response , "" , "end" )
778+ except Exception as e :
779+ logger .error (f"Resume agent 异常: { e } " , exc_info = True )
780+ await self ._safe_write (
781+ response , f"[ERROR] 恢复执行异常: { str (e )[:200 ]} " , "error"
782+ )
783+ finally :
784+ if task_id in self .running_tasks :
785+ del self .running_tasks [task_id ]
786+
787+ async def _stream_resume_response (
788+ self , agent , config , user_input , response , session_id , answer_collector
789+ ):
790+ """恢复执行的流式响应处理"""
791+ import uuid
792+
793+ tracker = PhaseTracker ()
794+ tracker .current_phase = Phase .EXECUTION
795+ last_keepalive = asyncio .get_event_loop ().time ()
796+ progress_id = str (uuid .uuid4 ())
797+
798+ async def write_and_collect (content : str ):
799+ await self ._safe_write (response , content )
800+ answer_collector .append (content )
801+
802+ try :
803+ # 使用 Command(resume=user_input) 恢复暂停的 graph
804+ async for mode , chunk in agent .astream (
805+ Command (resume = user_input ),
806+ config ,
807+ stream_mode = ["messages" , "updates" ],
808+ ):
809+ if self .running_tasks .get (session_id , {}).get ("cancelled" ):
810+ await self ._safe_write (response , "\n > 这条消息已停止" , "info" )
811+ return
812+
813+ current_time = asyncio .get_event_loop ().time ()
814+ if current_time - last_keepalive >= self .STREAM_KEEPALIVE_INTERVAL :
815+ try :
816+ await response .write (": keepalive\n \n " )
817+ last_keepalive = current_time
818+ except Exception :
819+ pass
820+
821+ if mode == "updates" :
822+ if isinstance (chunk , dict ):
823+ for node_name , node_output in chunk .items ():
824+ if not isinstance (node_output , dict ):
825+ continue
826+ todos = node_output .get ("todos" )
827+ if todos and isinstance (todos , list ):
828+ await write_and_collect (_format_todos (todos ))
829+ continue
830+
831+ message_chunk , metadata = chunk
832+ langgraph_node = metadata .get ("langgraph_node" , "" )
833+ text = (
834+ self ._extract_text (message_chunk .content )
835+ if hasattr (message_chunk , "content" )
836+ else ""
837+ )
838+
839+ if langgraph_node == "tools" :
840+ tool_name = getattr (message_chunk , "name" , None ) or "未知工具"
841+ if tool_name == "write_todos" :
842+ continue
843+ if tool_name != tracker .current_tool_name :
844+ # 同一个工具连续调用时,输出之间加换行分隔
845+ if tracker .current_tool_name and tracker .has_sent_content :
846+ await write_and_collect ("\n \n " )
847+ tracker .current_tool_name = tool_name
848+ tracker .has_sent_content = False
849+ await write_and_collect (_format_tool_label (tool_name ))
850+ if text :
851+ await write_and_collect (text )
852+ tracker .has_sent_content = True
853+ continue
854+
855+ if not text :
856+ continue
857+
858+ if tracker .current_phase == Phase .EXECUTION :
859+ tracker .current_tool_name = ""
860+ tracker .current_phase = Phase .REPORTING
861+
862+ await write_and_collect (text )
863+
864+ except GraphInterrupt as e :
865+ # Agent 再次调用 ask_user,继续暂停
866+ question = "请提供更多信息"
867+ if e .interrupts :
868+ interrupt_value = e .interrupts [0 ].value
869+ if isinstance (interrupt_value , dict ):
870+ question = interrupt_value .get ("question" , question )
871+ elif isinstance (interrupt_value , str ):
872+ question = interrupt_value
873+
874+ thread_id = config .get ("configurable" , {}).get ("thread_id" , "" )
875+ interrupt_data = {
876+ "data" : {
877+ "type" : "user_input_required" ,
878+ "question" : question ,
879+ "thread_id" : thread_id ,
880+ },
881+ "dataType" : "t15" ,
882+ }
883+ await response .write (
884+ "data:" + json .dumps (interrupt_data , ensure_ascii = False ) + "\n \n "
885+ )
886+ return
887+ except asyncio .CancelledError :
888+ await self ._safe_write (response , "\n > 这条消息已停止" , "info" )
889+ except Exception as e :
890+ logger .error (f"Resume 流式响应异常: { e } " , exc_info = True )
891+ await self ._safe_write (
892+ response , f"[ERROR] 响应异常: { str (e )[:100 ]} " , "error"
893+ )
894+
672895 async def cancel_task (self , task_id : str ) -> bool :
673896 """取消指定的任务"""
674897 if task_id in self .running_tasks :
0 commit comments