-
Notifications
You must be signed in to change notification settings - Fork 159
Expand file tree
/
Copy pathmain_server.py
More file actions
2589 lines (2261 loc) · 116 KB
/
main_server.py
File metadata and controls
2589 lines (2261 loc) · 116 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import sys
import os
# Make the repo root importable when this module is run as a script
# (python app/main_server.py). Under launcher.py the path is already set
# up; the insert below is then a no-op.
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _repo_root not in sys.path:
sys.path.insert(0, _repo_root)
# Wire DI bindings (config._runtime resolvers ← utils.language_utils /
# utils.tokenize). Under launcher this is also done by app/__init__.py's
# side effect when ``from app import main_server`` runs; under direct
# script invocation (``python app/main_server.py``) Python does NOT execute
# the package __init__, so an explicit call is required. The function is
# idempotent — the second call is a no-op.
from app.runtime_bindings import install_runtime_bindings as _install_runtime_bindings
_install_runtime_bindings()
# Windows multiprocessing 支持:确保子进程不会重复执行模块级初始化
from multiprocessing import freeze_support
import multiprocessing
from utils.port_utils import set_port_probe_reuse
freeze_support()
# Pin the multiprocessing start method so cross-process shared structures
# behave consistently: 'fork' on Linux/macOS, the default 'spawn' on Windows.
if sys.platform != "win32":
    try:
        multiprocessing.set_start_method('fork', force=False)
    except RuntimeError:
        # A start method has already been chosen; leave it alone.
        pass

# Guard against repeated module-level initialisation in children created via
# Windows-style spawn: the first import plants an environment marker, child
# processes inherit it and therefore see themselves as "not the main process".
_INIT_MARKER = '_NEKO_MAIN_SERVER_INITIALIZED'
_IS_MAIN_PROCESS = os.environ.get(_INIT_MARKER) is None
if _IS_MAIN_PROCESS:
    # Set the marker right away so every process spawned from here inherits it.
    os.environ[_INIT_MARKER] = '1'
# 获取应用程序根目录(与 config_manager 保持一致)
def _get_app_root():
    """Return the application root directory.

    * Source mode: two directory levels above this file (the file lives at
      ``<repo>/app/main_server.py``).
    * Frozen build with ``sys._MEIPASS``: that unpack directory.
    * Frozen build without ``sys._MEIPASS``: the directory of
      ``sys.executable``.
    """
    if not getattr(sys, 'frozen', False):
        this_file = os.path.abspath(__file__)
        return os.path.dirname(os.path.dirname(this_file))
    if hasattr(sys, '_MEIPASS'):
        return sys._MEIPASS
    return os.path.dirname(sys.executable)
# 仅在 Windows 上调整 DLL 搜索路径
if sys.platform == "win32" and hasattr(os, "add_dll_directory"):
os.add_dll_directory(_get_app_root())
import mimetypes # noqa
mimetypes.add_type("application/javascript", ".js")
import asyncio # noqa
import importlib # noqa
import inspect # noqa
import logging # noqa
import atexit # noqa
import httpx # noqa
import time # noqa
import signal # noqa
from datetime import datetime, timezone # noqa
from config import MAIN_SERVER_PORT, MONITOR_SERVER_PORT, USER_NOTIFICATION_ERROR_MAX_CHARS # noqa
from utils.cloudsave_autocloud import get_cloudsave_manager # noqa
from utils.cloudsave_runtime import (
CloudsaveDeadlineExceeded,
MaintenanceModeError,
ROOT_MODE_NORMAL,
bootstrap_local_cloudsave_environment,
is_write_fence_active,
maintenance_error_payload,
set_root_mode,
should_write_root_mode_normal_after_startup,
)
from utils.config_manager import get_config_manager, get_reserved # noqa
from utils.storage_location_bootstrap import get_storage_startup_blocking_reason
# 将日志初始化提前,确保导入阶段异常也能落盘
from utils.logger_config import setup_logging # noqa: E402
from utils.ssl_env_diagnostics import probe_ssl_environment, write_ssl_diagnostic # noqa: E402
logger, log_config = setup_logging(service_name="Main", log_level=logging.INFO, silent=not _IS_MAIN_PROCESS)
if _IS_MAIN_PROCESS:
_ssl_precheck = probe_ssl_environment()
if not _ssl_precheck.get("ok", True):
diag_dir = os.path.join(log_config.get_log_directory_path(), "diagnostics")
diag_path = write_ssl_diagnostic(
event="main_server_ssl_precheck_failed",
output_dir=diag_dir,
extra=_ssl_precheck,
)
logger.warning(
"SSL environment precheck failed: %s%s",
_ssl_precheck.get("error_message"),
f" | diagnostic: {diag_path}" if diag_path else "",
)
try:
from fastapi import FastAPI, Request # noqa
from fastapi.responses import JSONResponse # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from fastapi.templating import Jinja2Templates # noqa
from dataclasses import dataclass # noqa
from typing import Any, Optional # noqa
except Exception as e:
logger.exception(f"[Main] Module import failed during startup: {e}")
raise
# 导入创意工坊工具模块
from utils.workshop_utils import ( # noqa
get_workshop_root,
get_workshop_path
)
# 导入创意工坊路由中的函数
from main_routers.workshop_router import get_subscribed_workshop_items, sync_workshop_character_cards, warmup_ugc_cache # noqa
# 确定 templates 目录位置(使用 _get_app_root)
template_dir = _get_app_root()
templates = Jinja2Templates(directory=template_dir)
def initialize_steamworks():
    """Create and initialise a Steamworks client.

    ``steam_appid.txt`` under the application root is read (when it exists)
    only so the configured app id can be printed. The ``steamworks`` package
    is imported lazily, initialised, and the current user stats are requested.

    Returns:
        The initialised ``STEAMWORKS`` instance, or ``None`` if any step
        raised. Failures are reported through the module ``logger`` when one
        exists, otherwise printed.
    """
    try:
        # Read the app id explicitly; the value is informational only.
        app_id = None
        app_id_file = os.path.join(_get_app_root(), 'steam_appid.txt')
        if os.path.exists(app_id_file):
            with open(app_id_file, 'r') as handle:
                app_id = handle.read().strip()
            print(f"从steam_appid.txt读取到应用ID: {app_id}")

        # Build the client and run its initialisation sequence.
        from steamworks import STEAMWORKS
        steamworks = STEAMWORKS()
        print("正在初始化Steamworks...")
        steamworks.initialize()
        steamworks.UserStats.RequestCurrentStats()

        # Re-read the app id from the client to confirm what is really in use.
        actual_app_id = steamworks.app_id
        print(f"Steamworks初始化完成,实际使用的应用ID: {actual_app_id}")

        # Only touch the logger when it has already been created.
        if 'logger' in globals():
            logger.info(f"Steamworks初始化成功,应用ID: {actual_app_id}")
            logger.info(f"Steam客户端运行状态: {steamworks.IsSteamRunning()}")
            logger.info(f"Steam覆盖层启用状态: {steamworks.IsOverlayEnabled()}")
        return steamworks
    except Exception as e:
        error_msg = f"初始化Steamworks失败: {e}"
        # Prefer the logger; fall back to stdout when it is not available yet.
        report = logger.error if 'logger' in globals() else print
        report(error_msg)
        return None
