-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathagent_chat.py
More file actions
1815 lines (1643 loc) · 74.1 KB
/
agent_chat.py
File metadata and controls
1815 lines (1643 loc) · 74.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import json
import logging
import uuid
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Union, Optional, List, Dict, Any, Type, Tuple, Callable, Awaitable
import orjson
from fastapi import BackgroundTasks
from derisk import BaseComponent
from derisk._private.config import Config
from derisk.agent import (
AgentMemory,
ConversableAgent,
get_agent_manager,
AgentContext,
UserProxyAgent,
LLMStrategyType,
GptsMemory,
LLMConfig,
ResourceType,
ShortTermMemory,
)
from derisk.agent.core.base_team import ManagerAgent
from derisk.agent.core.memory.gpts import GptsMessage
from derisk.agent.core.plan.react.team_react_plan import AutoTeamContext
from derisk.agent.core.sandbox_manager import SandboxManager
from derisk.agent.core.schema import Status
from derisk.agent.resource import get_resource_manager, ResourceManager
from derisk.agent.resource.agent_skills import AgentSkillResource
from derisk.agent.resource.base import FILE_RESOURCES, AgentResource
from derisk.agent.util.ext_config import ExtConfigHolder
from derisk.component import ComponentType, SystemApp
from derisk.sandbox import AutoSandbox
from derisk_app.config import SandboxConfigParameters
from derisk_serve.agent.resource import DeriskSkillResource
from derisk_serve.schedule.local_scheduler import LocalScheduler
from derisk.core.interface.scheduler import Scheduler
from derisk.core import HumanMessage, StorageConversation
from derisk.model import DefaultLLMClient
from derisk.model.cluster import WorkerManagerFactory
from derisk.util.data_util import first
from derisk.util.date_utils import current_ms
from derisk.util.executor_utils import ExecutorFactory, execute_no_wait, run_async_tasks
from derisk.util.json_utils import serialize
from derisk.util.log_util import CHAT_LOGGER
from derisk.util.logger import digest
from derisk.util.tracer.tracer_impl import root_tracer, trace
from derisk.vis import VisProtocolConverter
from derisk.vis.vis_manage import get_vis_manager
from derisk_serve.agent.agents.derisks_memory import (
MetaDerisksPlansMemory,
MetaDerisksMessageMemory,
MetaAgentSystemMessageMemory,
MetaDerisksWorkLogStorage,
MetaDerisksKanbanStorage,
MetaDerisksTodoStorage,
MetaDerisksFileMetadataStorage,
)
from derisk_serve.agent.db import (
GptsConversationsEntity,
GptsConversationsDao,
GptsMessagesDao,
)
from derisk_serve.agent.db.gpts_tool import GptsToolDao
from derisk_serve.agent.team.base import TeamMode
from derisk_serve.building.app.api.schema_app import GptsApp, GptsAppDetail
from derisk_serve.building.app.api.schemas import ServerResponse
from derisk_serve.building.app.service.service import Service as AppService
from derisk_serve.building.config.api.schemas import ChatInParamValue, AppParamType
from derisk_serve.conversation.serve import Serve as ConversationServe
# Module-level logger and the global configuration singleton shared by all
# chat entry points in this module.
logger = logging.getLogger(__name__)
CFG = Config()
def get_app_service() -> AppService:
    """Return the app-building Service instance bound to the global system app."""
    system_app = CFG.SYSTEM_APP
    return AppService.get_instance(system_app)
def _format_vis_msg(msg: str):
    """Wrap a vis payload as a server-sent-event style ``data:`` line."""
    payload = {"vis": msg}
    body = json.dumps(payload, default=serialize, ensure_ascii=False)
    # NOTE(review): terminated with a single "\n" (plus a trailing space),
    # while the streaming paths below emit "data:...\n\n" — confirm the
    # consumer accepts this framing.
    return f"data:{body} \n"
async def _build_conversation(
    conv_id: str,
    select_param: Union[str, Dict[str, Any]],
    model_name: str,
    summary: str,
    app_code: str,
    conv_serve: ConversationServe,
    user_name: Optional[str] = "",
    sys_code: Optional[str] = "",
) -> StorageConversation:
    """Create a StorageConversation for an agent chat and load it asynchronously.

    Args:
        conv_id: Conversation uid.
        select_param: Chat selection parameter (raw string or mapping).
        model_name: LLM model name recorded on the conversation.
        summary: Conversation summary text.
        app_code: Owning application code.
        conv_serve: Conversation serve providing the storage backends.
        user_name: Optional caller user name.
        sys_code: Optional caller system code.
    """
    conversation = StorageConversation(
        conv_uid=conv_id,
        chat_mode="chat_agent",
        user_name=user_name,
        sys_code=sys_code,
        model_name=model_name,
        summary=summary,
        param_type="derisks",
        param_value=select_param,
        app_code=app_code,
        conv_storage=conv_serve.conv_storage,
        message_storage=conv_serve.message_storage,
        async_load=True,
    )
    return await conversation.async_load()
