44
55import json
66import logging
7- from collections .abc import AsyncGenerator , Callable , Awaitable
7+ from collections .abc import AsyncGenerator
88from typing import Any
99
1010from mtv_agent .server .llm .client import LLMClient
1111from mtv_agent .server .mcp .manager import MCPManager
12+ from mtv_agent .server .tools import ApproveFunc , execute_tool_call , trim_history
1213
1314logger = logging .getLogger (__name__ )
1415
15- ApproveFunc = Callable [[str , dict ], Awaitable [tuple [bool , str | None ]]]
16-
1716
1817async def run_stream (
1918 message : str ,
@@ -24,34 +23,62 @@ async def run_stream(
2423 history : list [dict ] | None = None ,
2524 namespace : str | None = None ,
2625 command : str | None = None ,
26+ session_id : str | None = None ,
2727 max_iterations : int = 20 ,
28+ max_history_chars : int = 80_000 ,
2829) -> AsyncGenerator [dict [str , Any ], None ]:
2930 """Run the agent loop, yielding SSE-ready event dicts.
3031
3132 Iterates until the LLM produces a text response or hits *max_iterations*.
33+
34+ **Initial message setup** (built by ``_build_messages``):
35+
36+ 1. System prompt -- always first, sets the agent persona and instructions.
37+ 2. History -- previous user/assistant turns from the chat session, trimmed
38+ from the oldest end to stay within *max_history_chars* so we don't
39+ exceed the LLM context window.
40+ 3. User message -- the current request. When the user invokes a
41+ slash-command (e.g. ``/check-cluster-health``), its body replaces the
42+ plain user message, with the original input appended for context.
43+
44+ **Iteration loop** (each pass through the ``for`` loop):
45+
46+ - Send the messages + tool definitions to the LLM.
47+ - If the LLM responds with plain text (no tool calls), yield it and stop.
48+ - If the LLM requests tool calls, append its response (via ``model_dump()``)
49+ to the messages list, then execute each tool and append the result.
50+ The OpenAI API requires this pairing: an assistant message with
51+ ``tool_calls`` followed by a ``tool`` message for each call.
52+ - The loop then repeats, giving the LLM the tool results so it can
53+ decide to call more tools or produce a final text answer.
54+
55+ Example messages list after one tool-call iteration::
56+
57+ [
58+ {"role": "system", "content": "<system prompt>"},
59+ {"role": "user", "content": "<history msg 1>"},
60+ {"role": "assistant", "content": "<history msg 2>"},
61+ {"role": "user", "content": "<user message or command + user message>"},
62+ {"role": "assistant", "tool_calls": [{"id": "...", ...}]},
63+ {"role": "tool", "tool_call_id": "...", "content": "<result>"},
64+ ]
3265 """
3366 tools = mcp .get_tool_definitions ()
3467
35- tools_with_flags = {
36- td ["function" ]["name" ]
37- for td in tools
38- if "flags" in td .get ("function" , {}).get ("parameters" , {}).get ("properties" , {})
39- }
68+ messages = _build_messages (
69+ system_prompt , command , history , message , max_history_chars
70+ )
4071
41- trimmed_history = _trim_history (history or [])
42- messages : list [dict ] = [
43- {"role" : "system" , "content" : system_prompt },
44- ]
45- if command :
46- messages .append (
47- {"role" : "system" , "content" : f"Follow this command:\n \n { command } " }
48- )
49- messages .extend ([* trimmed_history , {"role" : "user" , "content" : message }])
72+ if llm .dumper and session_id :
73+ llm .dumper .set_session (session_id )
5074
5175 for iteration in range (max_iterations ):
5276 logger .debug ("Agent iteration %d" , iteration + 1 )
5377 yield {"event" : "thinking" }
5478
79+ if llm .dumper :
80+ llm .dumper .next_iteration ()
81+
5582 try :
5683 response = await llm .chat (messages , tools or None )
5784 except Exception as exc :
@@ -62,108 +89,59 @@ async def run_stream(
6289 choice = response .choices [0 ]
6390
6491 if not choice .message .tool_calls :
65- yield {
66- "event" : "content" ,
67- "content" : choice .message .content or "" ,
68- }
92+ yield {"event" : "content" , "content" : choice .message .content or "" }
6993 return
7094
7195 messages .append (choice .message .model_dump ())
7296
7397 for tc in choice .message .tool_calls :
7498 name = tc .function .name
75- try :
76- args = json .loads (tc .function .arguments )
77- except json .JSONDecodeError :
78- args = {}
79-
80- if namespace and name in tools_with_flags :
81- flags = args .setdefault ("flags" , {})
82- if "namespace" not in flags :
83- flags ["namespace" ] = namespace
84-
85- policy = mcp .check_policy (name , args )
86-
87- if policy == "reject" :
88- result = "Tool call rejected by policy."
89- yield {
90- "event" : "tool_call" ,
91- "name" : name ,
92- "arguments" : args ,
93- "pending" : False ,
94- }
95- yield {
96- "event" : "tool_rejected" ,
97- "name" : name ,
98- "reason" : "blocked by policy" ,
99- }
100- elif policy == "ask" and approve_fn :
101- yield {
102- "event" : "tool_call" ,
103- "name" : name ,
104- "arguments" : args ,
105- "pending" : True ,
106- }
107- approved , reason = await approve_fn (name , args )
108- if not approved :
109- result = f"Tool call denied by user. { reason or '' } "
110- yield {
111- "event" : "tool_rejected" ,
112- "name" : name ,
113- "reason" : reason or "denied" ,
114- }
99+ args = _parse_args (tc )
100+ if namespace :
101+ args .setdefault ("flags" , {}).setdefault ("namespace" , namespace )
102+
103+ result = ""
104+ async for event in execute_tool_call (name , args , mcp , approve_fn ):
105+ if "_result" in event :
106+ result = event ["_result" ]
115107 else :
116- try :
117- result = await mcp .call_tool (name , args )
118- except Exception as exc :
119- logger .exception ("Tool call %s failed" , name )
120- result = f"Error executing tool: { exc } "
121- yield {
122- "event" : "tool_result" ,
123- "name" : name ,
124- "result" : _truncate (result ),
125- }
126- else :
127- yield {
128- "event" : "tool_call" ,
129- "name" : name ,
130- "arguments" : args ,
131- "pending" : False ,
132- }
133- try :
134- result = await mcp .call_tool (name , args )
135- except Exception as exc :
136- logger .exception ("Tool call %s failed" , name )
137- result = f"Error executing tool: { exc } "
138- yield {
139- "event" : "tool_result" ,
140- "name" : name ,
141- "result" : _truncate (result ),
142- }
108+ yield event
143109
144110 messages .append ({"role" : "tool" , "tool_call_id" : tc .id , "content" : result })
145111
146112 yield {"event" : "error" , "message" : "Max iterations reached" }
147113
148114
149- MAX_HISTORY_CHARS = 80_000
115+ # ---------------------------------------------------------------------------
116+ # Helpers
117+ # ---------------------------------------------------------------------------
150118
151119
152- def _trim_history (history : list [dict ]) -> list [dict ]:
153- """Keep only recent history that fits within a character budget."""
154- total = 0
155- result : list [dict ] = []
156- for msg in reversed (history ):
157- size = len (msg .get ("content" , "" ))
158- if total + size > MAX_HISTORY_CHARS :
159- break
160- result .append (msg )
161- total += size
162- result .reverse ()
163- return result
120+ def _build_messages (
121+ system_prompt : str ,
122+ command : str | None ,
123+ history : list [dict ] | None ,
124+ user_message : str ,
125+ max_history_chars : int = 80_000 ,
126+ ) -> list [dict ]:
127+ """Assemble the initial message list for the LLM."""
128+ msgs : list [dict ] = [{"role" : "system" , "content" : system_prompt }]
129+ msgs .extend (trim_history (history or [], max_history_chars ))
130+ if command :
131+ msgs .append (
132+ {
133+ "role" : "user" ,
134+ "content" : f"Follow this command:\n \n { command } \n \n User message: { user_message } " ,
135+ }
136+ )
137+ else :
138+ msgs .append ({"role" : "user" , "content" : user_message })
139+ return msgs
164140
165141
166- def _truncate (text : str , limit : int = 80_000 ) -> str :
167- if len (text ) <= limit :
168- return text
169- return text [:limit ] + "\n ... (truncated)"
142+ def _parse_args (tc : object ) -> dict :
143+ """JSON-parse tool-call arguments with a safe fallback."""
144+ try :
145+ return json .loads (tc .function .arguments )
146+ except json .JSONDecodeError :
147+ return {}
0 commit comments