def get_default_steam_info():
    """Print basic Steam account information from the global client.

    Does nothing beyond a notice when the module-level ``steamworks`` is
    ``None``. Any exception raised by the Steamworks API is printed and, when
    a module ``logger`` exists, logged as an error; nothing is returned.
    """
    global steamworks
    if steamworks is None:
        # Initialisation failed or has not happened yet.
        notice = "Steamworks not initialized. Skipping Steam functionality."
        print(notice)
        if 'logger' in globals():
            logger.info(notice)
        return

    try:
        my_steam64 = steamworks.Users.GetSteamID()
        my_steam_level = steamworks.Users.GetPlayerSteamLevel()
        subscribed_apps = steamworks.Workshop.GetNumSubscribedItems()
        print(f'Subscribed apps: {subscribed_apps}')
        print(f'Logged on as {my_steam64}, level: {my_steam_level}')
        print('Is subscribed to current app?', steamworks.Apps.IsSubscribed())
    except Exception as e:
        failure = f"Error accessing Steamworks API: {e}"
        print(failure)
        if 'logger' in globals():
            logger.error(failure)
# Steamworks 初始化将在 @app.on_event("startup") 中延迟执行
# 这样可以避免在模块导入时就执行 DLL 加载等操作
steamworks = None
_server_loop: asyncio.AbstractEventLoop | None = None
_config_manager = get_config_manager()
_cloudsave_manager = get_cloudsave_manager(_config_manager)
def _cloudsave_action_accepts_keyword(action, keyword: str) -> bool:
    """Return True when ``action`` can be called with ``keyword=<value>``.

    A keyword is accepted when the callable declares a parameter of that name
    that may be passed by keyword (positional-or-keyword or keyword-only), or
    when it declares a ``**kwargs`` catch-all. A positional-only parameter or
    a ``*args`` parameter that merely shares the name does not count, because
    passing it by keyword would raise ``TypeError`` at call time.

    Returns False when the signature cannot be introspected
    (``inspect.signature`` raises ``TypeError`` or ``ValueError``).
    """
    try:
        parameters = inspect.signature(action).parameters
    except (TypeError, ValueError):
        return False

    declared = parameters.get(keyword)
    if declared is not None and declared.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    return any(
        parameter.kind == inspect.Parameter.VAR_KEYWORD
        for parameter in parameters.values()
    )


def _cloudsave_action_supports_deadline(action) -> bool:
    """Return True when ``action`` accepts a ``deadline_monotonic`` keyword."""
    return _cloudsave_action_accepts_keyword(action, "deadline_monotonic")


def _cloudsave_action_supports_steamworks(action) -> bool:
    """Return True when ``action`` accepts a ``steamworks`` keyword."""
    return _cloudsave_action_accepts_keyword(action, "steamworks")
async def _run_cloudsave_manager_action(
    action_name: str,
    *,
    reason: str,
    budget_seconds: float | None = None,
    steamworks=None,
):
    """Run a cloudsave-manager method in a worker thread and return its result.

    Args:
        action_name: Attribute name of the method on ``_cloudsave_manager``.
        reason: Always forwarded as the ``reason`` keyword.
        budget_seconds: When positive and the method accepts it, converted to
            an absolute ``deadline_monotonic`` (``time.monotonic()`` + budget).
        steamworks: Forwarded as ``steamworks`` only when it is not ``None``
            and the method accepts that keyword.
    """
    action = getattr(_cloudsave_manager, action_name)
    call_kwargs = {"reason": reason}

    has_budget = budget_seconds is not None and budget_seconds > 0
    # Introspect the action only when there is actually a budget to pass on.
    if has_budget and _cloudsave_action_supports_deadline(action):
        call_kwargs["deadline_monotonic"] = time.monotonic() + float(budget_seconds)

    if steamworks is not None and _cloudsave_action_supports_steamworks(action):
        call_kwargs["steamworks"] = steamworks

    # The manager action is synchronous; keep it off the event loop.
    return await asyncio.to_thread(action, **call_kwargs)