# Type alias simplifying the agent-context annotation: either a plain string
# context or a structured AutoTeamContext.
AgentContextType = Union[str, AutoTeamContext]
class GlobalSandboxManagerCache:
    """Process-wide cache of sandbox managers.

    Lets every caller within the same conversation share a single
    ``SandboxManager`` instead of spinning up duplicate sandboxes.
    """

    _repository: Dict[str, SandboxManager] = {}
    _lock: Optional[asyncio.Lock] = None

    @classmethod
    def get_lock(cls) -> asyncio.Lock:
        """Return the cache lock, creating it lazily on first use."""
        if cls._lock is None:
            cls._lock = asyncio.Lock()
        return cls._lock

    @classmethod
    def get(cls, key: str) -> Optional[SandboxManager]:
        """Look up a cached sandbox manager; ``None`` when absent."""
        return cls._repository.get(key)

    @classmethod
    async def get_or_create(
        cls, key: str, creator: Callable[[], Awaitable[SandboxManager]]
    ) -> SandboxManager:
        """Return the cached manager for *key*, building one via *creator* if needed."""
        async with cls.get_lock():
            if key in cls._repository:
                return cls._repository[key]
            created = await creator()
            cls._repository[key] = created
            logger.info(
                f"[Sandbox]创建新sandbox,key={key}, 当前运行中沙箱数量={len(cls._repository)}"
            )
            return created

    @classmethod
    def remove(cls, key: str):
        """Drop a sandbox manager from the cache without shutting it down."""
        cls._repository.pop(key, None)
        logger.info(
            f"[Sandbox]移除sandbox,key={key}, 当前运行中沙箱数量={len(cls._repository)}"
        )

    @classmethod
    async def cleanup_and_remove(cls, key: str):
        """Remove a cached manager and kill its sandbox client, if any."""
        evicted = cls._repository.pop(key, None)
        if not (evicted and evicted.client):
            logger.info(
                f"[Sandbox]清理sandbox_manager(无client),key={key}, 当前运行中沙箱数量={len(cls._repository)}"
            )
            return
        try:
            await evicted.client.kill()
            logger.info(
                f"[Sandbox]清理sandbox_manager并kill,key={key}, 杀死后运行中沙箱数量={len(cls._repository)}"
            )
        except Exception as e:
            logger.exception(
                f"[Sandbox]清理sandbox_manager失败,key={key}, error={str(e)}"
            )
class AgentChat(BaseComponent, ABC):
name = ComponentType.AGENT_CHAT
def __init__(
    self,
    system_app: SystemApp,
    gpts_memory: Optional[GptsMemory] = None,
    llm_provider: Optional[DefaultLLMClient] = None,
):
    """Wire up conversation DAOs, memory backends and the agent manager.

    Args:
        system_app: The hosting system app.
        gpts_memory: Optional pre-built GptsMemory; a DB-backed one is
            created when omitted.
        llm_provider: Optional LLM client; otherwise built in after_start.
    """
    self.gpts_conversations = GptsConversationsDao()
    self.gpts_messages_dao = GptsMessagesDao()
    # Database-backed storage for the auxiliary memory channels.
    meta_file_storage = MetaDerisksFileMetadataStorage()
    meta_worklog_storage = MetaDerisksWorkLogStorage()
    meta_kanban_storage = MetaDerisksKanbanStorage()
    meta_todo_storage = MetaDerisksTodoStorage()
    self.memory = gpts_memory or GptsMemory(
        plans_memory=MetaDerisksPlansMemory(),
        message_memory=MetaDerisksMessageMemory(),
        message_system_memory=MetaAgentSystemMessageMemory(),
        file_metadata_db_storage=meta_file_storage,
        work_log_db_storage=meta_worklog_storage,
        kanban_db_storage=meta_kanban_storage,
        todo_db_storage=meta_todo_storage,
    )
    self.llm_provider = llm_provider
    self.agent_memory_map = {}
    super().__init__(system_app)
    self.system_app = system_app
    self.agent_manage = get_agent_manager(system_app)
def init_app(self, system_app: SystemApp):
    """Attach the system app and publish global model configs to the cache."""
    self.system_app = system_app
    self._register_model_configs()
def _register_model_configs(self):
    """Publish globally-configured agent LLM models to ModelConfigCache."""
    from derisk.agent.util.llm.model_config_cache import (
        ModelConfigCache,
        parse_provider_configs,
    )

    llm_conf = self.system_app.config.get("agent.llm")
    if not llm_conf:
        # Fall back to a nested "agent" mapping carrying an "llm" entry.
        parent_conf = self.system_app.config.get("agent")
        if isinstance(parent_conf, dict):
            llm_conf = parent_conf.get("llm")
    if not llm_conf:
        return
    parsed = parse_provider_configs(llm_conf)
    if parsed:
        ModelConfigCache.register_configs(parsed)
        logger.info(f"Registered {len(parsed)} models to global cache")
