Skip to content

Commit 2b5528c

Browse files
committed
fix: keep agent typing status while queued
1 parent cb15b71 commit 2b5528c

4 files changed

Lines changed: 231 additions & 18 deletions

File tree

app/agent/__init__.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,8 @@ def __init__(self):
952952
self._session_queues: Dict[str, asyncio.Queue] = {}
953953
# 每个会话的worker任务
954954
self._session_workers: Dict[str, asyncio.Task] = {}
955+
# typing 这类状态按会话/聊天共享,前一条任务结束时可能仍需延续到后续排队消息。
956+
self._deferred_processing_statuses: Dict[str, dict] = {}
955957

956958
def get_session_status(self, session_id: str) -> dict[str, Any]:
957959
"""获取会话当前模型与 token 使用状态。"""
@@ -1007,6 +1009,7 @@ async def close(self):
10071009
pass
10081010
self._session_workers.clear()
10091011
self._session_queues.clear()
1012+
self._deferred_processing_statuses.clear()
10101013
for agent in self.active_agents.values():
10111014
await agent.cleanup()
10121015
self.active_agents.clear()
@@ -1100,8 +1103,10 @@ async def _session_worker(self, session_id: str):
11001103
except Exception as e:
11011104
logger.error(f"处理会话 {session_id} 的消息失败: {e}")
11021105
finally:
1103-
await _async_finish_processing_status(
1104-
task.processing_status, task.user_id
1106+
await self._finish_task_processing_status(
1107+
session_id=session_id,
1108+
task=task,
1109+
queue=queue,
11051110
)
11061111
queue.task_done()
11071112

@@ -1116,6 +1121,52 @@ async def _session_worker(self, session_id: str):
11161121
and self._session_queues[session_id].empty()
11171122
):
11181123
self._session_queues.pop(session_id, None)
1124+
self._deferred_processing_statuses.pop(session_id, None)
1125+
1126+
@staticmethod
1127+
def _is_shared_processing_status(status: Optional[dict]) -> bool:
1128+
"""
1129+
判断状态是否属于同一聊天窗口共享的处理提示。
1130+
reaction 绑定到具体消息,应按消息收口;typing 绑定到会话/聊天,需要等队列空闲再关闭。
1131+
"""
1132+
metadata = (status or {}).get("metadata") or {}
1133+
return isinstance(metadata, dict) and metadata.get("kind") == "typing"
1134+
1135+
async def _finish_task_processing_status(
1136+
self,
1137+
session_id: str,
1138+
task: _MessageTask,
1139+
queue: asyncio.Queue,
1140+
) -> None:
1141+
"""
1142+
根据会话队列状态结束或延后处理提示。
1143+
当后面还有排队消息时,typing 状态继续保留;队列真正空闲后再统一关闭。
1144+
"""
1145+
status = task.processing_status
1146+
if self._is_shared_processing_status(status) and not queue.empty():
1147+
self._deferred_processing_statuses[session_id] = status
1148+
return
1149+
1150+
if status:
1151+
await _async_finish_processing_status(status, task.user_id)
1152+
if self._is_shared_processing_status(status):
1153+
self._deferred_processing_statuses.pop(session_id, None)
1154+
elif queue.empty():
1155+
deferred_status = self._deferred_processing_statuses.pop(
1156+
session_id, None
1157+
)
1158+
if deferred_status:
1159+
await _async_finish_processing_status(
1160+
deferred_status, task.user_id
1161+
)
1162+
return
1163+
1164+
if not queue.empty():
1165+
return
1166+
1167+
deferred_status = self._deferred_processing_statuses.pop(session_id, None)
1168+
if deferred_status:
1169+
await _async_finish_processing_status(deferred_status, task.user_id)
11191170