async def _request_memory_server_shutdown() -> None:
    """Ask memory_server to shut down once main_server's own cleanup is done.

    Best effort: a non-200 status or any exception is logged as a warning and
    never propagated to the caller.
    """
    try:
        from config import MEMORY_SERVER_PORT

        shutdown_url = f"http://127.0.0.1:{MEMORY_SERVER_PORT}/shutdown"
        # Short timeout, no proxy and no environment-derived settings.
        async with httpx.AsyncClient(timeout=1, proxy=None, trust_env=False) as client:
            response = await client.post(shutdown_url)
            if response.status_code != 200:
                logger.warning(f"向memory_server发送关闭信号失败,状态码: {response.status_code}")
            else:
                logger.info("已向memory_server发送关闭信号")
    except Exception as e:
        logger.warning(f"向memory_server发送关闭信号时出错: {e}")
class MemoryServerStartupBlocked(RuntimeError):
    """Raised when memory_server reports that startup is still blocked.

    Attributes:
        payload: Shallow copy of the response payload passed in.
        blocking_reason: ``payload["blocking_reason"]`` as a stripped string,
            or ``""`` when the key is missing or falsy.
    """

    def __init__(self, payload: dict):
        snapshot = dict(payload)
        reason = snapshot.get("blocking_reason") or ""
        self.payload = snapshot
        self.blocking_reason = str(reason).strip()
        super().__init__(f"memory_server startup still blocked: {self.payload!r}")
async def _request_memory_server_continue_startup(reason: str = "") -> None:
    """Ask memory_server to leave limited mode after the storage barrier is accepted.

    Raises:
        MemoryServerStartupBlocked: The server answered 409 with a JSON object
            whose ``ok`` is ``False`` and whose ``blocking_reason`` is truthy.
        RuntimeError: Any other failure (transport error, error status,
            unexpected payload); the original exception is chained.
    """
    try:
        from config import MEMORY_SERVER_PORT
        from utils.internal_http_client import get_internal_http_client

        http = get_internal_http_client()
        response = await http.post(
            f"http://127.0.0.1:{MEMORY_SERVER_PORT}/internal/storage/startup/continue",
            json={"reason": reason},
            timeout=60.0,
        )

        if response.status_code == 409:
            # A 409 may carry a structured "still blocked" answer; an
            # unparsable body is replaced by a placeholder without a reason.
            try:
                payload = response.json()
            except Exception:
                payload = {"ok": False, "blocking_reason": "", "error": response.text}
            still_blocked = (
                isinstance(payload, dict)
                and payload.get("ok") is False
                and payload.get("blocking_reason")
            )
            if still_blocked:
                raise MemoryServerStartupBlocked(payload)
            # Not a structured block: treat it like any other error status.
            response.raise_for_status()

        response.raise_for_status()
        payload = response.json()
        if not (isinstance(payload, dict) and payload.get("ok") is True):
            raise RuntimeError(f"memory_server continue-startup returned unexpected payload: {payload!r}")
    except MemoryServerStartupBlocked:
        # Callers handle this one specifically; do not wrap it.
        raise
    except Exception as e:
        raise RuntimeError(f"failed to release memory_server limited-mode startup: {e}") from e
async def _request_memory_server_block_startup(reason: str = "") -> None:
    """Put memory_server back into limited mode when main_server cannot finish startup.

    Raises:
        RuntimeError: On any failure (transport error, error status, or a
            payload that is not a dict with ``ok`` set to ``True``); the
            original exception is chained.
    """
    try:
        from config import MEMORY_SERVER_PORT
        from utils.internal_http_client import get_internal_http_client

        http = get_internal_http_client()
        response = await http.post(
            f"http://127.0.0.1:{MEMORY_SERVER_PORT}/internal/storage/startup/block",
            json={"reason": reason},
            timeout=10.0,
        )
        response.raise_for_status()

        payload = response.json()
        if not (isinstance(payload, dict) and payload.get("ok") is True):
            raise RuntimeError(f"memory_server block-startup returned unexpected payload: {payload!r}")
    except Exception as e:
        raise RuntimeError(f"failed to restore memory_server limited-mode startup: {e}") from e
class _SyncMessageQueue(asyncio.Queue):
    """``asyncio.Queue`` whose ``put()`` is a synchronous alias of ``put_nowait()``.

    ``sync_message_queue`` used to be a thread-safe ``queue.Queue`` and its
    producers (core.py, system_router.py and others) call ``q.put(item)``
    synchronously. After cross_server became an ``asyncio.Task`` on the main
    loop the queue type switched to ``asyncio.Queue``, whose native ``put`` is
    a coroutine: an un-awaited call would enqueue nothing and only emit a
    RuntimeWarning.

    Overriding ``put`` keeps those call sites working. The queues are created
    unbounded (no ``maxsize``), so ``put_nowait`` does not raise for a full
    queue and the replacement is semantically equivalent.
    """

    def put(self, item):  # type: ignore[override]
        # Deliberately synchronous: shadows the coroutine asyncio.Queue.put.
        self.put_nowait(item)
        return None