async def _get_or_create_sandbox_manager(
    self, context: AgentContext, app: GptsApp, need_sandbox: bool
) -> Optional[SandboxManager]:
    """Get or create the sandbox manager shared within one conversation.

    Args:
        context: Agent context; conv_id/staff_no identify the session.
        app: Application configuration.
        need_sandbox: Whether the caller explicitly requires a sandbox.

    Returns:
        A shared SandboxManager instance, or None when no sandbox is needed.

    Raises:
        ValueError: If a sandbox is required but no sandbox configuration
            exists in the application config.
    """
    # A sandbox is needed either when explicitly requested and enabled by
    # the team context, or when the app carries agent-skill resources.
    requires_sandbox = (
        need_sandbox and app.team_context.use_sandbox
    ) or await self._have_agent_skill(
        app, context.extra.get("dynamic_resources", [])
    )
    if not requires_sandbox:
        return None

    # Share one sandbox per (conversation, user) pair.
    sandbox_key = f"{context.conv_id}_{context.staff_no}"
    cached_manager = GlobalSandboxManagerCache.get(sandbox_key)
    if cached_manager:
        return cached_manager

    # Not cached: build a creator and let the cache serialize creation so
    # parallel callers do not spawn duplicate sandboxes.
    async def _create_sandbox_manager() -> SandboxManager:
        app_config = self.system_app.config.configs.get("app_config")
        sandbox_config: Optional[SandboxConfigParameters] = (
            app_config.sandbox if app_config else None
        )
        # sandbox_config is Optional: fail fast with a clear message
        # instead of an AttributeError on the dereferences below.
        if sandbox_config is None:
            raise ValueError(
                "Sandbox is required but no sandbox configuration was found"
            )
        sandbox_client = await AutoSandbox.create(
            user_id=context.staff_no or sandbox_config.user_id,
            agent=sandbox_config.agent_name,
            type=sandbox_config.type,
            template=sandbox_config.template_id,
            work_dir=sandbox_config.work_dir,
            skill_dir=sandbox_config.skill_dir,
            oss_ak=sandbox_config.oss_ak,
            oss_sk=sandbox_config.oss_sk,
            oss_endpoint=sandbox_config.oss_endpoint,
            oss_bucket_name=sandbox_config.oss_bucket_name,
        )
        sandbox_manager = SandboxManager(sandbox_client=sandbox_client)
        # Start and initialize the sandbox service in the background so
        # the caller is not blocked on sandbox boot.
        sandbox_task = asyncio.create_task(sandbox_manager.acquire())
        sandbox_manager.set_init_task(sandbox_task)
        return sandbox_manager

    return await GlobalSandboxManagerCache.get_or_create(
        sandbox_key, _create_sandbox_manager
    )
async def _cleanup_sandbox_manager(
    self, conv_id: str, staff_no: Optional[str] = None
):
    """Tear down the sandbox manager cached for a conversation.

    Args:
        conv_id: Conversation id.
        staff_no: User id; without it no cache key exists, so nothing happens.
    """
    if not staff_no:
        return
    cache_key = f"{conv_id}_{staff_no}"
    await GlobalSandboxManagerCache.cleanup_and_remove(cache_key)
def after_start(self):
    """Lazily build the default LLM client once worker managers are available."""
    if self.llm_provider:
        return
    factory = CFG.SYSTEM_APP.get_component(
        ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
    )
    self.llm_provider = DefaultLLMClient(
        factory.create(), auto_convert_message=True
    )
async def save_conversation(
    self,
    conv_session_id: str,
    agent_conv_id: str,
    current_message: StorageConversation,
    final_message: Optional[str] = None,
    err_msg: Optional[str] = None,
    chat_call_back: Optional[Callable[..., Optional[Any]]] = None,
    first_chunk_ms: Optional[int] = None,
):
    """Finalize and persist a finished agent dialogue.

    Collects the final rendered content, invokes the optional completion
    callback, stores the conversation round, and always clears the
    per-dialogue memory cache.

    Args:
        conv_session_id: Session id.
        agent_conv_id: Dialogue id.
        current_message: Conversation storage object for the current round.
        final_message: Pre-rendered final content; fetched from memory when
            not supplied.
        err_msg: Error message of the dialogue, if any.
        chat_call_back: Optional async callback receiving the final results.
        first_chunk_ms: Timestamp (ms) of the first streamed chunk.
    """
    logger.info(f"Agent chat end, save conversation {agent_conv_id}!")
    try:
        if not final_message:
            try:
                final_message = await self.memory.vis_final(agent_conv_id)
            except Exception as e:
                logger.exception(f"获取{agent_conv_id}最终消息异常: {str(e)}")
                # Best effort: surface the fetch error as the final content.
                final_message = str(e)
        if callable(chat_call_back):
            final_report = None
            try:
                final_report = await self.memory.user_answer(agent_conv_id)
            except Exception as e:
                logger.exception(f"获取{conv_session_id}最终报告异常: {str(e)}")
            post_action_reports: list[dict] = []
            try:
                messages = await self.memory.get_messages(agent_conv_id)
                # Keep only messages whose context yields a post-action report.
                post_action_reports = [
                    post_action_report
                    for message in messages
                    if (
                        post_action_report := _get_post_action_report(
                            message.context
                        )
                    )
                ]
            except Exception as e:
                logger.exception(
                    f"获取{conv_session_id}post_action_reports: {str(e)}"
                )
            await chat_call_back(
                conv_session_id,
                agent_conv_id,
                final_message,
                final_report,
                err_msg,
                first_chunk_ms,
                post_action_reports=post_action_reports,
            )
        if not final_message:
            final_message = ""
        # The previous if/else on err_msg had byte-identical branches, so a
        # single call preserves behavior on both paths.
        current_message.add_view_message(final_message)
        current_message.end_current_round()
        current_message.save_to_storage()
    finally:
        # Always free the per-dialogue memory, even on failure.
        await self.memory.clear(agent_conv_id)