11201171
async def _process_message_internal(self, task: _MessageTask):
11211172
"""
@@ -1181,6 +1232,7 @@ async def stop_current_task(self, session_id: str):
11811232
break
11821233
self._session_queues.pop(session_id, None)
11831234
stopped = True
1235+
self._deferred_processing_statuses.pop(session_id, None)
11841236

11851237
if stopped:
11861238
logger.info(f"会话 {session_id} 的Agent推理已应急停止")
@@ -1204,6 +1256,7 @@ async def clear_session(self, session_id: str, user_id: str):
12041256

12051257
# 清理队列
12061258
self._session_queues.pop(session_id, None)
1259+
self._deferred_processing_statuses.pop(session_id, None)
12071260

12081261
# 清理agent
12091262
if session_id in self.active_agents:

app/modules/telegram/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
CommandRegisterEventData,
1616
NotificationConf,
1717
MessageResponse,
18+
NotificationType,
1819
)
1920
from app.schemas.types import ModuleType, ChainEventType
2021
from app.utils.structures import DictUtils
@@ -451,6 +452,7 @@ def post_message(self, message: Notification, **kwargs) -> None:
451452
return
452453
client: Telegram = self.get_instance(conf.name)
453454
if client:
455+
stop_typing = message.mtype != NotificationType.Agent
454456
if message.file_path:
455457
client.send_file(
456458
file_path=message.file_path,
@@ -459,13 +461,15 @@ def post_message(self, message: Notification, **kwargs) -> None:
459461
text=message.text,
460462
userid=userid,
461463
original_chat_id=message.original_chat_id,
464+
stop_typing=stop_typing,
462465
)
463466
elif message.voice_path:
464467
client.send_voice(
465468
voice_path=message.voice_path,
466469
userid=userid,
467470
caption=message.voice_caption,
468471
original_chat_id=message.original_chat_id,
472+
stop_typing=stop_typing,
469473
)
470474
else:
471475
client.send_msg(
@@ -478,6 +482,7 @@ def post_message(self, message: Notification, **kwargs) -> None:
478482
original_message_id=message.original_message_id,
479483
original_chat_id=message.original_chat_id,
480484
disable_web_page_preview=message.disable_web_page_preview,
485+
stop_typing=stop_typing,
481486
)
482487

483488
def post_medias_message(
@@ -502,6 +507,7 @@ def post_medias_message(
502507
buttons=message.buttons,
503508
original_message_id=message.original_message_id,
504509
original_chat_id=message.original_chat_id,
510+
stop_typing=message.mtype != NotificationType.Agent,
505511
)
506512

507513
def post_torrents_message(
@@ -526,6 +532,7 @@ def post_torrents_message(
526532
buttons=message.buttons,
527533
original_message_id=message.original_message_id,
528534
original_chat_id=message.original_chat_id,
535+
stop_typing=message.mtype != NotificationType.Agent,
529536
)
530537

531538
def delete_message(
@@ -585,12 +592,14 @@ def edit_message(
585592
continue
586593
client: Telegram = self.get_instance(conf.name)
587594
if client:
595+
stop_typing = not (metadata or {}).get("agent_managed_typing")
588596
result = client.edit_msg(
589597
chat_id=chat_id,
590598
message_id=message_id,
591599
text=text,
592600
title=title,
593601
buttons=buttons,
602+
stop_typing=stop_typing,
594603
)
595604
if result:
596605
return True
@@ -665,12 +674,14 @@ def send_direct_message(self, message: Notification) -> Optional[MessageResponse
665674
return None
666675
client: Telegram = self.get_instance(conf.name)
667676
if client:
677+
agent_managed_typing = message.mtype == NotificationType.Agent
668678
if message.voice_path:
669679
result = client.send_voice(
670680
voice_path=message.voice_path,
671681
userid=userid,
672682
caption=message.voice_caption,
673683
original_chat_id=message.original_chat_id,
684+
stop_typing=not agent_managed_typing,
674685
)
675686
else:
676687
result = client.send_msg(
@@ -680,13 +691,17 @@ def send_direct_message(self, message: Notification) -> Optional[MessageResponse
680691
userid=userid,
681692
link=message.link,
682693
disable_web_page_preview=message.disable_web_page_preview,
694+
stop_typing=not agent_managed_typing,
683695
)
684696
if result and result.get("success"):
685697
return MessageResponse(
686698
message_id=result.get("message_id"),
687699
chat_id=result.get("chat_id"),
688700
channel=MessageChannel.Telegram,
689701
source=conf.name,
702+
metadata={"agent_managed_typing": True}
703+
if agent_managed_typing
704+
else None,
690705
success=True,
691706
)
692707
return None

0 commit comments

Comments
 (0)