@dataclass
class RoleState:
    """Per-role (per-catgirl) runtime state container.

    Merges the six module-global dicts that used to live side by side
    (sync_message_queue / sync_shutdown_event / session_id / sync_process /
    websocket_locks / session_manager) into one record held by
    ``role_state[k]``, avoiding half-initialised state and scattered
    maintenance. See issue #857 / PR #855 review.

    Invariants:
      - sync_message_queue / websocket_lock are built once in
        ``_ensure_character_slots`` and are **never replaced** afterwards.
        This matters most for websocket_lock: replacing it would leave
        coroutines that already entered ``async with`` blocked on an orphaned
        old Lock. Any logic that has to rebuild ``role_state[k]`` wholesale
        must pass the old lock through unchanged.
      - session_id / sync_task / session_manager start as None and are
        assigned later by websocket_router / ``_init_character_resources``
        respectively.

    Legacy fields: ``sync_shutdown_event: ThreadEvent`` and
    ``sync_process: Thread`` were semantically removed once cross_server was
    merged into the main event loop (no dedicated thread any more). The
    lifecycle is now managed by ``sync_task: asyncio.Task`` and shutdown goes
    through ``task.cancel()``.

    However, ``_RoleStateFieldView`` in ``main_routers/shared_state.py`` still
    exposes dict-like views for ``sync_shutdown_event`` / ``sync_process``
    (the public router API ``get_sync_shutdown_event()`` /
    ``get_sync_process()``). The view's ``__getitem__`` uses
    ``getattr(rs, field)`` without a default, so a missing field would raise
    ``AttributeError``. The two ``Optional[Any] = None`` placeholders keep the
    shim's "always an empty dict" semantics: ``__contains__`` sees None and
    returns False, ``__getitem__`` takes the ``raise KeyError`` path, and
    every caller gets a consistent empty state instead of a crash. The fields
    are no longer assigned; remove them once it is confirmed that nothing
    external depends on them.
    """
    sync_message_queue: _SyncMessageQueue
    websocket_lock: asyncio.Lock
    session_id: Optional[str] = None
    sync_task: Optional[asyncio.Task] = None
    # Typed as Any rather than core.LLMSessionManager so that the annotation,
    # which is evaluated at runtime for the dataclass, cannot trip over
    # forward-reference / circular-import edge cases.
    session_manager: Optional[Any] = None
    # Placeholders only for the legacy field view in
    # main_routers/shared_state.py; always None.
    sync_shutdown_event: Optional[Any] = None
    sync_process: Optional[Any] = None
# 角色名 -> RoleState 的主存储;所有 per-k 同步资源都通过它访问
role_state: dict[str, RoleState] = {}
def _iter_sync_connector_tasks():
    """Yield ``(role_name, task)`` for every role whose ``sync_task`` is set.

    ``role_state`` is the source of truth; roles whose ``sync_task`` is
    ``None`` are skipped.
    """
    for role_name, state in role_state.items():
        connector = state.sync_task
        if connector is not None:
            yield role_name, connector
def _signal_sync_connectors_shutdown(*, log: bool = True) -> None:
    """Cancel every sync-connector task that has not finished yet.

    Per-role failures are logged at debug level and do not stop the loop, so
    a second invocation (e.g. from atexit) is harmless.

    Args:
        log: Emit the informational "shutting down" line when True.
    """
    if log:
        logger.info("正在关闭同步连接器 task...")
    for state in role_state.values():
        try:
            connector = state.sync_task
            if connector is None or connector.done():
                continue
            connector.cancel()
        except Exception as e:
            logger.debug(f"取消同步连接器 task 失败: {e}", exc_info=True)
async def join_sync_connector_tasks(timeout: float = 3.0) -> list[str]:
    """Await all sync-connector tasks in parallel.

    Normally ``_signal_sync_connectors_shutdown`` has already cancelled the
    tasks; this only waits for each of them to run its ``finally`` cleanup
    (closing ws / session / reader).

    Args:
        timeout: Per-task wait budget in seconds; negative values are
            clamped to 0.

    Returns:
        Names of the roles whose task did not finish within the timeout.
    """
    wait_timeout = max(0.0, float(timeout))
    targets = list(_iter_sync_connector_tasks())
    if not targets:
        return []

    async def _wait_one(name: str, task: asyncio.Task) -> str | None:
        # shield() keeps the wait_for timeout from cancelling the task itself.
        try:
            await asyncio.wait_for(asyncio.shield(task), timeout=wait_timeout)
        except asyncio.TimeoutError:
            return name
        except asyncio.CancelledError:
            # A task that was cancelled and finished its cleanup surfaces
            # CancelledError here; that counts as a clean exit.
            pass
        except Exception as e:
            logger.debug(f"同步连接器 task {name} 退出时抛异常: {e}", exc_info=True)
        return None

    waiters = [_wait_one(name, task) for name, task in targets]
    results = await asyncio.gather(*waiters, return_exceptions=False)

    pending = [name for name in results if name]
    if pending:
        logger.warning(
            "以下同步连接器 task 未在 %.1fs 内退出: %s",
            wait_timeout,
            ", ".join(pending),
        )
    return pending
# Compatibility alias: the old name join_sync_connector_threads still has
# callers in this file; keeping it reduces the size of the diff.
join_sync_connector_threads = join_sync_connector_tasks
def cleanup(*, log: bool = True):
    """Tell every sync-connector task to stop.

    Args:
        log: Pass False (as the atexit hook does) to suppress the duplicate
            log line on a second invocation.
    """
    _signal_sync_connectors_shutdown(log=log)
    return None
def _reset_sync_connector_shutdown_events() -> None:
    """Intentional no-op kept for its many call sites.

    The old implementation called ``ThreadEvent.clear()`` so a thread slot
    could be reused on the next start. In task mode there is nothing to
    reset: per the original note, a dead task is detected by
    ``_init_character_resources`` and restarted with ``asyncio.create_task``.
    """
    return None