@trace("agent.initialize_conversation", requires=["app_code", "conv_session_id"])
async def _initialize_conversation(
    self,
    conv_session_id: str,
    app_code: str,
    user_query: Union[str, HumanMessage],
    user_code: Optional[str] = None,
) -> StorageConversation:
    """Create the session-level conversation record and open a new round.

    Args:
        conv_session_id: Session id.
        app_code: Application code owning the conversation.
        user_query: Raw user input, text or HumanMessage.
        user_code: Optional caller user id.
    """
    conv_serve = ConversationServe.get_instance(CFG.SYSTEM_APP)
    current_message = await _build_conversation(
        conv_id=conv_session_id,
        select_param="",
        summary="",
        model_name="",
        app_code=app_code,
        conv_serve=conv_serve,
        user_name=user_code,
    )
    # Persist in the background; streaming need not wait for the write.
    execute_no_wait(current_message.save_to_storage)
    current_message.start_new_round()
    query_text = (
        user_query if isinstance(user_query, str) else user_query.content
    )
    current_message.add_user_message(query_text)
    return current_message
@trace(
    "agent.initialize_agent_conversation", requires=["app_code", "conv_session_id"]
)
async def _initialize_agent_conversation(self, conv_session_id: str, **ext_info):
    """Resolve the agent-dialogue id for a session.

    Resumes the most recent dialogue when it is waiting for user action
    authorization; otherwise derives a fresh id from the dialogue count.
    """
    gpts_conversations: List[
        GptsConversationsEntity
    ] = await self.gpts_conversations.get_by_session_id_asc(conv_session_id)
    total = len(gpts_conversations) if gpts_conversations else 0
    logger.info(
        f"gpts_conversations count:{conv_session_id}, "
        f"{total}"
    )
    last_conversation = gpts_conversations[-1] if gpts_conversations else None
    if last_conversation and Status.WAITING.value == last_conversation.state:
        # Resume: the user has just granted the pending action authorization.
        agent_conv_id = last_conversation.conv_id
        logger.info("收到用户动作授权, 恢复会话: " + agent_conv_id)
    else:
        gpt_chat_order = "1" if not gpts_conversations else str(total + 1)
        agent_conv_id = conv_session_id + "_" + gpt_chat_order
    return agent_conv_id, gpts_conversations
@abstractmethod
async def chat(
    self,
    conv_uid: str,
    gpts_name: str,
    user_query: Union[str, HumanMessage],
    background_tasks: Optional[BackgroundTasks] = None,
    specify_config_code: Optional[str] = None,
    user_code: Optional[str] = None,
    sys_code: Optional[str] = None,
    stream: Optional[bool] = True,
    chat_call_back: Optional[Any] = None,
    chat_in_params: Optional[List[ChatInParamValue]] = None,
    **ext_info,
):
    """Public conversation entry point; subclasses implement per deployment.

    Args:
        conv_uid: Session (conversation) id.
        gpts_name: Code of the app/agent/workflow to chat with.
        user_query: User input, plain text or a HumanMessage.
        background_tasks: Optional FastAPI background-task handle.
        specify_config_code: Optional app config version to use.
        user_code: Caller user id.
        sys_code: Caller system code.
        stream: Whether to stream output chunks.
        chat_call_back: Optional completion callback.
        chat_in_params: Extra chat input parameters.
    """
    raise NotImplementedError
