Skip to content

Commit 8789f35

Browse files
committed
Improve non-verbose agent tool summaries
1 parent d4dec90 commit 8789f35

5 files changed

Lines changed: 309 additions & 23 deletions

File tree

app/agent/__init__.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,14 @@ def _emit_output(self, text: str):
349349
except Exception as e:
350350
logger.debug(f"智能体输出回调失败: {e}")
351351

352+
def _handle_stream_text(self, text: str):
    """
    Route one chunk of visible streamed text through the stream handler and
    forward what the handler actually emitted to the output callback.

    The handler's return value is forwarded instead of the raw chunk, so the
    message buffer and the external streaming callback receive the same text.

    :param text: the streamed chunk produced by the model
    """
    emitted_text = self.stream_handler.emit(text)
    # Only forward real content: a handler may return an empty string, and an
    # override that returns nothing would otherwise hand None to the output
    # callback. The other _emit_output call sites guard on truthiness too.
    if emitted_text:
        self._emit_output(emitted_text)
359+
352360
def _initialize_tools(self) -> List:
353361
"""
354362
初始化工具列表
@@ -572,12 +580,13 @@ async def _execute_agent(self, messages: List[BaseMessage]):
572580
agent=agent,
573581
messages={"messages": messages},
574582
config=agent_config,
575-
on_token=lambda token: (
576-
self.stream_handler.emit(token),
577-
self._emit_output(token),
578-
),
583+
on_token=self._handle_stream_text,
579584
)
580585

586+
trailing_tool_summary = self.stream_handler.flush_pending_tool_summary()
587+
if trailing_tool_summary:
588+
self._emit_output(trailing_tool_summary)
589+
581590
# 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本
582591
(
583592
all_sent_via_stream,
@@ -588,8 +597,14 @@ async def _execute_agent(self, messages: List[BaseMessage]):
588597
# 流式输出未能发送全部内容(发送失败等)
589598
# 通过常规方式发送剩余内容
590599
remaining_text = await self.stream_handler.take()
591-
if remaining_text and not self._streamed_output:
592-
self._emit_output(remaining_text)
600+
if remaining_text:
601+
unsent_text = remaining_text
602+
if self._streamed_output and remaining_text.startswith(
603+
self._streamed_output
604+
):
605+
unsent_text = remaining_text[len(self._streamed_output) :]
606+
if unsent_text:
607+
self._emit_output(unsent_text)
593608
if (
594609
remaining_text
595610
and not self.suppress_user_reply

app/agent/callback/__init__.py

Lines changed: 188 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import threading
3-
from typing import Optional, Tuple
3+
from typing import Any, Optional, Tuple
44

55
from fastapi.concurrency import run_in_threadpool
66

@@ -62,16 +62,30 @@ def __init__(self):
6262
self._user_id: Optional[str] = None
6363
self._username: Optional[str] = None
6464
self._title: str = ""
65+
# 非啰嗦模式下的待输出工具统计,等下一段文本到来时再统一补一句摘要
66+
self._pending_tool_stats: dict[str, dict[str, Any]] = {}
6567

66-
def emit(self, token: str) -> str:
    """
    Accept one streamed LLM token and append it to the buffer.

    When tool statistics are pending, the aggregated summary sentence is
    placed in front of the token (whose leading newlines are dropped) before
    the text is buffered.

    :param token: the streamed token; a falsy value is treated as ""
    :return: the text that was actually appended to the buffer
    """
    with self._lock:
        text = token if token else ""

        summary = ""
        if self._pending_tool_stats:
            summary = self._consume_pending_tool_summary_locked()
        if summary:
            # The summary leads the chunk, so newlines at the start of the
            # token are removed before the two are joined.
            text = summary + text.lstrip("\n")

        # Avoid piling up blank lines: if the buffer already ends with an
        # empty line, drop the leading newlines of the incoming text.
        already_blank = self._buffer.endswith("\n\n")
        if already_blank and text.startswith("\n"):
            text = text.lstrip("\n")

        self._buffer = self._buffer + text
        return text
7589

7690
async def take(self) -> str:
7791
"""
@@ -82,6 +96,8 @@ async def take(self) -> str:
8296
8397
注意:流式渠道不调用此方法,工具消息直接 emit 到 buffer 中。
8498
"""
99+
self.flush_pending_tool_summary()
100+
85101
with self._lock:
86102
if not self._buffer:
87103
return ""
@@ -99,6 +115,7 @@ def clear(self):
99115
self._sent_text = ""
100116
self._message_response = None
101117
self._msg_start_offset = 0
118+
self._pending_tool_stats = {}
102119

103120
def reset(self):
104121
"""
@@ -112,6 +129,7 @@ def reset(self):
112129
self._buffer = ""
113130
self._sent_text = ""
114131
self._msg_start_offset = 0
132+
self._pending_tool_stats = {}
115133