# 只在主进程中注册 cleanup 函数,防止子进程退出时执行清理
# log=False:on_shutdown 已经打印过 "正在清理资源...",atexit 补一刀时不重复 log
if _IS_MAIN_PROCESS:
atexit.register(cleanup, log=False)
# 角色数据全局变量(会在重载时更新)
master_name = None
her_name = None
master_basic_config = None
lanlan_basic_config = None
name_mapping = None
lanlan_prompt = None
time_store = None
setting_store = None
recent_log = None
catgirl_names = []
agent_event_bridge: MainServerAgentBridge | None = None
def _is_websocket_connected(ws) -> bool:
    """Report whether ``ws`` is a WebSocket whose client state is CONNECTED.

    Falsy objects, objects without ``client_state``, and any error raised
    while comparing the state all count as "not connected".
    """
    if not ws or not hasattr(ws, "client_state"):
        return False
    try:
        # CONNECTED is looked up on the state value itself.
        connected = ws.client_state == ws.client_state.CONNECTED
    except Exception:
        return False
    return connected
def _iter_session_managers():
    """Yield ``(name, session_manager)`` for each role that has a session manager.

    Stands in for the former ``session_manager.items()`` pattern now that the
    per-role dicts are consolidated into ``role_state``.
    """
    for role_name, state in role_state.items():
        manager = state.session_manager
        if manager is None:
            continue
        yield role_name, manager
def _get_session_manager(name):
    """Look up ``role_state[name].session_manager``, ``dict.get()`` style.

    Returns ``None`` for a falsy ``name`` or an unknown role.
    """
    if name:
        state = role_state.get(name)
        if state is not None:
            return state.session_manager
    return None
def _select_fallback_session_manager():
    """Pick the one connected session as a safe fallback, if unambiguous.

    Returns:
        ``(name, session_manager)`` when exactly one session manager has a
        connected WebSocket, otherwise ``(None, None)``.
    """
    live = [
        (role_name, manager)
        for role_name, manager in _iter_session_managers()
        if _is_websocket_connected(getattr(manager, "websocket", None))
    ]
    if len(live) != 1:
        return None, None
    return live[0]
async def _broadcast_to_all_connected(event_payload: dict) -> int:
    """Send ``event_payload`` to every connected WebSocket session in parallel.

    This can run several times per second (agent status), so the sends are
    gathered concurrently: one slow socket must not hold up the others.

    Returns:
        The number of sessions the payload was sent to without an exception.
    """
    # Work from a snapshot so concurrent mutation of the registry cannot
    # raise RuntimeError mid-iteration.
    snapshot = [
        (name, getattr(mgr, "websocket", None))
        for name, mgr in list(_iter_session_managers())
        if mgr
    ]
    recipients = []
    for name, ws in snapshot:
        if _is_websocket_connected(ws) and hasattr(ws, "send_json"):
            recipients.append((name, ws))

    async def _deliver(name, ws):
        try:
            await ws.send_json(event_payload)
        except Exception as e:
            logger.debug("[EventBus] broadcast to %s failed: %s", name, e)
            return False
        return True

    outcomes = await asyncio.gather(
        *(_deliver(name, ws) for name, ws in recipients),
        return_exceptions=False,
    )
    return len([ok for ok in outcomes if ok is True])