async def aggregation_chat(
    self,
    conv_id: str,
    agent_conv_id: str,
    gpts_name: str,
    user_query: Union[str, HumanMessage],
    user_code: str = None,
    sys_code: str = None,
    stream: Optional[bool] = True,
    gpts_conversations: Optional[List[GptsConversationsEntity]] = None,
    specify_config_code: Optional[str] = None,
    chat_in_params: Optional[List[ChatInParamValue]] = None,
    **ext_info,
):
    """Concrete agent(app) dialogue entry: builds dialogue memory and goals
    and drives the common agent-chat flow, yielding rendered chunks.

    Meant to be wrapped by a session-level caller rather than invoked
    directly. Yields ``(task, sse_data, agent_conv_id)`` tuples.

    Args:
        conv_id: Session id.
        agent_conv_id: Current dialogue id.
        gpts_name: Code of the agent to chat with (app/agent/workflow etc.).
    """
    root_tracer.set_current_agent_id(gpts_name)  # record current agent app_code in trace storage
    digest(
        CHAT_LOGGER,
        "CHAT_ENTRY",
        conv_id=conv_id,
        app_code=gpts_name,
        user_code=user_code,
    )
    start_ts = root_tracer.get_context_entrance_ms() or current_ms()
    succeed = False
    first_chunk_time = None
    # Normalize plain-text queries to a HumanMessage.
    if isinstance(user_query, str):
        user_query: HumanMessage = HumanMessage.parse_chat_completion_message(
            user_query, ignore_unknown_media=True
        )
    root_tracer.set_context_conv_id(agent_conv_id)
    message_round = 0
    history_message_count = 0
    is_retry_chat = False
    last_speaker_name = None
    history_messages = None
    ########################################################
    app_config = self.system_app.config.configs.get("app_config")
    web_config = app_config.service.web
    app_service = get_app_service()
    gpt_app: GptsApp = await app_service.app_detail(
        gpts_name, specify_config_code, building_mode=False
    )
    # NOTE(review): gpt_app is used by dynamic_resource_adapter before the
    # None check below — confirm app_detail never returns None here.
    await self.dynamic_resource_adapter(gpt_app, ext_info)
    if not gpt_app:
        raise ValueError(f"Not found app {gpts_name}!")
    # init gpts memory
    vis_render = ext_info.get("vis_render", None)
    # If the API call specifies a renderer use it; otherwise fall back to
    # the app's chat-layout configuration (or the catch-all renderer).
    if not vis_render:
        if gpt_app.layout and gpt_app.layout.chat_layout:
            vis_render = gpt_app.layout.chat_layout.name
        else:
            vis_render = "gpt_vis_all"
    vis_converter_mng = get_vis_manager()
    vis_protocol = vis_converter_mng.get_by_name(vis_render)(
        derisk_url=web_config.web_url
    )
    ext_info["incremental"] = vis_protocol.incremental
    await self.memory.init(
        agent_conv_id,
        app_code=gpts_name,
        history_messages=history_messages,
        start_round=history_message_count,
        vis_converter=vis_protocol,
    )
    #########################################################
    with root_tracer.start_span("agent.conversation.state_check"):
        # Check whether the last dialogue finished; a WAITING state means
        # the current dialogue must resume instead of starting anew.
        if gpts_conversations:
            last_gpts_conversation: GptsConversationsEntity = gpts_conversations[-1]
            logger.info(
                f"last conversation status:{last_gpts_conversation.__dict__}"
            )
            if last_gpts_conversation.state == Status.WAITING.value:
                is_retry_chat = True
                agent_conv_id = last_gpts_conversation.conv_id
                gpts_messages: List[
                    GptsMessage
                ] = await self.gpts_messages_dao.get_by_conv_id(agent_conv_id)  # type:ignore
                last_message = gpts_messages[-1]
                message_round = last_message.rounds + 1
                last_speaker_name = last_message.sender_name
                # A resumed dialogue must reload its history into memory.
                await self.memory.load_persistent_memory(agent_conv_id)
    historical_dialogues: List[GptsMessage] = []
    if not is_retry_chat:
        # Create a new gpts conversation record
        ## When creating a new gpts conversation record, determine whether to
        # include the history of previous topics according to the application
        # definition.
        if gpt_app.keep_start_rounds > 0 or gpt_app.keep_end_rounds > 0:
            if gpts_conversations and len(gpts_conversations) > 0:
                rely_conversations = []
                if gpt_app.keep_start_rounds + gpt_app.keep_end_rounds < len(
                    gpts_conversations
                ):
                    if gpt_app.keep_start_rounds > 0:
                        # NOTE(review): this slice keeps everything AFTER the
                        # first keep_start_rounds entries; the name "front"
                        # suggests [:keep_start_rounds] was intended — confirm.
                        front = gpts_conversations[gpt_app.keep_start_rounds :]
                        rely_conversations.extend(front)
                    if gpt_app.keep_end_rounds > 0:
                        back = gpts_conversations[-gpt_app.keep_end_rounds :]
                        rely_conversations.extend(back)
                else:
                    rely_conversations = gpts_conversations
                for gpts_conversation in rely_conversations:
                    temps: List[GptsMessage] = await self.memory.get_messages(
                        gpts_conversation.conv_id
                    )
                    # Keep only the opening and closing message of each
                    # prior dialogue as historical context.
                    if temps and len(temps) > 1:
                        historical_dialogues.append(temps[0])
                        historical_dialogues.append(temps[-1])
        # Persisted user goal is capped at 6500 characters.
        user_goal = json.dumps(user_query.to_dict(), ensure_ascii=False)
        user_goal = user_goal[: min(len(user_goal), 6500)] if user_goal else ""
        await self.gpts_conversations.a_add(
            GptsConversationsEntity(
                conv_id=agent_conv_id,
                conv_session_id=conv_id,
                user_goal=user_goal,
                gpts_name=gpts_name,
                team_mode=gpt_app.team_mode,
                state=Status.RUNNING.value,
                max_auto_reply_round=0,
                auto_reply_count=0,
                user_code=user_code,
                sys_code=sys_code,
                vis_render=vis_render,
                extra=orjson.dumps(ext_info).decode(),
            )
        )
    # init agent memory
    agent_memory = self.get_or_build_derisk_memory(
        conv_id, gpt_app.app_code, user_code, gpt_app.team_context
    )
    file_handle = None
    try:
        # Run the actual agent dialogue concurrently while this generator
        # streams its rendered output below.
        task = asyncio.create_task(
            self._inner_chat(
                user_query=user_query,
                conv_session_id=conv_id,
                conv_uid=agent_conv_id,
                gpts_app=gpt_app,
                agent_memory=agent_memory,
                is_retry_chat=is_retry_chat,
                last_speaker_name=last_speaker_name,
                init_message_rounds=message_round,
                historical_dialogues=historical_dialogues,
                user_code=user_code,
                sys_code=sys_code,
                stream=stream,
                chat_in_params=chat_in_params,
                **ext_info,
            )
        )
        ## TEST FILE WRITE
        # NOTE(review): debug chunk dumping is unconditionally enabled here;
        # confirm this flag should stay on in production.
        WRITE_TO_FILE = True
        if WRITE_TO_FILE:
            from derisk.configs.model_config import DATA_DIR
            import os
            chat_chunk_file_path = os.path.join(DATA_DIR, "chat_chunk_file")
            os.makedirs(chat_chunk_file_path, exist_ok=True)
            filename = os.path.join(
                chat_chunk_file_path, f"_chat_file_{agent_conv_id}.jsonl"
            )
            file_handle = open(filename, "w", encoding="utf-8")
        if stream == True:
            async for chunk in self._chat_messages(agent_conv_id):
                if chunk and len(chunk) > 0:
                    try:
                        content = orjson.dumps({"vis": chunk}).decode("utf-8")
                        if WRITE_TO_FILE:
                            file_handle.write(content)
                            file_handle.write("\n")
                        resp = f"data:{content}\n\n"
                        first_chunk_time = first_chunk_time or current_ms()
                        yield task, resp, agent_conv_id
                    except Exception as e:
                        logger.exception(
                            f"get messages {gpts_name} Exception!" + str(e)
                        )
                        yield task, f"data: {str(e)}\n\n", agent_conv_id
            # 5. Final handling
            if task.done() and task.exception():
                # The dialogue task failed: surface its exception.
                logger.exception(f"agent chat exception!{conv_id}")
                raise task.exception()
            else:
                # Normal completion.
                yield task, _format_vis_msg("[DONE]"), agent_conv_id
        else:
            logger.info("非流式消息输出!")
            last_chunk = None, None, None
            async for chunk in self._chat_messages(agent_conv_id):
                if chunk and len(chunk) > 0:
                    # Emit one empty heartbeat before the first real chunk.
                    if not first_chunk_time:
                        yield task, "", agent_conv_id
                    try:
                        content = json.dumps(
                            {"vis": chunk},
                            default=serialize,
                            ensure_ascii=False,
                        )
                        if WRITE_TO_FILE:
                            file_handle.write(content)
                            file_handle.write("\n")
                        resp = f"data:{content}\n\n"
                        first_chunk_time = first_chunk_time or current_ms()
                        last_chunk = task, resp, agent_conv_id
                    except Exception as e:
                        logger.exception(
                            f"get messages {gpts_name} Exception!" + str(e)
                        )
                        yield task, f"data: {str(e)}\n\n", agent_conv_id
            # Non-stream mode yields only the final rendered chunk.
            yield last_chunk
        succeed = True
    except asyncio.CancelledError:
        # On cancellation do not fire the callback immediately.
        logger.info("Generator cancelled, delaying callback")
        raise
    except Exception as e:
        logger.exception(f"Agent chat have error!{str(e)}")
        raise e
    finally:
        digest(
            CHAT_LOGGER,
            "CHAT_DONE",
            conv_id=conv_id,
            app_code=gpts_name,
            user_code=user_code,
            succeed=succeed,
            cost_ms=current_ms() - start_ts,
            first_chunk_time=(first_chunk_time - start_ts)
            if first_chunk_time
            else 0,
        )
        # Ensure the debug file handle is closed.
        if file_handle:
            file_handle.close()
