-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathmain.py
More file actions
417 lines (363 loc) · 13.4 KB
/
main.py
File metadata and controls
417 lines (363 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import os
from pathlib import Path
import sys
# 打包后须在任何会触发 ConfigManager 的 import 之前设发行根 cwd(同 webui_qt)
if getattr(sys, "frozen", False):
try:
_rel = Path(sys.executable).resolve().parent.parent
os.environ["EASYAI_PROJECT_ROOT"] = str(_rel)
os.chdir(_rel)
except OSError:
pass
current_script = Path(__file__).resolve()
project_root = current_script.parent
if str(project_root) not in sys.path:
sys.path.append(str(project_root))
if getattr(sys, "frozen", False):
from core.bootstrap.frozen_log import init_frozen_stdio
init_frozen_stdio("main")
import llm.tools.character_tools
import llm.tools.memory_tools
import llm.tools.tool_search
import llm.tools.file_tools
from llm.template_generator import is_transparent_background
from llm.llm_manager import LLMManager, LLMAdapterFactory
from llm.text_processor import TextProcessor
from core.runtime.app_runtime import AppRuntime, set_app_runtime
from core.runtime.workflow import build_runtime_workflow, get_chat_workflow_handles
from tts.tts_manager import TTSManager, TTSAdapterFactory
from config.config_manager import ConfigManager
from t2i.t2i_manager import T2IAdapterFactory, T2IManager
import pygame
import traceback
from opencc import OpenCC
from queue import Queue
from core.sprite.chat_history import (
chat_history,
get_history,
load_chat_history,
save_bg,
save_chat_history,
)
from core.sprite.chat_ui_service import (
install_chat_ui_context,
restore_session_ui,
wire_chat_ui_bridge,
)
from core.sprite.sprite_cli import parse_sprite_args
try:
from live.danmuku_handler import start_bilibili_service
except ImportError as e:
# 早于 init_i18n,不调用 tr
# print("Bilibili import failed:", e)
pass
voice_lang = "ja"
cc = OpenCC("t2s") # 繁体到简体转换器
def _shutdown_plugins() -> None:
try:
from core.plugins.plugin_host import get_plugin_manager
mgr = get_plugin_manager()
if mgr is not None:
mgr.shutdown_all()
except Exception:
pass
def _mask_secret(value: str) -> str:
if not value:
return ""
if len(value) <= 8:
return "***"
return f"{value[:4]}...{value[-4:]}"
def main():
config = ConfigManager()
from i18n import init_i18n, tr as tr_i18n, tr_in_bundle
from asr.asr_adapter import system_config_to_asr_lang
init_i18n(config.config.system_config.ui_language)
from core.plugins.plugin_host import ensure_plugins_loaded, wire_user_input_plugins
ensure_plugins_loaded(config)
args = parse_sprite_args(tr_i18n)
# T2I manager
t2i_manager = None
if args.t2i:
raw = (args.t2i or "").strip()
adapter_pick = (
(config.config.api_config.t2i_provider or "comfyui").strip()
if raw.lower() == "comfyui"
else raw
)
try:
t2i_adapter = T2IAdapterFactory.create_adapter(
adapter_name=adapter_pick,
**config.merged_t2i_factory_kwargs(
adapter_pick,
{
"work_path": config.config.api_config.t2i_work_path,
"api_url": config.config.api_config.t2i_api_url,
"workflow_path": config.config.api_config.t2i_default_workflow_path,
"prompt_node_id": config.config.api_config.t2i_prompt_node_id,
"output_node_id": config.config.api_config.t2i_output_node_id,
},
),
)
t2i_manager = T2IManager(t2i_adapter)
except Exception as e:
print(tr_i18n("main.print_t2i_fail", e=str(e)))
traceback.print_exc()
# TTS:仅当 API 中语音引擎不是「不使用」时加载;命令行 --tts 可覆盖引擎名(与 api.yaml 一致)
gsv_url, gsv_api_path, config_tts_provider = config.get_gpt_sovits_config()
adapter_name = (args.tts or "").strip() or config_tts_provider
tts_manager = None
if adapter_name and str(adapter_name).strip().lower() not in ("none",):
try:
adapter = TTSAdapterFactory.create_adapter(
adapter_name=adapter_name,
**config.merged_tts_factory_kwargs(
adapter_name,
{
"gpt_sovits_work_path": gsv_api_path,
"tts_server_url": gsv_url,
},
),
)
tts_manager = TTSManager(tts_server_url=gsv_url)
tts_manager.set_tts_adapter(adapter=adapter)
_voice_lang = str(config.config.system_config.voice_language or "ja").strip() or "ja"
tts_manager.set_language(_voice_lang)
except Exception as e:
print(tr_i18n("main.print_tts_fail", e=str(e)))
traceback.print_exc()
print(tr_i18n("main.print_load_template", a=args))
messages = []
if args.history:
print(tr_i18n("main.print_load_history", path=args.history))
messages = load_chat_history(args.history)
user_template = ""
with open(
f"./data/character_templates/{args.template}.txt", "r", encoding="utf-8"
) as f:
user_template = f.read()
# Init LLMManager before UI, so that handlers can access it via get_app_runtime().llm_manager
llm_provider, llm_model, base_url, api_key = config.get_llm_api_config()
print(llm_provider, llm_model, base_url, _mask_secret(api_key))
if not llm_provider:
print(tr_i18n("main.err_select_llm"))
return
llm_adapter = LLMAdapterFactory.create_adapter(
**config.merged_llm_factory_kwargs(
llm_provider,
{
"llm_provider": llm_provider,
"api_key": api_key,
"base_url": base_url,
"model": llm_model,
},
)
)
llm_manager = LLMManager(
adapter=llm_adapter,
user_template=user_template,
max_tokens=int(config.config.api_config.max_context_tokens),
generation_config={
"temperature": float(config.config.api_config.temperature),
"repetition_penalty": float(config.config.api_config.repetition_penalty),
"presence_penalty": float(config.config.api_config.presence_penalty),
"frequency_penalty": float(config.config.api_config.frequency_penalty),
"max_tokens": 4096,
},
)
if messages:
llm_manager.set_messages(messages)
# Legacy flow
image_queue = Queue()
emotion_queue = Queue()
if not args.headless:
pygame.mixer.init()
text_processor = TextProcessor()
# 将角色读音映射注入 text_processor.name_map
for _char in config.config.characters:
_pm = getattr(_char, "pronunciation_map", None)
if _pm:
from llm.text_processor import name_map
name_map.update(_pm)
# 获取背景组
bg_group = None
try:
bg_group = (
None
if is_transparent_background(args.bg)
else config.get_background_by_name(args.bg).sprites
)
except Exception:
pass
bgm_list = []
try:
bgm_list = (
[]
if is_transparent_background(args.bg)
else config.get_background_by_name(args.bg).bgm_list
)
except Exception:
pass
if args.headless and not (args.workflow or "").strip():
headless_workflow = "assets/system/workflow/headless.yaml"
else:
headless_workflow = None
workflow = build_runtime_workflow(
workflow_path=args.workflow or headless_workflow,
queue_factory=Queue,
)
chat_handles = get_chat_workflow_handles(workflow)
user_input_queue = chat_handles.input_queue
audio_path_queue = chat_handles.audio_queue
tts_queue = chat_handles.tts_queue
_um = chat_handles.ui_worker
if args.headless:
from core.runtime.ui_update_manager import HeadlessUIUpdateManager
ui_updates = HeadlessUIUpdateManager(chat_history=chat_history)
set_app_runtime(
AppRuntime(
config=config,
ui_update_manager=ui_updates,
llm_manager=llm_manager,
tts_manager=tts_manager,
t2i_manager=t2i_manager,
bgm_list=bgm_list,
user_input_queue=user_input_queue,
tts_queue=tts_queue,
audio_path_queue=audio_path_queue,
text_processor=text_processor,
opencc=cc,
)
)
workflow.start()
print(f"Workflow started: {args.workflow or 'default'}")
try:
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
workflow.stop()
_shutdown_plugins()
if tts_manager:
tts_manager.shutdown()
save_chat_history(args.history, llm_manager.get_messages())
return
# Init UI and connect to runtime
from core.runtime.ui_update_manager import UIUpdateManager, connect_to_desktop_window
from PySide6.QtGui import QIcon
from PySide6.QtWidgets import QApplication
from ui.chat_ui.chat_ui import ChatUIWindow
from ui.chat_ui.qss_fusion import ensure_fusion_style
app = QApplication([])
ensure_fusion_style(app)
ui_updates = UIUpdateManager(chat_history=chat_history, bg_group=bg_group or [])
window = ChatUIWindow(
image_queue,
emotion_queue,
llm_manager,
sprite_mode=True,
background_mode=(bg_group is not None),
)
connect_to_desktop_window(ui_updates, window)
set_app_runtime(
AppRuntime(
config=config,
ui_update_manager=ui_updates,
llm_manager=llm_manager,
tts_manager=tts_manager,
t2i_manager=t2i_manager,
bgm_list=bgm_list,
user_input_queue=user_input_queue,
tts_queue=tts_queue,
audio_path_queue=audio_path_queue,
text_processor=text_processor,
opencc=cc,
)
)
workflow.start()
init_sprite_path = args.init_sprite_path
print(init_sprite_path)
if not init_sprite_path:
init_sprite_path = "./assets/system/picture/shinsekai.png"
if system_config_to_asr_lang(config.config.system_config) == "zh":
_welcome_html = tr_in_bundle("main.welcome_html", "zh_CN")
_option_start = tr_in_bundle("main.option_start", "zh_CN")
else:
_welcome_html = tr_i18n("main.welcome_html")
_option_start = tr_i18n("main.option_start")
# 更新初始立绘(已从文件恢复会话时不要先刷欢迎语,否则会 hide 选项区并与恢复队列竞争)
try:
if not messages:
window.setDisplayWords(_welcome_html)
if len(get_history()) <= 1:
window.setOptions([_option_start])
except Exception:
if not messages:
window.setDisplayWords(_welcome_html)
window.setNotification(tr_i18n("main.notify_chat"))
if user_input_queue is not None:
emit_user_text = wire_user_input_plugins(user_input_queue)
else:
emit_user_text = None
# Update system_config with current session's bg/bgm so restore doesn't use stale values
sc = config.config.system_config.model_copy(deep=True)
if bg_group:
sc.bgm_path = bgm_list[0] if bgm_list else ""
sc.background_path = bg_group[0].get("path", "") if bg_group else ""
else:
sc.bgm_path = ""
sc.background_path = ""
config.config.system_config = sc
config.save_system_config()
chat_ui_ctx = install_chat_ui_context(window, emit_user_text=emit_user_text)
if audio_path_queue is not None:
restore_session_ui(
messages,
audio_path_queue=audio_path_queue,
window=window,
config=config,
tr_i18n=tr_i18n,
)
wire_chat_ui_bridge(
chat_ui_ctx,
window=window,
app=app,
emit_user_text=emit_user_text,
chat_history=chat_history,
history_file=args.history,
llm_manager=llm_manager,
audio_path_queue=audio_path_queue,
tts_manager=tts_manager,
ui_worker=_um,
tr_i18n=tr_i18n,
)
if args.room_id:
print(tr_i18n("main.print_bili_start", id=args.room_id))
if user_input_queue is not None:
try:
start_bilibili_service(args.room_id, user_input_queue=user_input_queue)
except ImportError as e:
# print(tr_i18n("main.print_bili_import", e=str(e)))
pass
# 确保在程序退出时停止所有线程
try:
appIcon = QIcon("./assets/system/picture/icon.png")
app.setWindowIcon(appIcon)
except Exception as e:
print(tr_i18n("main.print_icon_fail", e=str(e)))
# 关闭顺序:插件 → TTS 服务器 → Worker 线程 → 保存数据
app.aboutToQuit.connect(workflow.stop)
app.aboutToQuit.connect(_shutdown_plugins)
app.aboutToQuit.connect(lambda: tts_manager and tts_manager.shutdown())
app.aboutToQuit.connect(lambda: save_chat_history(args.history, llm_manager.get_messages()))
app.aboutToQuit.connect(
lambda: save_bg(
bg_path=window.current_background_path,
bgm_path=ui_updates.current_bgm_path,
)
)
window.show()
app.exec()
if __name__ == "__main__":
main()