async def _handle_agent_event(event: dict):
"""通过 ZeroMQ 接收 agent_server 事件,并分发到 core/websocket。"""
try:
event_type = event.get("event_type")
lanlan = event.get("lanlan_name")
if event_type == "analyze_ack":
logger.info(
"[EventBus] analyze_ack received on main: event_id=%s lanlan=%s",
event.get("event_id"),
lanlan,
)
notify_analyze_ack(str(event.get("event_id") or ""))
return
# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
"type": "agent_status_update",
"snapshot": event.get("snapshot", {}),
"lanlan_name": lanlan or "",
}
mgr_for_status = _get_session_manager(lanlan)
if lanlan and mgr_for_status is not None:
mgr = mgr_for_status
ws = getattr(mgr, "websocket", None) if mgr else None
if _is_websocket_connected(ws):
try:
await ws.send_json(payload)
except Exception as e:
logger.debug("[EventBus] agent_status_update send failed: %s", e)
else:
await _broadcast_to_all_connected(payload)
return
# Resolve target session manager; fallback to broadcast if lanlan is unknown
mgr = _get_session_manager(lanlan)
if not mgr and event_type == "task_update":
# Broadcast task_update to all connected sessions when lanlan is unresolvable
task_payload = {"type": "agent_task_update", "task": event.get("task", {})}
delivered = await _broadcast_to_all_connected(task_payload)
if delivered == 0:
logger.warning("[EventBus] task_update broadcast: no connected WebSocket sessions")
return
# --- Music Global Broadcasts (Must come before early 'if not mgr' returns) ---
elif event_type == "music_allowlist_add":
# Music allowlist is a global UI state, broadcast to all active sessions
targets = [mgr] if mgr else [m for _, m in _iter_session_managers()]
payload = {
"type": "music_allowlist_add",
"domains": event.get("domains") or event.get("metadata", {}).get("domains", [])
}
async def _send_allowlist(target_mgr):
if target_mgr and target_mgr.websocket and hasattr(target_mgr.websocket, "send_json"):
try:
await target_mgr.websocket.send_json(payload)
except Exception as e:
logger.debug("[EventBus] music_allowlist_add broadcast failed: %s", e)
await asyncio.gather(*(_send_allowlist(t) for t in targets), return_exceptions=True)
if targets:
logger.info("[EventBus] music_allowlist_add broadcasted to %d sessions", len(targets))
return
elif event_type == "music_play_url":
# Music playback is a global UI action, broadcast to all active sessions
targets = [mgr] if mgr else [m for _, m in _iter_session_managers()]
payload = {
"type": "music_play_url",
"url": event.get("url"),
"name": event.get("name") or "Plugin Music",
"artist": event.get("artist") or "External"
}
async def _send_play(target_mgr):
if target_mgr and target_mgr.websocket and hasattr(target_mgr.websocket, "send_json"):
try:
await target_mgr.websocket.send_json(payload)
except Exception as e:
logger.debug("[EventBus] music_play_url broadcast failed: %s", e)
await asyncio.gather(*(_send_play(t) for t in targets), return_exceptions=True)
if targets:
logger.info("[EventBus] music_play_url broadcasted to %d sessions", len(targets))
return
if not mgr and event_type in ("proactive_message", "task_result"):
fallback_name, fallback_mgr = _select_fallback_session_manager()
if fallback_mgr is not None:
mgr = fallback_mgr
logger.warning(
"[EventBus] %s rerouted: lanlan=%s missing, fallback_session=%s",
event_type,
lanlan,
fallback_name,
)
else:
# No target session found — drop the event entirely.
# Do NOT broadcast text to other sessions to prevent cross-session leaks.
logger.info(
"[EventBus] %s dropped: no target session for lanlan=%s, active_sessions=%s",
event_type,
lanlan,
[name for name, _ in _iter_session_managers()],
)
return
if not mgr:
logger.info("[EventBus] %s dropped: no session_manager for lanlan=%s", event_type, lanlan)
return
if event_type in ("task_result", "proactive_message"):
raw_text = event.get("text") or ""
# Why: chat-blind passthrough must preserve verbatim whitespace;
# only the empty-check / log / callback paths use the stripped form.
text = raw_text.strip()
# v2 push_message: media parts (image/audio/video) ride on the
# same proactive_message event. Image parts go straight to the
# realtime session via ``stream_image`` (the public vision-input
# API on OmniRealtimeClient/OmniOfflineClient) before the (text
# → callback) path so the AI sees them in the same context
# window as the text it's about to respond to.
#
# Audio / video aren't supported here — ``stream_audio`` is the
# live-mic PCM pipeline (specific sample rate + RNNoise gate),
# not a generic file injector, and we have no video API.
# ai_behavior=blind suppresses injection entirely.
media_parts = event.get("media_parts") if isinstance(event.get("media_parts"), list) else []
ai_behavior_v2 = event.get("ai_behavior")
if media_parts and ai_behavior_v2 in ("respond", "read"):
sess = getattr(mgr, "session", None)
stream_image = getattr(sess, "stream_image", None) if sess else None
for mp in media_parts:
if not isinstance(mp, dict):
continue
part_type = mp.get("type")
b64 = mp.get("binary_base64")
url = mp.get("url")
mime = mp.get("mime") or ""
if part_type != "image":
# ``audio`` / ``video`` need provider-specific transport
# we don't have today; drop with a one-line warning so
# plugin authors notice instead of silently losing
# frames.
logger.warning(
"[EventBus] media_part type=%s not yet supported (mime=%s); dropped",
part_type, mime,
)
continue
if stream_image is None:
logger.debug(
"[EventBus] image media_part dropped: session=%s has no stream_image",
type(sess).__name__ if sess else "None",
)
continue
if isinstance(b64, str) and b64:
# ``stream_image`` takes a base64 STRING (not bytes); pass through
try:
await stream_image(b64)
logger.debug(
"[EventBus] image media_part injected (base64 len=%d, mime=%s)",
len(b64), mime,
)
except Exception as e:
logger.warning("[EventBus] image media_part stream_image failed: %s", e)
elif isinstance(url, str) and url:
# TODO(v0.9): fetch URL → bytes → base64 → stream_image.
# Until then plugin authors should inline-encode small
# images (≤256KB) or pre-fetch URL-served frames into
# ``parts`` themselves.
logger.warning(
"[EventBus] image media_part url=%s not yet fetched; dropped",
url[:80],
)
# else: malformed part, silently skip
if text:
if event.get("direct_reply"):
detail_text = (event.get("detail") or text).strip()
delivered = False
if detail_text and hasattr(mgr, "send_lanlan_response"):
try:
delivered = bool(await mgr.send_lanlan_response(detail_text, True))
except Exception as e:
logger.warning("[EventBus] direct task_result reply failed: %s", e)
if delivered and hasattr(mgr, "handle_proactive_complete"):
try:
await mgr.handle_proactive_complete()
except Exception as e:
logger.warning("[EventBus] direct task_result turn_end failed: %s", e)
if delivered:
# detail_text 是面向用户的回复内容,不写 logger
logger.info("[EventBus] direct task_result reply delivered (detail_len=%d)", len(detail_text))
print(f"[EventBus] direct task_result reply: {detail_text[:60]}")
return
# Build structured callback and enqueue for LLM injection
cb_status = event.get("status") or ("completed" if event.get("success", True) else "failed")
# delivery_mode controls how the callback reaches the LLM:
# proactive (default): enqueue + immediately schedule trigger_agent_callbacks
# passive : enqueue only (next user turn will drain)
# silent : skip LLM channel entirely (frontend HUD still fires)
delivery_mode = (event.get("delivery_mode") or "proactive").strip()
if delivery_mode not in ("proactive", "passive", "silent"):
delivery_mode = "proactive"
# Defensive: blind ai_behavior must NEVER reach the LLM channel,
# even if delivery_mode arrives as "proactive" / "passive". The
# plugin proactive_bridge already maps blind→silent, but this
# is an indirect contract — a future direct emitter (or a bug
# in another bridge) could violate it. Forcing silent here
# locks the (blind ⇒ no LLM enqueue) invariant on the host
# side regardless of caller-supplied delivery_mode.
if (event.get("ai_behavior") or "").strip() == "blind":
delivery_mode = "silent"
# Default source_kind from channel when caller didn't specify one.
# Plugin emit sites already pass explicit source_kind/source_name.
_channel = event.get("channel") or "unknown"
source_kind = (event.get("source_kind") or "").strip()
source_name = (event.get("source_name") or "").strip()
if not source_kind:
if _channel == "user_plugin":
source_kind = "plugin"
elif _channel in ("computer_use", "cu"):
source_kind = "cu"
elif _channel in ("browser_use", "browser"):
source_kind = "browser"
elif _channel.startswith("plugin:"):
source_kind = "plugin"
if not source_name:
source_name = _channel.split(":", 1)[1]
else:
source_kind = "system"
event_metadata = event.get("metadata") if isinstance(event.get("metadata"), dict) else {}
# origin is a STRUCTURAL fact derived from event_type:
# "task_result" → real task completion (agent_server._emit_task_result):
# Computer Use / Browser Use / plugin entry / MCP tool result
# "proactive_message" → plugin push_message stream (proactive_bridge):
# danmaku / gift / external notification
# Plugin authors cannot influence this — it's determined by which
# SDK method they call (finish() vs push_message()) and which host
# path it flows through. _build_callback_instruction uses this to
# pick the right wrapper template (task "汇报" vs event "回应").
if event_type == "task_result":
origin = "task_result"
else:
# event_type == "proactive_message" (or any future event-stream
# producer that lands on this branch); see the (event_type in
# {"task_result", "proactive_message"}) gate above.
origin = "event"
callback = {
"event": "agent_task_callback",
"origin": origin,
"task_id": event.get("task_id") or "",
"channel": _channel,
"status": cb_status,
"success": bool(event.get("success", True)),
"summary": event.get("summary") or text,
"detail": event.get("detail") or text,
"error_message": event.get("error_message") or "",
"source_kind": source_kind,
"source_name": source_name,
"delivery_mode": delivery_mode,
"timestamp": event.get("timestamp") or "",
"metadata": event_metadata,
"context_type": event_metadata.get("context_type") or "",
}
if delivery_mode != "silent":
mgr.enqueue_agent_callback(callback)
if delivery_mode == "passive":
logger.info(
"[EventBus] %s enqueued callback (passive); next user turn will carry it",
event_type,
)
else:
logger.info(
"[EventBus] %s enqueued callback, scheduling trigger_agent_callbacks",
event_type,
)
# Create task with exception logging
async def _run_trigger_with_logging():
try:
await mgr.trigger_agent_callbacks()
except Exception as e:
logger.error("[EventBus] trigger_agent_callbacks task failed: %s", e)
mgr._pending_agent_callback_task = asyncio.create_task(_run_trigger_with_logging())
else:
logger.info(
"[EventBus] %s delivery=silent: skipping LLM channel (frontend HUD still fires)",
event_type,
)
# v2 chat+blind passthrough: render verbatim into chat
# bubble WITHOUT entering chat-LLM context. Distinct from
# mirror_assistant_output (which writes to sync_message_queue
# so cross_server may add an AIMessage). Both this branch
# and the HUD agent_notification below can fire when
# visibility=["chat","hud"] — they're orthogonal sinks.
#
# Gated on visibility containing "chat" AND ai_behavior=="blind"
# because non-blind ai_behavior already enqueues the LLM
# callback above and the AI's own response is what the
# user should see in the chat bubble.
_vis_raw = event.get("visibility")
_vis_present = isinstance(_vis_raw, list)
_vis = _vis_raw if _vis_present else []
_ai_behavior = (event.get("ai_behavior") or "").strip()
if (
"chat" in _vis
and _ai_behavior == "blind"
and hasattr(mgr, "passthrough_to_chat_bubble")
):
passthrough_dispatched = False
try:
# Reuse the already-resolved source_kind local (computed
# above from channel: computer_use→cu, browser_use→browser,
# plugin:*→plugin, else system). Falling back to event
# raw + "plugin" default would mislabel non-plugin sources.
passthrough_source = source_kind or "plugin"
# Why: passthrough_to_chat_bubble swallows send_json
# failures and is a no-op when WS is missing/disconnected,
# so absence-of-exception is NOT proof a frame was sent.
# We must gate handle_proactive_complete on the bool
# return — otherwise we emit turn-end without a matching
# turn-start (frontend never opened the assistant
# lifecycle), corrupting proactive rescheduling.
passthrough_dispatched = bool(
await mgr.passthrough_to_chat_bubble(
raw_text,
request_id=event.get("task_id") or None,
source=passthrough_source,
)
)
logger.info(
"[EventBus] passthrough_to_chat_bubble dispatched=%s (text_len=%d, source=%s)",
passthrough_dispatched, len(text), passthrough_source,
)
except Exception as e:
logger.warning(
"[EventBus] passthrough_to_chat_bubble failed: %s", e,
)
# Why: gemini_response opens an assistant turn lifecycle on
# the frontend (ensureAssistantTurnStarted in app-websocket.js);
# without a matching turn-end event the assistant bubble
# stays "in-progress" and proactive rescheduling / lifecycle
# finalization never fire. handle_proactive_complete is the
# canonical turn-end emitter shared with the direct task_result
# reply path above. The HUD agent_notification branch below
# does NOT open an assistant turn, so single-emit here is
# sufficient even when visibility=["chat","hud"].
if passthrough_dispatched and hasattr(mgr, "handle_proactive_complete"):
try:
await mgr.handle_proactive_complete()
except Exception as e:
logger.warning(
"[EventBus] passthrough turn_end emit failed: %s", e,
)
# v2 visibility contract: HUD agent_notification fires only
# when "hud" is in visibility. Why: visibility=["chat"] must
# not double-render as both chat bubble AND HUD toast.
# Legacy emitters that omit the visibility field entirely
# (no v2 plumbing) keep the pre-v2 behavior of firing HUD
# by default — checked via _vis_present, not via _vis truthiness,
# so an explicit visibility=[] (v2 "no verbatim render") suppresses HUD.
_hud_allowed = ("hud" in _vis) if _vis_present else True
ws = getattr(mgr, "websocket", None)
if not _hud_allowed:
logger.info(
"[EventBus] agent_notification suppressed by visibility=%s (no 'hud') for lanlan=%s",
_vis, lanlan,
)
elif _is_websocket_connected(ws):
try:
notif = {
"type": "agent_notification",
"text": text,
"source": "brain",
"status": cb_status,
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:USER_NOTIFICATION_ERROR_MAX_CHARS]
await ws.send_json(notif)
# text 是面向前端的通知正文,不写 logger
logger.info("[EventBus] agent_notification sent to frontend (text_len=%d)", len(text))
print(f"[EventBus] agent_notification text: {text[:60]}")
except Exception as e:
logger.warning("[EventBus] agent_notification WS send failed: %s", e)
else:
logger.warning("[EventBus] agent_notification: WebSocket not connected for lanlan=%s", lanlan)
elif event_type == "agent_notification":
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
try:
notif = {
"type": "agent_notification",
"text": event.get("text", ""),
"source": event.get("source", "brain"),
"status": event.get("status", "error"),
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:USER_NOTIFICATION_ERROR_MAX_CHARS]
await ws.send_json(notif)
except Exception as e:
logger.debug("[EventBus] agent_notification send failed: %s", e)
else:
logger.debug("[EventBus] agent_notification: WebSocket not connected for lanlan=%s", lanlan)
elif event_type == "task_update":
task_payload = {"type": "agent_task_update", "task": event.get("task", {})}
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
try:
await ws.send_json(task_payload)
except Exception as e:
logger.warning("[EventBus] task_update send failed for lanlan=%s: %s", lanlan, e)
else:
logger.warning("[EventBus] task_update dropped: WebSocket not connected for lanlan=%s", lanlan)
except Exception as e:
logger.debug(f"handle_agent_event error: {e}")
async def _refresh_character_globals():
    """Reload the character-related module globals from the config manager.

    Awaits ``_config_manager.aget_character_data()`` once and rebinds the
    nine module globals it yields, then rebuilds ``catgirl_names`` from the
    keys of the refreshed ``lanlan_prompt`` mapping.

    Every fast-path entry point must run this step first, so that after
    operations such as set_current_catgirl / update_catgirl any later code
    reading ``her_name`` / ``lanlan_prompt`` / ``lanlan_basic_config`` sees
    the latest values.
    """
    global master_name, her_name, master_basic_config, lanlan_basic_config
    global name_mapping, lanlan_prompt, time_store, setting_store, recent_log
    global catgirl_names

    # Fetch first, then unpack: the globals are only rebound once the whole
    # result has been obtained and has the expected nine-element shape.
    character_data = await _config_manager.aget_character_data()
    (
        master_name,
        her_name,
        master_basic_config,
        lanlan_basic_config,
        name_mapping,
        lanlan_prompt,
        time_store,
        setting_store,
        recent_log,
    ) = character_data

    # The known character names are exactly the keys of the prompt mapping.
    catgirl_names = [*lanlan_prompt.keys()]