116134
async def start_streaming(
117135
self,
@@ -141,6 +159,7 @@ async def start_streaming(
141159
self._sent_text = ""
142160
self._message_response = None
143161
self._msg_start_offset = 0
162+
self._pending_tool_stats = {}
144163

145164
# 检查渠道是否支持消息编辑,不支持则仅收集 token 到 buffer,不实时推送
146165
if not self._can_stream():
@@ -176,6 +195,9 @@ async def stop_streaming(self) -> Tuple[bool, str]:
176195
# 取消定时任务
177196
await self._cancel_flush_task()
178197

198+
# 将未落地的工具统计补入缓冲区,避免流式结束时丢失这段执行信息
199+
self.flush_pending_tool_summary()
200+
179201
# 执行最后一次刷新
180202
await self._flush()
181203

@@ -194,11 +216,172 @@ async def stop_streaming(self) -> Tuple[bool, str]:
194216
self._sent_text = ""
195217
self._message_response = None
196218
self._msg_start_offset = 0
219+
self._pending_tool_stats = {}
197220
if all_sent:
198221
# 所有内容已通过流式发送,清空缓冲区
199222
self._buffer = ""
200223
return all_sent, final_text
201224

225+
def record_tool_call(
    self,
    tool_name: str,
    tool_message: Optional[str] = None,
    tool_kwargs: Optional[dict[str, Any]] = None,
):
    """
    Register one tool invocation in the pending statistics so it can be
    reported later as part of an aggregated summary.

    :param tool_name: name of the invoked tool
    :param tool_message: optional descriptive message of the invocation
    :param tool_kwargs: optional keyword arguments the tool was called with
    """
    arguments = tool_kwargs or {}
    category, target = self._classify_tool_call(
        tool_name=tool_name,
        tool_message=tool_message,
        tool_kwargs=arguments,
    )
    with self._lock:
        stats = self._pending_tool_stats
        if category not in stats:
            stats[category] = {"count": 0, "targets": set()}
        entry = stats[category]
        entry["count"] += 1
        # Targets are kept as a set of strings so repeated calls against the
        # same target collapse into a single element.
        if target:
            entry["targets"].add(str(target))
250+
251+
def flush_pending_tool_summary(self) -> str:
    """
    Append the pending tool statistics summary to the buffer.

    :return: the summary text added by this call, or "" when nothing was
             pending
    """
    with self._lock:
        pending_summary = self._consume_pending_tool_summary_locked()
        if not pending_summary:
            return pending_summary
        self._buffer = self._buffer + pending_summary
        return pending_summary
260+
261+
@staticmethod
def _classify_tool_call(
    tool_name: str,
    tool_message: Optional[str],
    tool_kwargs: dict[str, Any],
) -> tuple[str, Optional[str]]:
    """
    Map a tool invocation to a summary category and an optional target.

    The tool name is checked first (exact names, then name prefixes). If the
    name is not recognised, keywords in the tool message are used as a
    fallback. Anything still unmatched is reported as the generic "tool"
    category.

    :param tool_name: name of the invoked tool (compared case-insensitively)
    :param tool_message: optional descriptive message of the invocation
    :param tool_kwargs: keyword arguments the tool was called with
    :return: (category, target); target is None when the category has no
             meaningful target or the arguments do not carry one
    """
    tool_kwargs = tool_kwargs or {}
    tool_name = (tool_name or "").strip().lower()
    tool_message = (tool_message or "").strip()
    tool_message_lower = tool_message.lower()

    # Target lookups shared by the name-based and the message-based branches,
    # so both branches resolve the same argument keys.
    search_target = (
        tool_kwargs.get("query")
        or tool_kwargs.get("title")
        or tool_kwargs.get("keyword")
    )
    web_target = (
        tool_kwargs.get("url")
        or tool_kwargs.get("target_url")
        or tool_kwargs.get("path")
    )

    # --- classification by tool name ---
    if tool_name == "read_file":
        return "file_read", tool_kwargs.get("file_path")
    if tool_name in {"write_file", "edit_file"}:
        return "file_write", tool_kwargs.get("file_path")
    # Must precede the generic "list_"/"query_" prefixes below.
    if tool_name in {"list_directory", "query_directory_settings"}:
        return "directory", tool_kwargs.get("path")
    if tool_name == "browse_webpage":
        return "web_browse", web_target
    if tool_name == "execute_command":
        return "command", tool_kwargs.get("command")
    if tool_name == "ask_user_choice":
        return "interaction", tool_kwargs.get("message")
    # "get_search_results" must be matched before the generic "get_" prefix.
    if tool_name.startswith("search_") or tool_name == "get_search_results":
        return "search", search_target
    if tool_name.startswith(("query_", "list_", "get_")):
        return "data_query", None
    if tool_name.startswith(("add_", "update_", "delete_", "modify_", "run_")):
        return "action", None
    if tool_name in {
        "recognize_media",
        "scrape_metadata",
        "transfer_file",
        "test_site",
        "send_message",
        "send_local_file",
        "send_voice_message",
    }:
        return "action", None

    # --- fallback: classification by keywords in the tool message ---
    if "读取文件" in tool_message or "read file" in tool_message_lower:
        return "file_read", tool_kwargs.get("file_path")
    if (
        "写入文件" in tool_message
        or "编辑文件" in tool_message
        or "write file" in tool_message_lower
        or "edit file" in tool_message_lower
    ):
        return "file_write", tool_kwargs.get("file_path")
    if "目录" in tool_message or "directory" in tool_message_lower:
        return "directory", tool_kwargs.get("path")
    if "搜索" in tool_message or "search" in tool_message_lower:
        return "search", search_target
    if (
        "网页" in tool_message
        or "browser" in tool_message_lower
        or "webpage" in tool_message_lower
    ):
        # Use the same url/target_url/path fallback as the name-based branch.
        return "web_browse", web_target
    if "命令" in tool_message or "command" in tool_message_lower:
        return "command", tool_kwargs.get("command")

    return "tool", None
334+
335+
def _consume_pending_tool_summary_locked(self) -> str:
    """
    Turn the pending tool statistics into one summary line and reset them.

    The "_locked" suffix suggests the caller is expected to hold self._lock;
    this method does not acquire it itself.

    :return: the summary text (with a leading newline when the buffer does
             not already end with one, and always a trailing newline), or ""
             when there is nothing to report
    """
    pending = self._pending_tool_stats
    if not pending:
        return ""

    # Categories reported by number of distinct targets rather than by number
    # of calls, whenever at least one target was recorded.
    counted_by_target = ("file_read", "file_write", "directory", "web_browse")

    rendered = []
    for name, stats in pending.items():
        amount = stats["count"]
        if name in counted_by_target:
            amount = len(stats["targets"]) or amount
        text = self._format_tool_stat(name, amount)
        if text:
            rendered.append(text)

    self._pending_tool_stats = {}
    if not rendered:
        return ""

    body = ",".join(rendered)
    trimmed = self._buffer.rstrip(" \t")
    tail = trimmed[-1:] if trimmed.strip() else ""
    # NOTE(review): a non-empty buffer holding only whitespace also gets the
    # newline prefix here, because tail is "" in that case — confirm intended.
    lead = "\n" if (self._buffer and tail != "\n") else ""
    return f"{lead}({body})\n"
359+
360+
@staticmethod
def _format_tool_stat(category: str, count: int) -> str:
    """
    Render the summary phrase for one tool category.

    :param category: category key such as "search" or "file_read"
    :param count: the number to report for that category
    :return: the phrase for the category, the generic tool-call phrase for an
             unknown category, or "" when count is not positive
    """
    if count <= 0:
        return ""

    # One phrase template per known category; "{}" receives the count.
    templates = {
        "search": "执行了 {} 次搜索",
        "file_read": "读取了 {} 个文件",
        "file_write": "修改了 {} 个文件",
        "directory": "查看了 {} 个目录",
        "web_browse": "浏览了 {} 个网页",
        "command": "执行了 {} 条命令",
        "data_query": "查询了 {} 次数据",
        "action": "执行了 {} 次操作",
        "interaction": "发起了 {} 次交互",
    }
    template = templates.get(category, "调用了 {} 次工具")
    return template.format(count)
384+
202385
def _can_stream(self) -> bool:
203386
"""
204387
检查当前渠道是否支持流式输出(消息编辑)

app/agent/tools/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,12 @@ async def _arun(self, *args: Any, **kwargs: Any) -> str:
124124
merged_message = "\n\n".join(messages)
125125
await self.send_tool_message(merged_message)
126126
else:
127-
# 非VERBOSE:工具边界至少补一个换行,避免工具前后的文本直接连在一起
128-
if self._stream_handler.last_buffer_char not in ("", "\n"):
129-
self._stream_handler.emit("\n")
127+
# 非VERBOSE:不逐条回显工具调用,转为在下一段文本前补一句聚合摘要
128+
self._stream_handler.record_tool_call(
129+
tool_name=self.name,
130+
tool_message=tool_message,
131+
tool_kwargs=kwargs,
132+
)
130133
else:
131134
# 未启用流式传输,不发送任何工具消息内容
132135
pass

app/api/endpoints/openai.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,15 @@ def bind_queue(self, queue: asyncio.Queue):
6969
self._event_queue = queue
7070

7171
def emit(self, token: str) -> str:
    """
    Buffer a streamed token via the parent handler and mirror the text that
    was actually emitted onto the bound event queue.

    :param token: the streamed token
    :return: the text the parent handler emitted, so callers that forward the
             return value of emit() get the same text as with the base class
    """
    emitted = super().emit(token)
    # Queue the parent's result, not the raw token: the parent may have
    # rewritten the text before buffering it.
    if emitted and self._event_queue is not None:
        self._event_queue.put_nowait(emitted)
    # The base emit() returns the emitted text; without this return the
    # override would hand None back to its callers.
    return emitted
75+
76+
def flush_pending_tool_summary(self) -> str:
    """
    Flush the pending tool summary through the parent handler and mirror the
    newly added text onto the bound event queue.

    :return: the summary text reported by the parent handler
    """
    summary = super().flush_pending_tool_summary()
    if not summary:
        return summary
    queue = self._event_queue
    if queue is not None:
        queue.put_nowait(summary)
    return summary
7581

7682
async def start_streaming(
7783
self,

0 commit comments

Comments
 (0)