def get_or_build_agent_memory(self, conv_id: str, derisks_name: str) -> AgentMemory:
    """Build an AgentMemory backed by a small short-term buffer.

    Note: conv_id and derisks_name are currently unused; a fresh memory
    instance is returned on every call.
    """
    short_term = ShortTermMemory(buffer_size=10)
    return AgentMemory(short_term, gpts_memory=self.memory)
@trace("agent.get_or_build_memory", requires=["conv_id", "agent_id"])
def get_or_build_derisk_memory(
    self,
    conv_id: str,
    agent_id: str,
    user_id: str,
    team_context: Optional[AgentContextType] = None,
) -> AgentMemory:
    """Build a Derisk AgentMemory instance for the given conversation.

    Args:
        conv_id: Conversation ID (currently unused).
        agent_id: App code (currently unused).
        user_id: User ID (currently unused).
        team_context: Optional team context (currently unused).
    """
    buffered = ShortTermMemory(buffer_size=20)
    return AgentMemory(memory=buffered, gpts_memory=self.memory)
async def build_agent_by_app_code(
    self,
    app_code: str,
    context: AgentContext,
    agent_memory: AgentMemory = None,
    **kwargs,
) -> ConversableAgent:
    """Build a conversable agent from a stored app definition.

    Args:
        app_code: Code of the app to build.
        context: Agent runtime context.
        agent_memory: Optional pre-built memory; created when omitted.
    """
    service = get_app_service()
    # NOTE(review): annotated ServerResponse but consumed as a GptsApp by
    # _build_agent_by_gpts — confirm app_detail's actual return type.
    gpts_app: ServerResponse = await service.app_detail(
        app_code, building_mode=False
    )
    memory = agent_memory or self.get_or_build_agent_memory(
        context.conv_id, gpts_app.app_name
    )
    rm: ResourceManager = get_resource_manager()
    return await self._build_agent_by_gpts(
        context=context,
        agent_memory=memory,
        rm=rm,
        app=gpts_app,
        **kwargs,
    )