def _ensure_character_slots(k: str) -> bool:
    """Make sure the per-character synchronisation slots exist for ``k``.

    Pure in-memory operation with two outcomes: either ``role_state[k]``
    already exists and nothing is touched, or a ``RoleState`` holding both
    the sync message queue and the websocket lock is installed in a single
    assignment. This avoids the half-initialised state the older code
    risked, where six separate dicts used two different sentinels
    (sync_message_queue vs websocket_locks) to decide whether a character
    already had its slots.

    Note: ``asyncio.Queue`` does not need a running loop at construction
    time on Python 3.10+. This function is synchronous, but its callers
    (``initialize_character_data`` / ``_init_character_resources`` and
    similar) are async contexts, so a loop is available.

    Args:
        k: Character (catgirl) name used as the ``role_state`` key.

    Returns:
        True if the character was new and its slots were just created (the
        caller uses this to decide whether a task must be force-started);
        False if the slots already existed.
    """
    # Guard clause: an existing entry is left completely untouched.
    if k in role_state:
        return False

    # Build the whole record first so both members are installed together.
    fresh_slots = RoleState(
        sync_message_queue=_SyncMessageQueue(),
        websocket_lock=asyncio.Lock(),
    )
    role_state[k] = fresh_slots
    logger.info(f"为角色 {k} 初始化新资源")
    return True