forked from Project-N-E-K-O/N.E.K.O
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_server.py
More file actions
1280 lines (1124 loc) · 54.9 KB
/
main_server.py
File metadata and controls
1280 lines (1124 loc) · 54.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Windows multiprocessing 支持:确保子进程不会重复执行模块级初始化
from multiprocessing import freeze_support
import multiprocessing
from utils.port_utils import set_port_probe_reuse
freeze_support()
# 设置 multiprocessing 启动方法(确保跨进程共享结构的一致性)
# 在 Linux/macOS 上使用 fork,在 Windows 上使用 spawn(默认)
if sys.platform != "win32":
try:
multiprocessing.set_start_method('fork', force=False)
except RuntimeError:
# 启动方法已经设置过,忽略
pass
# 检查是否需要执行初始化(用于防止 Windows spawn 方式创建的子进程重复初始化)
# 方案:首次导入时设置环境变量标记,子进程会继承这个标记从而跳过初始化
_INIT_MARKER = '_NEKO_MAIN_SERVER_INITIALIZED'
_IS_MAIN_PROCESS = _INIT_MARKER not in os.environ
if _IS_MAIN_PROCESS:
# 立即设置标记,这样任何从此进程 spawn 的子进程都会继承此标记
os.environ[_INIT_MARKER] = '1'
# 获取应用程序根目录(与 config_manager 保持一致)
def _get_app_root():
if getattr(sys, 'frozen', False):
if hasattr(sys, '_MEIPASS'):
return sys._MEIPASS
else:
return os.path.dirname(sys.executable)
else:
return os.getcwd()
# 仅在 Windows 上调整 DLL 搜索路径
if sys.platform == "win32" and hasattr(os, "add_dll_directory"):
os.add_dll_directory(_get_app_root())
import mimetypes # noqa
mimetypes.add_type("application/javascript", ".js")
import asyncio # noqa
import logging # noqa
import atexit # noqa
import httpx # noqa
from config import MAIN_SERVER_PORT, MONITOR_SERVER_PORT # noqa
from utils.config_manager import get_config_manager, get_reserved # noqa
# 将日志初始化提前,确保导入阶段异常也能落盘
from utils.logger_config import setup_logging # noqa: E402
from utils.ssl_env_diagnostics import probe_ssl_environment, write_ssl_diagnostic # noqa: E402
logger, log_config = setup_logging(service_name="Main", log_level=logging.INFO, silent=not _IS_MAIN_PROCESS)
if _IS_MAIN_PROCESS:
_ssl_precheck = probe_ssl_environment()
if not _ssl_precheck.get("ok", True):
diag_dir = os.path.join(log_config.get_log_directory_path(), "diagnostics")
diag_path = write_ssl_diagnostic(
event="main_server_ssl_precheck_failed",
output_dir=diag_dir,
extra=_ssl_precheck,
)
logger.warning(
"SSL environment precheck failed: %s%s",
_ssl_precheck.get("error_message"),
f" | diagnostic: {diag_path}" if diag_path else "",
)
try:
from fastapi import FastAPI # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from fastapi.templating import Jinja2Templates # noqa
from threading import Thread, Event as ThreadEvent # noqa
from queue import Queue # noqa
except Exception as e:
logger.exception(f"[Main] Module import failed during startup: {e}")
raise
# 导入创意工坊工具模块
from utils.workshop_utils import ( # noqa
get_workshop_root,
get_workshop_path
)
# 导入创意工坊路由中的函数
from main_routers.workshop_router import get_subscribed_workshop_items, sync_workshop_character_cards, warmup_ugc_cache # noqa
# 确定 templates 目录位置(使用 _get_app_root)
template_dir = _get_app_root()
templates = Jinja2Templates(directory=template_dir)
def initialize_steamworks():
try:
# 明确读取steam_appid.txt文件以获取应用ID
app_id = None
app_id_file = os.path.join(_get_app_root(), 'steam_appid.txt')
if os.path.exists(app_id_file):
with open(app_id_file, 'r') as f:
app_id = f.read().strip()
print(f"从steam_appid.txt读取到应用ID: {app_id}")
# 创建并初始化Steamworks实例
from steamworks import STEAMWORKS
steamworks = STEAMWORKS()
# 显示Steamworks初始化过程的详细日志
print("正在初始化Steamworks...")
steamworks.initialize()
steamworks.UserStats.RequestCurrentStats()
# 初始化后再次获取应用ID以确认
actual_app_id = steamworks.app_id
print(f"Steamworks初始化完成,实际使用的应用ID: {actual_app_id}")
# 检查全局logger是否已初始化,如果已初始化则记录成功信息
if 'logger' in globals():
logger.info(f"Steamworks初始化成功,应用ID: {actual_app_id}")
logger.info(f"Steam客户端运行状态: {steamworks.IsSteamRunning()}")
logger.info(f"Steam覆盖层启用状态: {steamworks.IsOverlayEnabled()}")
return steamworks
except Exception as e:
# 检查全局logger是否已初始化,如果已初始化则记录错误,否则使用print
error_msg = f"初始化Steamworks失败: {e}"
if 'logger' in globals():
logger.error(error_msg)
else:
print(error_msg)
return None
def get_default_steam_info():
global steamworks
# 检查steamworks是否初始化成功
if steamworks is None:
print("Steamworks not initialized. Skipping Steam functionality.")
if 'logger' in globals():
logger.info("Steamworks not initialized. Skipping Steam functionality.")
return
try:
my_steam64 = steamworks.Users.GetSteamID()
my_steam_level = steamworks.Users.GetPlayerSteamLevel()
subscribed_apps = steamworks.Workshop.GetNumSubscribedItems()
print(f'Subscribed apps: {subscribed_apps}')
print(f'Logged on as {my_steam64}, level: {my_steam_level}')
print('Is subscribed to current app?', steamworks.Apps.IsSubscribed())
except Exception as e:
print(f"Error accessing Steamworks API: {e}")
if 'logger' in globals():
logger.error(f"Error accessing Steamworks API: {e}")
# Steamworks 初始化将在 @app.on_event("startup") 中延迟执行
# 这样可以避免在模块导入时就执行 DLL 加载等操作
steamworks = None
_server_loop: asyncio.AbstractEventLoop | None = None
_config_manager = get_config_manager()
def cleanup():
"""通知所有同步线程停止"""
logger.info("正在关闭同步线程...")
for k in sync_shutdown_event:
try:
sync_shutdown_event[k].set()
except Exception:
pass
# 只在主进程中注册 cleanup 函数,防止子进程退出时执行清理
if _IS_MAIN_PROCESS:
atexit.register(cleanup)
sync_message_queue = {}
sync_shutdown_event = {}
session_manager = {}
session_id = {}
sync_process = {}
# 每个角色的websocket操作锁,用于防止preserve/restore与cleanup()之间的竞争
websocket_locks = {}
# 角色数据全局变量(会在重载时更新)
master_name = None
her_name = None
master_basic_config = None
lanlan_basic_config = None
name_mapping = None
lanlan_prompt = None
time_store = None
setting_store = None
recent_log = None
catgirl_names = []
agent_event_bridge: MainServerAgentBridge | None = None
async def _handle_agent_event(event: dict):
"""通过 ZeroMQ 接收 agent_server 事件,并分发到 core/websocket。"""
try:
event_type = event.get("event_type")
lanlan = event.get("lanlan_name")
if event_type == "analyze_ack":
logger.info(
"[EventBus] analyze_ack received on main: event_id=%s lanlan=%s",
event.get("event_id"),
lanlan,
)
notify_analyze_ack(str(event.get("event_id") or ""))
return
# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
"type": "agent_status_update",
"snapshot": event.get("snapshot", {}),
}
if lanlan and lanlan in session_manager:
mgr = session_manager.get(lanlan)
if mgr and mgr.websocket and hasattr(mgr.websocket, "send_json"):
try:
await mgr.websocket.send_json(payload)
except Exception:
pass
else:
for mgr in session_manager.values():
if mgr and mgr.websocket and hasattr(mgr.websocket, "send_json"):
try:
await mgr.websocket.send_json(payload)
except Exception:
pass
return
# Resolve target session manager; fallback to broadcast if lanlan is unknown
mgr = session_manager.get(lanlan) if lanlan else None
if not mgr and event_type == "task_update":
# Broadcast task_update to all connected sessions when lanlan is unresolvable
task_payload = {"type": "agent_task_update", "task": event.get("task", {})}
for _mgr in session_manager.values():
if _mgr and _mgr.websocket and hasattr(_mgr.websocket, "send_json"):
try:
await _mgr.websocket.send_json(task_payload)
except Exception:
pass
return
if not mgr and event_type in ("proactive_message", "task_result"):
# No target session found — drop the event entirely.
# Do NOT broadcast text to other sessions to prevent cross-session leaks.
logger.info("[EventBus] %s dropped: no target session for lanlan=%s", event_type, lanlan)
return
if not mgr:
logger.info("[EventBus] %s dropped: no session_manager for lanlan=%s", event_type, lanlan)
return
if event_type in ("task_result", "proactive_message"):
text = (event.get("text") or "").strip()
if text:
# Build structured callback and enqueue for LLM injection
cb_status = event.get("status") or ("completed" if event.get("success", True) else "failed")
callback = {
"event": "agent_task_callback",
"task_id": event.get("task_id") or "",
"channel": event.get("channel") or "unknown",
"status": cb_status,
"success": bool(event.get("success", True)),
"summary": event.get("summary") or text,
"detail": event.get("detail") or text,
"error_message": event.get("error_message") or "",
"timestamp": event.get("timestamp") or "",
}
mgr.enqueue_agent_callback(callback)
logger.info("[EventBus] %s enqueued callback, scheduling trigger_agent_callbacks", event_type)
mgr._pending_agent_callback_task = asyncio.create_task(mgr.trigger_agent_callbacks())
if mgr.websocket and hasattr(mgr.websocket, "send_json"):
try:
notif = {
"type": "agent_notification",
"text": text,
"source": "brain",
"status": cb_status,
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:500]
await mgr.websocket.send_json(notif)
logger.info("[EventBus] agent_notification sent to frontend: %.60s", text[:60])
except Exception as e:
logger.warning("[EventBus] agent_notification WS send failed: %s", e)
else:
logger.warning("[EventBus] agent_notification: no websocket available")
elif event_type == "agent_notification":
if mgr.websocket and hasattr(mgr.websocket, "send_json"):
try:
notif = {
"type": "agent_notification",
"text": event.get("text", ""),
"source": event.get("source", "brain"),
"status": event.get("status", "error"),
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:500]
await mgr.websocket.send_json(notif)
except Exception as e:
logger.debug("[EventBus] agent_notification send failed: %s", e)
elif event_type == "task_update":
if mgr.websocket and hasattr(mgr.websocket, "send_json"):
try:
await mgr.websocket.send_json({"type": "agent_task_update", "task": event.get("task", {})})
except Exception:
pass
except Exception as e:
logger.debug(f"handle_agent_event error: {e}")
async def initialize_character_data():
"""初始化或重新加载角色配置数据"""
global master_name, her_name, master_basic_config, lanlan_basic_config
global name_mapping, lanlan_prompt, time_store, setting_store, recent_log
global catgirl_names, sync_message_queue, sync_shutdown_event, session_manager, session_id, sync_process, websocket_locks
logger.info("正在加载角色配置...")
# 清理无效的voice_id引用;如果发现旧版 CosyVoice 音色,推入通知缓冲池等前端连接后弹出
_cleaned, _legacy_names = _config_manager.cleanup_invalid_voice_ids()
if _legacy_names:
core.enqueue_prominent_notice({
"code": "notice.voiceMigration.legacyRemoved",
"message": "CosyVoice 现已升级至 3.5,您的旧语音已失效,请重新克隆语音。",
"message_en": "CosyVoice has been upgraded to 3.5. Your old voices are no longer valid — please re-clone your voices.",
"details": {"voices": _legacy_names},
})
# 加载最新的角色数据
master_name, her_name, master_basic_config, lanlan_basic_config, name_mapping, lanlan_prompt, time_store, setting_store, recent_log = _config_manager.get_character_data()
catgirl_names = list(lanlan_prompt.keys())
# 为新增的角色初始化资源
for k in catgirl_names:
is_new_character = False
if k not in sync_message_queue:
sync_message_queue[k] = Queue()
sync_shutdown_event[k] = ThreadEvent()
session_id[k] = None
sync_process[k] = None
logger.info(f"为角色 {k} 初始化新资源")
is_new_character = True
# 确保该角色有websocket锁
if k not in websocket_locks:
websocket_locks[k] = asyncio.Lock()
# 更新或创建session manager(使用最新的prompt)
# 使用锁保护websocket的preserve/restore操作,防止与cleanup()竞争
async with websocket_locks[k]:
# 如果已存在且已有websocket连接,保留websocket引用
old_websocket = None
if k in session_manager and session_manager[k].websocket:
old_websocket = session_manager[k].websocket
logger.info(f"保留 {k} 的现有WebSocket连接")
# 注意:不在这里清理旧session,因为:
# 1. 切换当前角色音色时,已在API层面关闭了session
# 2. 切换其他角色音色时,已跳过重新加载
# 3. 其他场景不应该影响正在使用的session
# 如果旧session_manager有活跃session,保留它,只更新配置相关的字段
# 先检查会话状态(在锁内检查避免竞态条件)
has_active_session = k in session_manager and session_manager[k].is_active
if has_active_session:
# 有活跃session,不重新创建session_manager,只更新配置
# 这是为了防止重新创建session_manager时破坏正在运行的session
try:
old_mgr = session_manager[k]
# 更新prompt
old_mgr.lanlan_prompt = lanlan_prompt[k].replace('{LANLAN_NAME}', k).replace('{MASTER_NAME}', master_name)
# 重新读取角色配置以更新voice_id等字段
(
_,
_,
_,
lanlan_basic_config_updated,
_,
_,
_,
_,
_
) = _config_manager.get_character_data()
# 更新voice_id(这是切换音色时需要的)
old_mgr.voice_id = get_reserved(
lanlan_basic_config_updated[k],
'voice_id',
default='',
legacy_keys=('voice_id',),
)
logger.info(f"{k} 有活跃session,只更新配置,不重新创建session_manager")
except Exception as e:
logger.error(f"更新 {k} 的活跃session配置失败: {e}", exc_info=True)
# 配置更新失败,但为了不影响正在运行的session,继续使用旧配置
# 如果确实需要更新配置,可以考虑在下次session重启时再应用
else:
# 没有活跃session,可以安全地重新创建session_manager
session_manager[k] = core.LLMSessionManager(
sync_message_queue[k],
k,
lanlan_prompt[k].replace('{LANLAN_NAME}', k).replace('{MASTER_NAME}', master_name)
)
# 将websocket锁存储到session manager中,供cleanup()使用
session_manager[k].websocket_lock = websocket_locks[k]
# 恢复websocket引用(如果存在)
if old_websocket:
session_manager[k].websocket = old_websocket
logger.info(f"已恢复 {k} 的WebSocket连接")
# 检查并启动同步连接器线程
# 如果是新角色,或者线程不存在/已停止,需要启动线程
if k not in sync_process:
sync_process[k] = None
need_start_thread = False
if is_new_character:
# 新角色,需要启动线程
need_start_thread = True
elif sync_process[k] is None:
# 线程为None,需要启动
need_start_thread = True
elif hasattr(sync_process[k], 'is_alive') and not sync_process[k].is_alive():
# 线程已停止,需要重启
need_start_thread = True
try:
sync_process[k].join(timeout=0.1)
except: # noqa: E722
pass
if need_start_thread:
try:
_char_name = k
def _make_status_cb(char_name):
def _cb(msg):
mgr = session_manager.get(char_name)
if not mgr:
return
loop = _server_loop
if loop is None or loop.is_closed():
return
ws = mgr.websocket
if ws and hasattr(ws, 'client_state') and ws.client_state == ws.client_state.CONNECTED:
import json as _json
data = _json.dumps({"type": "status", "message": msg})
asyncio.run_coroutine_threadsafe(ws.send_text(data), loop)
return _cb
_status_cb = _make_status_cb(_char_name)
sync_process[k] = Thread(
target=cross_server.sync_connector_process,
args=(sync_message_queue[k], sync_shutdown_event[k], k, f"ws://127.0.0.1:{MONITOR_SERVER_PORT}", {'bullet': False, 'monitor': True, 'memory': False}, _status_cb),
daemon=True,
name=f"SyncConnector-{k}"
)
sync_process[k].start()
logger.info(f"✅ 已为角色 {k} 启动同步连接器线程 ({sync_process[k].name})")
await asyncio.sleep(0.1) # 线程启动更快,减少等待时间
if not sync_process[k].is_alive():
logger.error(f"❌ 同步连接器线程 {k} ({sync_process[k].name}) 启动后立即退出!")
else:
logger.info(f"✅ 同步连接器线程 {k} ({sync_process[k].name}) 正在运行")
except Exception as e:
logger.error(f"❌ 启动角色 {k} 的同步连接器线程失败: {e}", exc_info=True)
# 清理已删除角色的资源
removed_names = [k for k in session_manager.keys() if k not in catgirl_names]
for k in removed_names:
logger.info(f"清理已删除角色 {k} 的资源")
# 先停止同步连接器线程(线程只能协作式终止,不能强制kill)
if k in sync_process and sync_process[k] is not None:
try:
logger.info(f"正在停止已删除角色 {k} 的同步连接器线程...")
if k in sync_shutdown_event:
sync_shutdown_event[k].set()
sync_process[k].join(timeout=3) # 等待线程正常结束
if sync_process[k].is_alive():
logger.warning(f"⚠️ 同步连接器线程 {k} 未能在超时内停止,将作为daemon线程自动清理")
else:
logger.info(f"✅ 已停止角色 {k} 的同步连接器线程")
except Exception as e:
logger.warning(f"停止角色 {k} 的同步连接器线程时出错: {e}")
# 清理队列(queue.Queue 没有 close/join_thread 方法)
if k in sync_message_queue:
try:
while not sync_message_queue[k].empty():
sync_message_queue[k].get_nowait()
except: # noqa
pass
del sync_message_queue[k]
# 清理其他资源
if k in sync_shutdown_event:
del sync_shutdown_event[k]
if k in session_manager:
del session_manager[k]
if k in session_id:
del session_id[k]
if k in sync_process:
del sync_process[k]
logger.info(f"角色配置加载完成,当前角色: {catgirl_names},主人: {master_name}")
# 初始化角色数据(使用asyncio.run在模块级别执行async函数)
# 只在主进程中执行,防止 Windows 上子进程重复导入时再次启动子进程
if _IS_MAIN_PROCESS:
import asyncio as _init_asyncio
try:
_init_asyncio.get_event_loop()
except RuntimeError:
_init_asyncio.set_event_loop(_init_asyncio.new_event_loop())
_init_asyncio.get_event_loop().run_until_complete(initialize_character_data())
lock = asyncio.Lock()
# --- FastAPI App Setup ---
app = FastAPI()
class CustomStaticFiles(StaticFiles):
async def get_response(self, path, scope):
response = await super().get_response(path, scope)
if path.endswith('.js'):
response.headers['Content-Type'] = 'application/javascript'
return response
# 确定 static 目录位置(使用 _get_app_root)
static_dir = os.path.join(_get_app_root(), 'static')
app.mount("/static", CustomStaticFiles(directory=static_dir), name="static")
# 挂载用户文档下的live2d目录(只在主进程中执行,子进程不提供HTTP服务)
if _IS_MAIN_PROCESS:
_config_manager.ensure_live2d_directory()
_config_manager.ensure_vrm_directory()
_config_manager.ensure_mmd_directory()
_config_manager.ensure_chara_directory()
# CFA (反勒索防护) 感知挂载:
# 优先从原始 Documents 目录(可读)提供模型文件,
# 可写回退目录(AppData)作为辅助挂载供新导入的模型使用
_readable_live2d = _config_manager.readable_live2d_dir
_serve_live2d_path = str(_readable_live2d) if _readable_live2d else str(_config_manager.live2d_dir)
if os.path.exists(_serve_live2d_path):
app.mount("/user_live2d", CustomStaticFiles(directory=_serve_live2d_path), name="user_live2d")
logger.info(f"已挂载用户Live2D目录: {_serve_live2d_path}")
# CFA 场景:可写回退目录额外挂载,供新导入的模型使用
if _readable_live2d and str(_config_manager.live2d_dir) != _serve_live2d_path:
_writable_live2d_path = str(_config_manager.live2d_dir)
if os.path.exists(_writable_live2d_path):
app.mount("/user_live2d_local", CustomStaticFiles(directory=_writable_live2d_path), name="user_live2d_local")
logger.info(f"已挂载本地Live2D目录(CFA回退): {_writable_live2d_path}")
# 挂载VRM动画目录(static/vrm/animation) 必须第一个挂载
vrm_animation_path = str(_config_manager.vrm_animation_dir)
if os.path.exists(vrm_animation_path):
app.mount("/user_vrm/animation", CustomStaticFiles(directory=vrm_animation_path), name="user_vrm_animation")
logger.info(f"已挂载VRM动画目录: {vrm_animation_path}")
# 挂载VRM模型目录(用户文档目录)
user_vrm_path = str(_config_manager.vrm_dir)
if os.path.exists(user_vrm_path):
app.mount("/user_vrm", CustomStaticFiles(directory=user_vrm_path), name="user_vrm")
logger.info(f"已挂载VRM目录: {user_vrm_path}")
# 挂载项目目录下的static/vrm(作为备用,如果文件在项目目录中)
project_vrm_path = os.path.join(static_dir, 'vrm')
if os.path.exists(project_vrm_path) and os.path.isdir(project_vrm_path):
logger.info(f"项目VRM目录存在: {project_vrm_path} (可通过 /static/vrm/ 访问)")
# 挂载MMD动画目录(必须在MMD模型目录之前挂载)
mmd_animation_path = str(_config_manager.mmd_animation_dir)
if os.path.exists(mmd_animation_path):
app.mount("/user_mmd/animation", CustomStaticFiles(directory=mmd_animation_path), name="user_mmd_animation")
logger.info(f"已挂载MMD动画目录: {mmd_animation_path}")
# 挂载MMD模型目录(用户文档目录)
user_mmd_path = str(_config_manager.mmd_dir)
if os.path.exists(user_mmd_path):
app.mount("/user_mmd", CustomStaticFiles(directory=user_mmd_path), name="user_mmd")
logger.info(f"已挂载MMD目录: {user_mmd_path}")
# 挂载项目目录下的static/mmd(作为备用)
project_mmd_path = os.path.join(static_dir, 'mmd')
if os.path.exists(project_mmd_path) and os.path.isdir(project_mmd_path):
logger.info(f"项目MMD目录存在: {project_mmd_path} (可通过 /static/mmd/ 访问)")
# 挂载用户mod路径
user_mod_path = _config_manager.get_workshop_path()
if os.path.exists(user_mod_path) and os.path.isdir(user_mod_path):
app.mount("/user_mods", CustomStaticFiles(directory=user_mod_path), name="user_mods")
logger.info(f"已挂载用户mod路径: {user_mod_path}")
# --- 初始化共享状态并挂载路由 ---
# 从 main_routers 包导入并挂载路由
from main_routers import ( # noqa
config_router,
characters_router,
live2d_router,
vrm_router,
mmd_router,
workshop_router,
memory_router,
pages_router,
websocket_router,
agent_router,
system_router,
)
from main_routers import music_router # noqa
from main_routers.cookies_login_router import router as cookies_login_router # noqa
from main_routers.shared_state import init_shared_state # noqa
# 初始化共享状态,供各路由访问
# 注意:steamworks 会在 startup 事件中初始化后更新
if _IS_MAIN_PROCESS:
init_shared_state(
sync_message_queue=sync_message_queue,
sync_shutdown_event=sync_shutdown_event,
session_manager=session_manager,
session_id=session_id,
sync_process=sync_process,
websocket_locks=websocket_locks,
steamworks=None, # 延迟初始化,会在 startup 事件中设置
templates=templates,
config_manager=_config_manager,
logger=logger,
initialize_character_data=initialize_character_data,
)
# ── 健康检查 / 指纹端点 ──────────────────────────────────────────
@app.get("/health")
async def health():
"""返回带 N.E.K.O 签名的健康响应,供 launcher/前端识别,
以区分当前服务与随机占用该端口的其他进程。"""
from utils.port_utils import build_health_response
from config import INSTANCE_ID
return build_health_response("main", instance_id=INSTANCE_ID)
@app.post('/api/beacon/shutdown')
async def beacon_shutdown():
"""Beacon 接口:用于优雅关闭服务器"""
try:
# 从 app.state 获取配置
current_config = get_start_config()
# 仅当服务由 --open-browser 模式启动时才响应 beacon
if current_config['browser_mode_enabled']:
logger.info("收到beacon信号,准备关闭服务器...")
# 调度服务器关闭任务
asyncio.create_task(shutdown_server_async())
return {"success": True, "message": "服务器关闭信号已接收"}
except Exception as e:
logger.error(f"Beacon处理错误: {e}")
return {"success": False, "error": str(e)}
# 挂载全部路由
app.include_router(config_router)
app.include_router(characters_router)
app.include_router(live2d_router)
app.include_router(vrm_router)
app.include_router(mmd_router)
app.include_router(workshop_router)
app.include_router(memory_router)
# 注意:pages_router 含 /{lanlan_name} 兜底路由,应最后挂载
app.include_router(websocket_router)
app.include_router(agent_router)
app.include_router(system_router)
app.include_router(music_router.router)
app.include_router(cookies_login_router) # Cookies登录相关路由,放在最后以避免与其他API路由冲突
app.include_router(pages_router) # 兜底路由需最后挂载
# 后台预加载任务
_preload_task: asyncio.Task = None
async def _background_preload():
"""后台预加载音频处理模块
注意:不需要 Event 同步机制,因为 Python 的 import lock 会自动等待首次导入完成。
如果用户在预加载完成前点击语音,再次 import 会自动阻塞等待。
"""
try:
logger.info("🔄 后台预加载音频处理模块...")
# 在线程池中执行同步导入(避免阻塞事件循环)
import concurrent.futures
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(pool, _sync_preload_modules)
except Exception as e:
logger.warning(f"⚠️ 音频处理模块预加载失败(不影响使用): {e}")
def _sync_preload_modules():
"""同步预加载延迟导入的模块(在线程池中执行)
注意:以下模块已通过导入链在启动时加载,无需预加载:
- numpy, soxr: 通过 core.py / audio_processor.py
- websockets: 通过 omni_realtime_client.py
- langchain_openai/langchain_core: 通过 omni_offline_client.py
- httpx: 通过 core.py
- aiohttp: 通过 tts_client.py
真正需要预加载的延迟导入模块:
- pyrnnoise/audiolab: audio_processor.py 中通过 _get_rnnoise() 延迟加载
- dashscope: tts_client.py 中仅在 cosyvoice_vc_tts_worker 函数内部导入
- googletrans/translatepy: language_utils.py 中延迟导入的翻译库
- translation_service: language_utils.py 中的翻译服务(TranslationService)
"""
import time
start = time.time()
# 1. 翻译服务相关模块(避免首轮对话延迟)
try:
# 预加载翻译库(googletrans, translatepy 等)
from utils import language_utils
# 触发翻译库的导入(如果可用)
_ = language_utils.GOOGLETRANS_AVAILABLE
_ = language_utils.TRANSLATEPY_AVAILABLE
logger.debug("✅ 翻译库预加载完成")
except Exception as e:
logger.debug(f"⚠️ 翻译库预加载失败(不影响使用): {e}")
# 2. 翻译服务实例(需要 config_manager)
try:
# 提前初始化翻译服务(如果在初始化过程中需要翻译数据)
from utils.language_utils import get_translation_service
from utils.config_manager import get_config_manager
# 此处仅调用以触发单例初始化,后续使用时通过 get_translation_service 获取即可
config_manager = get_config_manager()
# 预初始化翻译服务实例(触发 LLM 客户端创建等)
_ = get_translation_service(config_manager)
logger.debug("✅ 翻译服务预加载完成")
except Exception as e:
logger.debug(f"⚠️ 翻译服务预加载失败(不影响使用): {e}")
# 3. pyrnnoise/audiolab (音频降噪 - 延迟加载,可能较慢)
try:
from utils.audio_processor import _get_rnnoise
RNNoise = _get_rnnoise()
if RNNoise:
# 创建临时实例以预热神经网络权重加载
_warmup_instance = RNNoise(sample_rate=48000)
del _warmup_instance
logger.debug(" ✓ pyrnnoise loaded and warmed up")
else:
logger.debug(" ✗ pyrnnoise not available")
except Exception as e:
logger.debug(f" ✗ pyrnnoise: {e}")
# 4. dashscope (阿里云 CosyVoice TTS SDK - 仅在使用自定义音色时需要)
try:
import dashscope # noqa: F401
logger.debug(" ✓ dashscope loaded")
except Exception as e:
logger.debug(f" ✗ dashscope: {e}")
# 5. AudioProcessor 预热(numpy buffer + soxr resampler 初始化)
try:
from utils.audio_processor import AudioProcessor
import numpy as np
# 创建临时实例预热 numpy/soxr
_warmup_processor = AudioProcessor(
input_sample_rate=48000,
output_sample_rate=16000,
noise_reduce_enabled=False # 不需要 RNNoise,前面已预热
)
# 模拟处理一小块音频,预热 numpy 和 soxr 的 JIT
_dummy_audio = np.zeros(480, dtype=np.int16).tobytes()
_ = _warmup_processor.process_chunk(_dummy_audio)
del _warmup_processor, _dummy_audio
logger.debug(" ✓ AudioProcessor warmed up")
except Exception as e:
logger.debug(f" ✗ AudioProcessor warmup: {e}")
# 6. httpx SSL 上下文预热(首次创建 AsyncClient 会初始化 SSL)
try:
import httpx
import asyncio
async def _warmup_httpx():
async with httpx.AsyncClient(timeout=1.0, proxy=None, trust_env=False) as client:
# 发送一个简单请求预热 SSL 上下文
try:
await client.get("http://127.0.0.1:1", timeout=0.01)
except: # noqa: E722
pass # 预期会失败,只是为了初始化 SSL
# 在当前线程的事件循环中运行(如果没有则创建临时循环)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果已有运行中的循环,使用线程池
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
pool.submit(lambda: asyncio.run(_warmup_httpx())).result(timeout=2.0)
else:
loop.run_until_complete(_warmup_httpx())
except RuntimeError:
asyncio.run(_warmup_httpx())
logger.debug(" ✓ httpx SSL context warmed up")
except Exception as e:
logger.debug(f" ✗ httpx warmup: {e}")
elapsed = time.time() - start
logger.info(f"📦 模块预加载完成,耗时 {elapsed:.2f}s")
# Startup 事件:延迟初始化 Steamworks 和全局语言
@app.on_event("startup")
async def on_startup():
"""服务器启动时执行的初始化操作"""
if _IS_MAIN_PROCESS:
global steamworks, _preload_task, agent_event_bridge, _server_loop
_server_loop = asyncio.get_running_loop()
logger.info("正在初始化 Steamworks...")
steamworks = initialize_steamworks()
# 更新 shared_state 中的 steamworks 引用
from main_routers.shared_state import set_steamworks
set_steamworks(steamworks)
# 尝试获取 Steam 信息
get_default_steam_info()
# 在后台异步预加载音频模块(不阻塞服务器启动)
# 注意:不需要等待机制,Python import lock 会自动处理并发
_preload_task = asyncio.create_task(_background_preload())
# 启动 agent_server <-> main_server 的 ZeroMQ 事件桥接
try:
agent_event_bridge = MainServerAgentBridge(on_agent_event=_handle_agent_event)
await agent_event_bridge.start()
set_main_bridge(agent_event_bridge)
except Exception as e:
logger.warning(f"Agent event bridge startup failed: {e}")
await _init_and_mount_workshop()
# 后台预热 UGC 缓存 + 同步角色卡(分别独立任务,互不阻塞)
if steamworks:
import main_routers.workshop_router as _wr
async def _warmup_only():
"""仅预热 UGC 缓存"""
try:
await warmup_ugc_cache()
except Exception as e:
logger.warning(f"UGC 缓存预热失败: {e}")
async def _sync_characters_only():
"""等待预热完成后同步角色卡"""
# 先等预热完成,角色卡同步依赖订阅物品列表
if _wr._ugc_warmup_task is not None:
try:
await asyncio.wait_for(asyncio.shield(_wr._ugc_warmup_task), timeout=20)
except asyncio.TimeoutError:
logger.warning("等待 UGC 预热任务超时(20s),继续角色卡同步")
except Exception as e:
logger.debug(f"等待 UGC 预热任务时异常(不影响角色卡同步): {e}")
try:
sync_result = await sync_workshop_character_cards()
if sync_result["added"] > 0:
logger.info(f"✅ 创意工坊角色卡同步完成:新增 {sync_result['added']} 个,跳过 {sync_result['skipped']} 个")
else:
logger.info("创意工坊角色卡同步完成:无新增角色卡")
except Exception as e:
logger.warning(f"创意工坊角色卡同步失败(不影响启动): {e}")
# _ugc_warmup_task 仅引用预热任务,等待它不会被角色卡同步阻塞
_wr._ugc_warmup_task = asyncio.create_task(_warmup_only())
_wr._ugc_sync_task = asyncio.create_task(_sync_characters_only())
# 初始化全局 LLM Token 用量追踪器
try:
from utils.token_tracker import TokenTracker, install_hooks
install_hooks()
TokenTracker.get_instance().start_periodic_save()
TokenTracker.get_instance().record_app_start()
logger.info("Token usage tracker initialized")
except Exception as e:
logger.warning(f"Token tracker initialization failed (non-critical): {e}")
logger.info("Startup 初始化完成,后台正在预加载音频模块...")
# 初始化全局语言变量(优先级:Steam设置 > 系统设置)
try:
from utils.language_utils import initialize_global_language
global_lang = initialize_global_language()
logger.info(f"全局语言初始化完成: {global_lang}")
except Exception as e:
logger.warning(f"全局语言初始化失败: {e},将使用默认值")
@app.on_event("shutdown")
async def on_shutdown():
"""服务器关闭时清理资源"""
if _IS_MAIN_PROCESS:
logger.info("正在清理资源...")
# 等待预加载任务完成(如果还在运行)
global _preload_task, agent_event_bridge
if _preload_task:
try:
await asyncio.wait_for(_preload_task, timeout=1.0)
except asyncio.TimeoutError:
_preload_task.cancel()
try:
await _preload_task
except asyncio.CancelledError:
logger.debug("预加载任务清理时超时并已取消(正常关闭流程)")
except asyncio.CancelledError:
logger.debug("预加载任务清理时已取消(正常关闭流程)")
except Exception as e:
logger.debug(f"预加载任务清理时出错(正常关闭流程): {e}", exc_info=True)
# Clean up agent_event_bridge (ZMQ context/sockets/recv thread)
if agent_event_bridge is not None:
try:
await agent_event_bridge.stop()
except Exception as e:
logger.debug(f"Agent event bridge cleanup failed: {e}", exc_info=True)
# 释放 soxr ResampleStream(nanobind C 扩展),避免解释器退出时泄漏警告
try:
for mgr in session_manager.values():
if hasattr(mgr, 'audio_resampler'):
mgr.audio_resampler = None
except Exception as e:
logger.debug(f"soxr resampler cleanup failed: {e}")
# 关闭翻译服务
try:
from utils import language_utils
close_fn = getattr(language_utils, "aclose_translation_service", None)
if callable(close_fn):
await close_fn()
else:
logger.debug("Translation service cleanup skipped: function not implemented")
except Exception as e:
logger.debug(f"Translation service cleanup failed: {e}")
# 保存 Token 用量数据
try:
from utils.token_tracker import TokenTracker
TokenTracker.get_instance().save()
except Exception as e:
logger.debug(f"Token usage save on shutdown failed: {e}")
# 关闭音乐爬虫连接池
try:
from utils.music_crawlers import close_all_crawlers
# 【核心修改】增加 1 秒超时兜底。如果 1 秒内关不完,直接抛弃,保障服务器顺利退出
await asyncio.wait_for(close_all_crawlers(), timeout=1.0)
except asyncio.TimeoutError:
# 单独捕获超时异常,记录警告但放行
logger.warning("音乐爬虫连接池清理超时,已强制跳过以保证服务正常退出。")
except Exception as e:
logger.debug(f"音乐爬虫清理失败: {e}", exc_info=True)
# 使用 FastAPI 的 app.state 来管理启动配置
def get_start_config():
"""从 app.state 获取启动配置"""
if hasattr(app.state, 'start_config'):
return app.state.start_config
return {
"browser_mode_enabled": False,
"browser_page": "chara_manager",
'server': None
}
def set_start_config(config):
"""设置启动配置到 app.state"""
app.state.start_config = config
async def _init_and_mount_workshop():
"""
初始化并挂载创意工坊目录
设计原则:
- main 层只负责调用,不维护状态
- 路径由 utils 层计算并持久化到 config 层
- 其他代码需要路径时调用 get_workshop_path() 获取
"""
try:
# 1. 获取订阅的创意工坊物品列表
workshop_items_result = await get_subscribed_workshop_items()
# 2. 提取物品列表传给 utils 层
subscribed_items = []