async def _have_agent_skill(
    self, app: GptsApp, dynamic_resources: Optional[List[AgentResource]] = None
):
    """Check whether the app carries any agent-skill resource.

    Args:
        app: Application definition to inspect.
        dynamic_resources: Extra resources supplied at request time.

    Returns:
        bool: True when any resource in the app's tool resources, its full
        resource list, or the dynamic resources is an AgentSkill/DeriskSkill.
    """
    # Compute the skill type set once instead of rebuilding it inside each
    # of the three previously copy-pasted checks.
    skill_types = {AgentSkillResource.type(), DeriskSkillResource.type()}
    candidate_lists = (app.resource_tool, app.all_resources, dynamic_resources)
    return any(
        item.type in skill_types
        for resources in candidate_lists
        if resources
        for item in resources
    )
@trace("agent.build_agent_by_gpts")
async def _build_agent_by_gpts(
self,
context: AgentContext,
agent_memory: AgentMemory,
rm: ResourceManager,
app: GptsApp,
scheduler: Optional[Scheduler],
need_sandbox: bool = False,
**kwargs,
) -> ConversableAgent:
"""Build a dialogue target agent through gpts configuration"""
from datetime import datetime
logger.info(
f"_build_agent_by_gpts:{app.app_code},{app.app_name}, start:{datetime.now()}"
)
try:
## 检测动态资源
real_all_resources = kwargs.get("dynamic_resources", [])
# 使用全局缓存获取或创建 sandbox_manager,避免并行创建重复的沙箱
sandbox_manager = await self._get_or_create_sandbox_manager(
context, app, need_sandbox
)
# 初始化场景文件到沙箱(如果应用绑定了场景)
# 注意:每个Agent有独立的场景文件目录,避免多Agent共享沙箱时的冲突
if sandbox_manager and app.scenes and len(app.scenes) > 0:
try:
from derisk.agent.core_v2.scene_sandbox_initializer import (
initialize_scenes_for_agent,
)
scene_init_result = await initialize_scenes_for_agent(
app_code=app.app_code,
agent_name=app.app_name or app.app_code or "default_agent",
scenes=app.scenes,
sandbox_manager=sandbox_manager,
)
if scene_init_result.get("success"):
logger.info(
f"[AgentChat] Scene files initialized for {app.app_code}: "
f"{len(scene_init_result.get('files', []))} files "
f"in {scene_init_result.get('scenes_dir', 'unknown')}"
)
else:
logger.warning(
f"[AgentChat] Failed to initialize scene files for {app.app_code}: "
f"{scene_init_result.get('message')}"
)
except Exception as scene_init_error:
logger.warning(
f"[AgentChat] Error initializing scene files for {app.app_code}: "
f"{scene_init_error}"
)
# 场景初始化失败不影响主流程
employees: List[ConversableAgent] = []
if "extra_agents" in kwargs and kwargs.get("extra_agents"):
# extra_agents 表示动态添加的子Agent
employees = await self._build_extra_employees(
kwargs.get("extra_agents"), context, agent_memory, rm, scheduler
)
app.all_resources.extend(
[self.agent_to_resource(extra_agent) for extra_agent in employees]
)
elif app.details is not None and len(app.details) > 0:
employees: List[ConversableAgent] = await self._build_employees(
context,
agent_memory,
rm,
[deepcopy(item) for item in app.details],
scheduler,
)
team_mode = TeamMode(app.team_mode)
## 模型服务
if not self.llm_provider:
worker_manager = CFG.SYSTEM_APP.get_component(
ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
).create()
self.llm_provider = DefaultLLMClient(
worker_manager, auto_convert_message=True
)
llm_config = LLMConfig(
llm_client=self.llm_provider,
llm_strategy=LLMStrategyType(app.llm_config.llm_strategy),
strategy_context=app.llm_config.llm_strategy_value,
llm_param=app.llm_config.llm_param,
mist_keys=app.llm_config.mist_keys,
)
real_all_resources.extend(app.all_resources)
real_all_resources = await self.add_duplicate_allow_tools(
real_all_resources
)
if team_mode == TeamMode.SINGLE_AGENT or TeamMode.NATIVE_APP == team_mode:
if employees is not None and len(employees) == 1:
recipient = employees[0]
else:
cls: Type[ConversableAgent] = self.agent_manage.get_by_name(
app.agent
)
## 处理agent资源内容
# depend_resource = await blocking_func_to_async(
# CFG.SYSTEM_APP, rm.build_resource, app.all_resources
# )
depend_resource = await rm.a_build_resource(real_all_resources)
agent_context = deepcopy(context)
agent_context.agent_app_code = app.app_code
recipient = (
await cls()
.bind(agent_context)
.bind(agent_memory)
.bind(llm_config)
.bind(sandbox_manager)
.bind(depend_resource)
# .bind(prompt_template)
.bind(app.context_config)
.bind(ExtConfigHolder(ext_config=app.ext_config))
.bind(scheduler)
.build()
)
## 处理Agent实例的基本信息
temp_profile = recipient.profile.copy()
temp_profile.desc = app.app_describe
temp_profile.name = app.app_name
temp_profile.avatar = app.icon
if app.system_prompt_template is not None:
temp_profile.system_prompt_template = app.system_prompt_template
if app.user_prompt_template:
temp_profile.user_prompt_template = app.user_prompt_template
# 如果应用有场景,读取场景内容并注入到Agent的System Prompt
if app.scenes and len(app.scenes) > 0 and sandbox_manager:
try:
scene_content = await self._load_and_inject_scenes(
agent_name=app.app_name or app.app_code or "default_agent",
scenes=app.scenes,
sandbox_manager=sandbox_manager,
agent_profile=temp_profile,
)
if scene_content:
logger.info(
f"[AgentChat] 场景内容已注入Agent: "
f"{len(scene_content)} 字符"
)
except Exception as e:
logger.warning(f"[AgentChat] 场景内容注入失败: {e}")
# 场景注入失败不影响主流程
recipient.bind(temp_profile)
return recipient
elif TeamMode.AUTO_PLAN == team_mode:
agent_manager = get_agent_manager()
auto_team_ctx = app.team_context
manager_cls: Type[ConversableAgent] = agent_manager.get_by_name(
auto_team_ctx.teamleader
)
manager = manager_cls()
if real_all_resources:
# depend_resource = await blocking_func_to_async(
# CFG.SYSTEM_APP, rm.build_resource, app.all_resources
# )
depend_resource = await rm.a_build_resource(real_all_resources)
manager.bind(depend_resource)
agent_context = deepcopy(context)
agent_context.agent_app_code = app.app_code
manager = (
await manager.bind(agent_context)
.bind(llm_config)
.bind(agent_memory)
.bind(app.context_config)
.bind(sandbox_manager)
.bind(ExtConfigHolder(ext_config=app.ext_config))
.bind(scheduler)
.build()
)
## 处理Agent实例的基本信息
temp_profile = manager.profile.copy()
temp_profile.desc = app.app_describe
temp_profile.name = app.app_name
temp_profile.avatar = app.icon
if app.system_prompt_template is not None:
temp_profile.system_prompt_template = app.system_prompt_template
if app.user_prompt_template:
temp_profile.user_prompt_template = app.user_prompt_template
manager.bind(temp_profile)
if isinstance(manager, ManagerAgent) and len(employees) > 0:
manager.hire(employees)
logger.info(
f"_build_agent_by_gpts return:{manager.profile.name},{manager.profile.desc},{id(manager)}"
)
return manager