-
Notifications
You must be signed in to change notification settings - Fork 159
Expand file tree
/
Copy pathmemory_server.py
More file actions
4457 lines (3949 loc) · 216 KB
/
memory_server.py
File metadata and controls
4457 lines (3949 loc) · 216 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import sys
import os
# Put the repository root (two directory levels above this file) on sys.path
# so the top-level packages imported below resolve even when this file is run
# directly as a script.
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _repo_root not in sys.path:
    sys.path.insert(0, _repo_root)
# Wire DI bindings explicitly — direct script invocation
# (``python app/memory_server.py``) doesn't run app/__init__.py.
# Idempotent under launcher's ``from app import memory_server`` path too.
from app.runtime_bindings import install_runtime_bindings as _install_runtime_bindings
_install_runtime_bindings()
from memory import (
CompressedRecentHistoryManager, ImportantSettingsManager, TimeIndexedMemory,
FactStore, PersonaManager, ReflectionEngine,
)
from memory.cursors import CursorStore, CURSOR_REBUTTAL_CHECKED_UNTIL
from memory.facts import FactExtractionFailed
from memory.event_log import (
EventLog, Reconciler,
EVIDENCE_SOURCE_USER_CONFIRM,
EVIDENCE_SOURCE_USER_FACT,
EVIDENCE_SOURCE_USER_IGNORE,
EVIDENCE_SOURCE_USER_KEYWORD_REBUT,
EVIDENCE_SOURCE_USER_REBUT,
EVIDENCE_SOURCE_MIGRATION_SEED,
)
from memory.evidence_handlers import register_evidence_handlers as _register_evidence_handlers
from memory.outbox import Outbox, OP_POST_TURN_SIGNALS
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse, JSONResponse
import json
import uvicorn
from utils.llm_client import convert_to_messages
from uuid import uuid4
from config import (
EVIDENCE_ARCHIVE_DAYS,
EVIDENCE_ARCHIVE_SWEEP_INTERVAL_SECONDS,
EVIDENCE_NEGATIVE_TARGET_MODEL_TIER,
EVIDENCE_SIGNAL_CHECK_ENABLED,
EVIDENCE_SIGNAL_CHECK_EVERY_N_TURNS,
EVIDENCE_SIGNAL_CHECK_IDLE_MINUTES,
EVIDENCE_SIGNAL_CHECK_INTERVAL_SECONDS,
EVIDENCE_AI_AWARE_EVERY_N_A_TICKS,
MAX_AI_AWARE_WINDOW_MSGS,
MAX_KNOWN_POOL_FACTS,
MEMORY_REFLECTION_SYNTHESIS_INTERVAL_SECONDS,
IGNORED_REINFORCEMENT_DELTA,
MEMORY_RECHECK_ENABLED,
MEMORY_RECHECK_INITIAL_DELAY_SECONDS,
MEMORY_REFINE_CRON_INTERVAL_SECONDS,
MEMORY_RECHECK_INTERVAL_SECONDS,
MEMORY_SERVER_PORT,
USER_CONFIRM_DELTA,
USER_FACT_NEGATE_DELTA,
USER_FACT_REINFORCE_DELTA,
USER_KEYWORD_REBUT_DELTA,
USER_REBUT_DELTA,
)
from config.prompts.prompts_sys import _loc
from config.prompts.prompts_memory import (
INNER_THOUGHTS_HEADER, INNER_THOUGHTS_BODY,
CHAT_GAP_NOTICE, CHAT_GAP_LONG_HINT, CHAT_GAP_CURRENT_TIME,
CHAT_HOLIDAY_CONTEXT,
MEMORY_RECALL_HEADER, MEMORY_RESULTS_HEADER,
PERSONA_HEADER, INNER_THOUGHTS_DYNAMIC,
RECENT_HISTORY_INTRO, NO_RECENT_HISTORY,
)
# Negative-intent prompts/scanner 已迁到 ``prompts_directives``(与 ban-topic
# regex 同源——同是"用户负面 / 回避指令"的语义层)。``prompts_memory`` 保留
# fact/persona/reflection/summary 等纯 memory-业务 prompt。
from config.prompts.prompts_directives import (
get_negative_target_check_prompt,
scan_negative_keywords,
)
from utils.language_utils import get_global_language
from utils.character_name import validate_character_name
from utils.cloudsave_runtime import (
MaintenanceModeError,
ROOT_MODE_NORMAL,
bootstrap_local_cloudsave_environment,
maintenance_error_payload,
set_root_mode,
should_write_root_mode_normal_after_startup,
)
from utils.config_manager import get_config_manager
from utils.storage_location_bootstrap import get_storage_startup_blocking_reason
from pydantic import BaseModel
import re
import asyncio
import logging
import argparse
from datetime import datetime, timedelta, timezone
from typing import Awaitable, Callable
from utils.frontend_utils import get_timestamp
# Configure logging for the "Memory" service; ``logger`` is used module-wide.
from utils.logger_config import setup_logging
logger, log_config = setup_logging(service_name="Memory", log_level=logging.INFO)
from utils.time_format import format_elapsed as _format_elapsed
class HistoryRequest(BaseModel):
    """Request body with a single ``input_history`` string field.

    NOTE(review): presumably a serialized conversation history — confirm
    against the route handlers that accept this model.
    """
    input_history: str
class ContinueStorageStartupRequest(BaseModel):
    """Request body with an optional free-text ``reason`` (defaults to ``""``).

    NOTE(review): presumably consumed by the
    ``/internal/storage/startup/continue`` route — confirm.
    """
    reason: str = ""
app = FastAPI()
# Paths that stay reachable while the server is in storage-limited mode;
# storage_limited_mode_guard answers every other path with a 409 until the
# runtime is fully initialized and storage is not blocked.
_STORAGE_LIMITED_MODE_ALLOWED_PATHS = {
    "/health",
    "/shutdown",
    "/internal/storage/startup/continue",
    "/internal/storage/startup/block",
}
def _storage_limited_mode_response(path: str, reason: str) -> JSONResponse:
    """Log the rejection of *path* and build the 409 limited-mode JSON response."""
    logger.info(
        "[Memory] limited-mode blocks request path=%s reason=%s",
        path,
        reason,
    )
    body = {
        "ok": False,
        "error_code": "storage_startup_blocked",
        "blocking_reason": reason,
        "limited_mode": True,
        "error": "Memory server 正处于存储受限启动状态,请等待存储位置选择、迁移或恢复完成。",
    }
    return JSONResponse(status_code=409, content=body)


@app.middleware("http")
async def storage_limited_mode_guard(request: Request, call_next):
    """Reject requests with HTTP 409 while the memory runtime is not ready.

    A request passes through when runtime init has completed and storage was
    not blocked afterwards, or when its path is in
    ``_STORAGE_LIMITED_MODE_ALLOWED_PATHS``. Otherwise the 409 body carries a
    ``blocking_reason`` chosen in this order: the reason reported by
    ``get_storage_startup_blocking_reason``; ``"storage_startup_blocked_after_init"``
    if storage was blocked after init; else ``"runtime_initializing"``.
    """
    if _memory_runtime_init_completed and not _memory_storage_blocked_after_init:
        return await call_next(request)
    path = request.url.path
    if path in _STORAGE_LIMITED_MODE_ALLOWED_PATHS:
        return await call_next(request)
    reason = get_storage_startup_blocking_reason(_config_manager)
    if not reason:
        reason = (
            "storage_startup_blocked_after_init"
            if _memory_storage_blocked_after_init
            else "runtime_initializing"
        )
    return _storage_limited_mode_response(path, reason)
@app.exception_handler(MaintenanceModeError)
async def handle_maintenance_mode_error(_request, exc: MaintenanceModeError):
    """Turn a ``MaintenanceModeError`` escaping any route into a 409 JSON response.

    The body is built by ``maintenance_error_payload(exc)``.
    """
    return JSONResponse(status_code=409, content=maintenance_error_payload(exc))
# ── Health check / fingerprint endpoint ──────────────────────────────────
@app.get("/health")
async def health():
    """Return a health response carrying the N.E.K.O signature.

    The launcher/frontend use it to tell this service apart from an unrelated
    process that happens to occupy the same port.
    """
    from utils.port_utils import build_health_response
    from config import INSTANCE_ID
    return build_health_response("memory", instance_id=INSTANCE_ID)
def validate_lanlan_name(name: str) -> str:
    """Validate and normalize a character name taken from a request.

    Returns the normalized name. Raises ``HTTPException(400)`` with a
    length-specific detail for empty / too-long names, and with a generic
    invalid-characters detail for any other validation failure.
    """
    check = validate_character_name(name, allow_dots=True, max_length=50)
    failure = check.code
    if failure is None:
        return check.normalized
    if failure in {"empty", "too_long_length"}:
        detail = "Invalid lanlan_name length"
    else:
        detail = "Invalid characters in lanlan_name"
    raise HTTPException(status_code=400, detail=detail)
# All initialization that depends on the cloudsave directory layout is
# deferred to the startup hook (see startup_event_handler):
# 1. bootstrap_local_cloudsave_environment can raise OSError (disk full,
#    read-only FS, ...); calling it bare would crash at module import time and
#    FastAPI would never come up;
# 2. import_legacy_runtime_root_if_needed (inside bootstrap) may bring legacy
#    flat-layout memory/{type}_{name}.ext files into the target root, so it
#    must run before migrate_to_character_dirs (otherwise the legacy data stays
#    in the flat layout while the components only read the per-character
#    layout, and that data becomes unreachable);
# 3. therefore bootstrap → migrate → component instantiation must keep this
#    order and all run in startup.
# Components are declared as None here and assigned by the startup hook.
# FastAPI starts accepting requests only after the startup hook has been
# awaited, so route handlers are not expected to see None.
# NOTE(review): storage_limited_mode_guard rejects most requests until
# ``_memory_runtime_init_completed`` is set, which suggests init can finish
# (or be blocked) after requests start arriving — confirm that handlers on
# the allowed paths never touch these components.
_config_manager = get_config_manager()
recent_history_manager: CompressedRecentHistoryManager | None = None
settings_manager: ImportantSettingsManager | None = None
time_manager: TimeIndexedMemory | None = None
fact_store: FactStore | None = None
persona_manager: PersonaManager | None = None
reflection_engine: ReflectionEngine | None = None
cursor_store: CursorStore | None = None
outbox: Outbox | None = None
# memory-evidence-rfc §3.3 infrastructure: EventLog + Reconciler singletons.
# Created alongside persona_manager etc. in the startup hook. On reload the
# Reconciler is rebuilt, while an existing EventLog instance is reused
# (see reload_memory_components).
event_log: EventLog | None = None
reconciler: Reconciler | None = None
# memory-enhancements P2: vector embedding warmup + backfill worker.
# Lazily constructed in startup hook; held at module scope so
# /process / /renew handlers can call notify_first_process() to
# unblock the warmup wait early. None when vectors are disabled or
# the worker bootstrap raised.
embedding_warmup_worker = None
# memory-enhancements P2: fact vector dedup resolver. Shares the
# FactStore with the embedding worker (worker enqueues candidates,
# the idle-maintenance loop resolves them). None when bootstrap
# fails or the embedding service is permanently disabled.
fact_dedup_resolver = None
# Lock that serializes component reloads; release_character_resources also
# holds it while disposing a character's SQLite engine.
_reload_lock = asyncio.Lock()
# Old TimeIndexedMemory instances replaced by a reload; kept alive until
# process shutdown instead of being cleaned up immediately.
_deferred_time_managers: list[TimeIndexedMemory] = []
# Runtime-init bookkeeping. The two boolean flags below are read by
# storage_limited_mode_guard to decide whether a request may pass.
# NOTE(review): the lock presumably serializes runtime init — confirm.
_memory_runtime_init_lock = asyncio.Lock()
_memory_runtime_init_completed = False
_memory_storage_blocked_after_init = False
# NOTE(review): presumably prevents the background loops from being started
# twice — confirm against the startup code.
_memory_background_tasks_started = False
def _defer_time_manager_cleanup(manager: TimeIndexedMemory | None) -> None:
    """Queue a replaced TimeIndexedMemory for cleanup at process shutdown.

    Freeing it immediately could hand already-released handles to requests
    still running during the swap window. ``None`` is ignored, and so is an
    instance that is already queued (compared by identity, not equality).
    """
    if manager is None or any(queued is manager for queued in _deferred_time_managers):
        return
    _deferred_time_managers.append(manager)
    logger.info("[MemoryServer] 旧的 TimeIndexedMemory 已加入延迟清理队列")
async def reload_memory_components():
    """Reload the memory components (used after a new character is created).

    Runs under ``_reload_lock``. Every new instance is constructed first, and
    only then are the module-level references reassigned. There is no
    ``await`` between those assignments, so no other coroutine on the event
    loop can observe a partially swapped set.

    Note: during a reload, async tasks already started on the old
    ``cursor_store`` may read and write the same cursors.json concurrently
    with the new instance. The architecture assumes a single writer per
    character, and a reload is an admin operation (adding a character) that
    rarely collides with the background rebuttal_loop. atomic_write_json keeps
    each write atomic, so in the extreme last-writer-wins case at most one
    cursor advance is lost, and the next tick recovers it.

    Returns:
        True on success. False if an exception was raised (logged with a
        traceback). If the failure happens while the new instances are being
        built, the previous components stay in use.
    """
    global recent_history_manager, settings_manager, time_manager, fact_store, persona_manager, reflection_engine, cursor_store, outbox, event_log, reconciler, fact_dedup_resolver
    # NOTE(review): embedding_warmup_worker is not in the global list above and
    # is not rebuilt here. If it holds a FactStore reference, that reference
    # keeps pointing at the old instance — confirm this is intended.
    async with _reload_lock:
        logger.info("[MemoryServer] 开始重新加载记忆组件配置...")
        old_time_manager = time_manager
        try:
            # Build all new instances first.
            new_recent = CompressedRecentHistoryManager()
            new_settings = ImportantSettingsManager()
            new_time = TimeIndexedMemory(new_recent)
            new_facts = FactStore(time_indexed_memory=new_time)
            # The EventLog is reused (its per-character lock dict does not need
            # to be discarded across reloads). The Reconciler is rebuilt on every
            # reload so that its handlers point at the new manager instances.
            new_event_log = event_log if event_log is not None else EventLog()
            new_persona = PersonaManager(event_log=new_event_log)
            new_reflection = ReflectionEngine(new_facts, new_persona, event_log=new_event_log)
            new_cursor_store = CursorStore()
            new_outbox = Outbox()
            new_reconciler = Reconciler(new_event_log)
            _register_evidence_handlers(new_reconciler, new_persona, new_reflection)
            # P2 step 2: rebind the existing fact_dedup_resolver to the
            # NEW FactStore in place rather than constructing a new
            # resolver. Going via rebind_fact_store preserves the
            # per-character ``_alocks`` dict, so a mid-reload
            # ``aresolve`` still in flight on the old instance and a
            # fresh ``aenqueue_candidates`` arriving on the new
            # instance serialise on the same asyncio.Lock (CodeRabbit
            # PR-956 Major; Codex PR-957 P2). Falls back to fresh
            # construction only if there was no prior resolver
            # (extremely cold-path during reload — startup never ran).
            # Note: the rebind mutates the live resolver before the swap below;
            # a failure here only degrades the resolver to None.
            try:
                from memory.fact_dedup import FactDedupResolver
                if fact_dedup_resolver is not None:
                    fact_dedup_resolver.rebind_fact_store(new_facts)
                    new_fact_dedup_resolver = fact_dedup_resolver
                else:
                    new_fact_dedup_resolver = FactDedupResolver(new_facts)
            except Exception as e:
                logger.warning(f"[MemoryServer] reload: fact_dedup_resolver 重建失败: {e}")
                new_fact_dedup_resolver = None
            # Then swap the references (plain assignments, no await in between).
            recent_history_manager = new_recent
            settings_manager = new_settings
            time_manager = new_time
            fact_store = new_facts
            persona_manager = new_persona
            reflection_engine = new_reflection
            cursor_store = new_cursor_store
            outbox = new_outbox
            event_log = new_event_log
            reconciler = new_reconciler
            fact_dedup_resolver = new_fact_dedup_resolver
            # The old TimeIndexedMemory is not disposed now; it is parked for
            # cleanup at shutdown so in-flight requests keep valid handles.
            if old_time_manager is not None and old_time_manager is not new_time:
                _defer_time_manager_cleanup(old_time_manager)
            logger.info("[MemoryServer] ✅ 记忆组件配置重新加载完成")
            return True
        except Exception as e:
            logger.error(f"[MemoryServer] ❌ 重新加载记忆组件配置失败: {e}", exc_info=True)
            return False
@app.post("/release_character/{lanlan_name}")
async def release_character_resources(lanlan_name: str):
    """Release a character's SQLite handles before it is renamed or deleted.

    Invalid names are answered with the validator's status code and detail.
    The engine is disposed while ``_reload_lock`` is held. A disposal failure
    is reported as a 500 JSON error instead of being raised.
    """
    try:
        lanlan_name = validate_lanlan_name(lanlan_name)
    except HTTPException as invalid:
        logger.warning("[MemoryServer] 拒绝释放非法角色名的 SQLite 引擎: %s", lanlan_name)
        error_body = {"status": "error", "character_name": lanlan_name, "message": str(invalid.detail)}
        return JSONResponse(error_body, status_code=invalid.status_code)
    async with _reload_lock:
        try:
            time_manager.dispose_engine(lanlan_name)
            logger.info("[MemoryServer] 已主动释放角色 %s 的 SQLite 引擎", lanlan_name)
        except Exception as failure:
            logger.warning("[MemoryServer] 释放角色 %s 的 SQLite 引擎失败: %s", lanlan_name, failure)
            error_body = {"status": "error", "character_name": lanlan_name, "message": str(failure)}
            return JSONResponse(error_body, status_code=500)
        return {"status": "success", "character_name": lanlan_name}
# Event set when a shutdown has been requested (see /shutdown).
shutdown_event = asyncio.Event()
# Whether /shutdown requests are honored at all.
enable_shutdown = False
# Per-character bookkeeping for correction tasks.
correction_tasks = {} # {lanlan_name: asyncio.Task}
correction_cancel_flags = {} # {lanlan_name: asyncio.Event}
# Phase C: spawn-race guard. /process, /renew, /settle and IdleMaint all share
# maybe_spawn_review. When several entry points pass the gate check at once,
# there is an await window between the in-flight check and the spawn. A
# per-name lock serializes the gate+spawn section so that at most one review
# runs per character.
_review_spawn_locks: dict[str, asyncio.Lock] = {}
# Per-character settle lock: blocks /new_dialog while the first-round summary
# runs, so that a hot switch reads the latest data.
_settle_locks: dict[str, asyncio.Lock] = {}
# Strong-reference registry that keeps fire-and-forget tasks from being
# garbage-collected before they finish.
_BACKGROUND_TASKS: set[asyncio.Task] = set()
# /new_dialog QPS observation: cumulative call count per character.
# _periodic_new_dialog_qps_log_loop logs one INFO line every
# NEW_DIALOG_QPS_FLUSH_INTERVAL seconds and then resets the counters. It shows,
# after change A, whether the proactive_chat path is the real load source on
# memory_server. If it is not, the main_server-side cache (option C+) is
# unnecessary.
_new_dialog_qps_counter: dict[str, int] = {}
NEW_DIALOG_QPS_FLUSH_INTERVAL = 60
# ── Idle maintenance ────────────────────────────────────────────────────
_last_activity_time: datetime = datetime.now() # time of the last conversation activity
IDLE_CHECK_INTERVAL = 40 # idle-check polling interval (seconds)
IDLE_THRESHOLD = 10 # seconds without activity before the server counts as idle (matches the minimum proactive interval)
REVIEW_MIN_INTERVAL = 60 # minimum interval between reviews (seconds); combined with the message gate for double rate limiting
REVIEW_SKIP_HISTORY_LEN = 8 # characters with fewer history entries than this skip review
MIN_NEW_MSGS_FOR_REVIEW = 5 # a new review round may start only after ≥ N user msgs since the last review cutoff
LONG_IDLE_REVIEW_BYPASS_SECONDS = 1800 # ≥ 30 min since the last activity with un-reviewed new messages → bypass the new-message gate,
# so the "a few short of a batch" tail also gets processed
# ── Staggered startup initial_delay (keeps every loop's first run from landing at startup + interval) ──
# First run of each loop = startup + its delay; afterwards each loop runs on its own INTERVAL.
# Design: archive sweep has the longest INTERVAL (3600s), but many users quit within
# 1h, so its first run must be pulled in significantly. rebuttal/auto_promote share a
# 300s interval but must not run together, so their first runs are staggered (currently
# by 50s: 100 vs 150). IdleMaint/Signal already have short intervals and only give the
# startup tasks (cloudsave / outbox replay / migration) some breathing room.
# EmbeddingWarmupWorker has its own 30s warmup gate and is not configured here.
_INITIAL_DELAY_IDLE_MAINT = 20 # IdleMaint first run (the former 10s high-frequency startup run is gone)
_INITIAL_DELAY_SIGNAL = 60 # Signal extraction first run (was 40s)
_INITIAL_DELAY_REBUTTAL = 100 # Rebuttal first run (was 300s)
_INITIAL_DELAY_AUTO_PROMOTE = 150 # Auto-promote first run (was 300s; 50s after rebuttal)
_INITIAL_DELAY_ARCHIVE = 250 # Archive sweep first run (was 3600s; moved far earlier so short sessions still reach it)
_INITIAL_DELAY_PERSONA_REFINE = 400 # PERSONA_REFINE first run (100s apart from reflection refine)
_INITIAL_DELAY_REFLECTION_REFINE = 500 # REFLECTION_REFINE first run
_INITIAL_DELAY_REFLECTION_SYNTHESIS = 200 # REFLECTION_SYNTHESIS first run (avoids AUTO_PROMOTE 150 and ARCHIVE 250; leaves room for SignalLoop at 60s plus one or two real fact outputs)
# ── Persisted maintenance state (keeps the review_clean flag across restarts) ──
_maint_state: dict[str, dict] = {} # {character name: {"review_clean": bool, "last_review_ts": str}}
def _maint_state_path() -> str:
    """Path of the JSON file that persists the idle-maintenance state."""
    memory_root = str(_config_manager.memory_dir)
    return os.path.join(memory_root, 'idle_maintenance_state.json')
async def _aload_maint_state() -> None:
    """Load the persisted idle-maintenance state from disk at startup.

    Sets the module-level ``_maint_state``. A missing file gives an empty
    state. An unreadable or corrupt file, or a payload that is not a JSON
    object, is logged as a warning and also gives an empty state, so a bad
    file never aborts startup. Entries whose value is not a dict are dropped,
    because readers such as ``_is_review_clean`` call ``.get`` on every entry.
    """
    from utils.file_utils import read_json_async
    global _maint_state
    path = _maint_state_path()
    if not await asyncio.to_thread(os.path.exists, path):
        _maint_state = {}
        return
    try:
        data = await read_json_async(path)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError
        # (a file with invalid UTF-8 bytes, assuming read_json_async decodes
        # strictly). OSError also covers the file vanishing between the
        # exists() check and the read.
        logger.warning(f"[IdleMaint] 维护状态文件加载失败: {e}")
        _maint_state = {}
        return
    if not isinstance(data, dict):
        logger.warning(
            f"[IdleMaint] 维护状态文件格式异常(期望 JSON 对象,实际 {type(data).__name__}),已重置"
        )
        _maint_state = {}
        return
    _maint_state = {name: state for name, state in data.items() if isinstance(state, dict)}
    logger.debug(f"[IdleMaint] 已加载维护状态: {len(_maint_state)} 个角色")
async def _asave_maint_state() -> None:
    """Atomically write ``_maint_state`` to disk; failures are logged, never raised."""
    from utils.file_utils import atomic_write_json_async
    try:
        target = _maint_state_path()
        await atomic_write_json_async(target, _maint_state, indent=2, ensure_ascii=False)
    except Exception as err:
        logger.warning(f"[IdleMaint] 维护状态保存失败: {err}")
def _is_review_clean(lanlan_name: str) -> bool:
    """Return the persisted ``review_clean`` flag for a character (False when absent).

    The flag means the character has been reviewed and no new conversation
    has arrived since.
    """
    entry = _maint_state.get(lanlan_name, {})
    return entry.get('review_clean', False)
async def _aclear_review_clean(lanlan_name: str) -> None:
    """Clear a character's ``review_clean`` flag when a new human message arrives.

    The state is written back to disk only when the flag was actually set.
    """
    entry = _maint_state.get(lanlan_name, {})
    if not entry.get('review_clean'):
        return
    entry['review_clean'] = False
    await _asave_maint_state()
def _has_human_messages(messages) -> bool:
    """Return True if any message's ``type`` attribute equals ``'human'``.

    Objects without a ``type`` attribute count as non-human.
    """
    return any(getattr(msg, 'type', '') == 'human' for msg in messages)
async def _ais_review_enabled() -> bool:
    """Return whether correction/review is enabled in ``core_config.json`` (async IO).

    A missing config file, a missing ``recent_memory_auto_review`` key, a
    non-dict payload, or any read error all count as enabled. Only a falsy
    value for that key disables review.
    """
    from utils.file_utils import read_json_async
    try:
        cfg_path = str(_config_manager.get_runtime_config_path('core_config.json'))
        if await asyncio.to_thread(os.path.exists, cfg_path):
            cfg = await read_json_async(cfg_path)
            if isinstance(cfg, dict):
                return bool(cfg.get('recent_memory_auto_review', True))
    except Exception as e:
        logger.debug(f"[IdleMaint] 读取 review 开关配置失败,默认启用: {e}")
    return True
async def _ais_powerful_memory_enabled() -> bool:
    """Return whether "powerful memory" is enabled.

    The flag gates every new LLM path introduced by the evidence RFC. When it
    is off, only the pre-RFC baseline pipeline remains (Stage-1 fact
    extraction / reflection synthesize / recent compress+review / recall
    reranker / check_feedback for proactive-chat replies), plus the
    time-driven promote fallback. The original notes estimate that turning it
    off saves roughly 40-50% of tokens.

    The flag is persisted as ``powerful_memory_enabled`` in
    ``core_config.json``; a missing key means True (backward compatible). The
    file is re-read on every call without caching, the same hot-reload
    behavior as ``_ais_review_enabled``, so a change takes effect without a
    restart. A missing file or any read error also means True.
    """
    from utils.file_utils import read_json_async
    try:
        cfg_path = str(_config_manager.get_runtime_config_path('core_config.json'))
        if await asyncio.to_thread(os.path.exists, cfg_path):
            cfg = await read_json_async(cfg_path)
            if isinstance(cfg, dict):
                return bool(cfg.get('powerful_memory_enabled', True))
    except Exception as e:
        logger.debug(f"[Memory] 读取强力记忆开关配置失败,默认启用: {e}")
    return True
async def _reset_confirmed_at_for_all_characters() -> int:
    """On→off migration: reset the confirmed_at anchor of every character's confirmed reflections.

    Called by update_powerful_memory_config in main_routers/memory_router.py,
    only on the prev=True → new=False transition. The reset makes the
    time-driven fallback run its full 14-day timer instead of bulk-promoting
    old confirmed entries right after the switch-off.

    Returns the number of entries actually migrated. Unrecoverable failures
    (reflection_engine not initialized, or the character list failing to
    load) always raise, so the caller endpoint can tell a real 0 (characters
    loaded, nothing to reset) apart from "never ran" (early failure).
    CodeRabbit PR #997 feedback: both early-failure paths used to return 0.
    The endpoint then reported ok=true, count=0, upstream memory_router
    treated that as success and persisted powerful_memory_enabled=False, and
    the old confirmed_at values were never migrated.

    A failure for a single character is logged and skipped; the total counts
    only successful characters. An empty character list is a legitimate 0.
    """
    if reflection_engine is None:
        raise RuntimeError(
            "reflection_engine 未初始化(memory_server limited-mode 或 startup 未完成)"
        )
    characters = await _config_manager.aload_characters()
    char_names = list(characters.get('猫娘', {}).keys())
    migrated = 0
    for char_name in char_names:
        try:
            migrated += await reflection_engine.areset_confirmed_at_to_now(char_name)
        except Exception as e:
            logger.warning(f"[Memory] migration {char_name} 重置失败(其他角色继续): {e}")
    return migrated
def _touch_activity() -> None:
    """Record conversation activity now, which restarts the idle timer."""
    global _last_activity_time
    now = datetime.now()
    _last_activity_time = now
def _is_idle() -> bool:
    """Return True once at least IDLE_THRESHOLD seconds have passed since the last activity."""
    elapsed = datetime.now() - _last_activity_time
    return elapsed.total_seconds() >= IDLE_THRESHOLD
def _get_settle_lock(lanlan_name: str) -> asyncio.Lock:
    """Return the per-character settle lock, creating it on first use."""
    lock = _settle_locks.get(lanlan_name)
    if lock is None:
        lock = _settle_locks[lanlan_name] = asyncio.Lock()
    return lock
def _format_legacy_settings_as_text(settings: dict, lanlan_name: str) -> str:
    """Render legacy settings JSON as natural-language text instead of raw json.dumps output.

    ``settings`` maps a subject name to a dict of ``key -> value`` entries.
    Each non-empty dict subject becomes a ``关于{name}:`` section with one
    ``- key:value`` line per non-empty value. Lists are joined with ``、``;
    dicts are rendered as ``k: v`` pairs (skipping None/'' nested values)
    joined with ``、``; other values use ``str()``. Subjects that are not
    dicts, or are empty, are skipped.

    Empty values are skipped: ``None``, ``''``, ``[]``, and dicts with no
    non-empty nested value (including ``{}``). A dict with no non-empty nested
    value would otherwise leak its raw Python repr (for example ``{}`` or
    ``{'k': None}``) into the text.

    Returns a placeholder line when nothing is left to show.
    """
    empty_text = f"{lanlan_name}记得:(暂无记录)"
    if not settings:
        return empty_text
    sections = []
    for name, data in settings.items():
        if not isinstance(data, dict) or not data:
            continue
        lines = []
        for key, value in data.items():
            if value is None or value == '' or value == []:
                continue
            if isinstance(value, list):
                value_str = '、'.join(str(v) for v in value)
            elif isinstance(value, dict):
                parts = [f"{k}: {v}" for k, v in value.items() if v is not None and v != '']
                if not parts:
                    # Empty dict, or every nested value empty: nothing to show.
                    continue
                value_str = '、'.join(parts)
            else:
                value_str = str(value)
            lines.append(f"- {key}:{value_str}")
        if lines:
            sections.append(f"关于{name}:\n" + "\n".join(lines))
    if not sections:
        return empty_text
    return f"{lanlan_name}记得:\n" + "\n".join(sections)
def _spawn_background_task(coro) -> asyncio.Task:
    """Schedule *coro* as a fire-and-forget background task and return the Task.

    The task is held in ``_BACKGROUND_TASKS`` until it finishes, so it cannot
    be garbage-collected mid-flight (the event loop keeps only weak references
    to tasks). If the task ends with an exception, a warning is logged with
    the exception type, message and traceback. An exception whose ``str()`` is
    empty (e.g. a bare ``TimeoutError()``) would otherwise produce an empty,
    undiagnosable log line. Cancellation is not logged.
    """
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)

    def _on_done(t: asyncio.Task) -> None:
        _BACKGROUND_TASKS.discard(t)
        if t.cancelled():
            return
        exc = t.exception()
        if exc is not None:
            logger.warning(
                "[MemoryServer] 后台任务异常: %s: %s",
                type(exc).__name__,
                exc,
                exc_info=exc,
            )

    task.add_done_callback(_on_done)
    return task
# ── Outbox handler registry + replay (P1.c) ────────────────────────
# op_type → async handler(name: str, payload: dict) -> None. Handlers must be idempotent
# (ops can be replayed after a restart).
OutboxHandler = Callable[[str, dict], Awaitable[None]]
_OUTBOX_HANDLERS: dict[str, OutboxHandler] = {}
# Concurrency cap for the startup replay fan-out: keeps an outbox backlog left
# after, say, 24h of downtime from flooding the LLM backend.
_REPLAY_CONCURRENCY = 2
_replay_semaphore: asyncio.Semaphore | None = None # built lazily (must be created inside the running event loop)


def register_outbox_handler(op_type: str, handler: OutboxHandler) -> None:
    """Register *handler* for *op_type*, replacing any handler already registered for it."""
    _OUTBOX_HANDLERS[op_type] = handler
async def _run_outbox_op(name: str, op: dict, sem: asyncio.Semaphore | None = None) -> None:
    """Run a single outbox op and append its ``done`` record on success.

    On failure the op stays pending so the next startup replay retries it.

    Args:
        name: character name whose outbox owns the op.
        op: outbox record. Reads ``op_id``, ``type``, ``payload`` and the
            optional ``_attempt_count``.
        sem: shared Semaphore passed by the startup replay path to cap LLM
            fan-out. The normal per-turn spawn path passes None (no limit).

    Liveness fallback (Site 7): when the handler fails, one attempt row is
    appended via ``aappend_attempt``. Once the cumulative attempt count for
    this op_id (including this one) reaches ``MEMORY_LIVENESS_MAX_ATTEMPTS``,
    ``aappend_done`` is written as a dead-letter to give up on the op, with a
    WARN. Without this, a poison op would be re-run on every restart and
    never leave pending. A poison op is one whose payload makes the handler
    raise permanently, e.g. an LLM safety filter or a permanent parse failure.
    ``compact`` would then stay blocked forever and outbox.ndjson would grow
    without bound. ``_attempt_count`` is the cumulative count from the
    ``pending_ops`` scan. On the normal spawn path the op is built ad hoc
    without that field, so counting starts at 0: the first failure gives
    attempt=1, well below N, and the op stays pending for restart replay.
    """
    from config import MEMORY_LIVENESS_MAX_ATTEMPTS
    op_id = op.get('op_id')
    op_type = op.get('type')
    payload = op.get('payload') or {}
    from memory.facts import safe_int_field
    prior_attempts = safe_int_field(op, '_attempt_count')
    handler = _OUTBOX_HANDLERS.get(op_type)
    if handler is None:
        logger.warning(f"[Outbox] {name}: 未注册的 op type {op_type}, 跳过 {op_id}")
        return
    # CodeRabbit: an op that already reached the dead-letter threshold just gets
    # its ``done`` written; the handler must not run again.
    # Edge case: the previous round's ``aappend_attempt`` pushed _attempt_count to
    # N, but the following ``aappend_done`` write failed (transient IO). The op
    # stayed pending, so restart replay sees ``_attempt_count=N`` and would
    # re-enter the handler, fail again and retry done. For an idempotent handler
    # that only wastes a call. For a non-idempotent one (the outbox contract
    # requires idempotency but cannot enforce it) it is a real duplicated side
    # effect. Short-circuiting on entry guarantees "never run again once the
    # threshold is reached".
    if prior_attempts >= MEMORY_LIVENESS_MAX_ATTEMPTS:
        logger.warning(
            f"[Outbox] {name}/{op_type}/{op_id}: 进入时已达 dead-letter 阈值 "
            f"({prior_attempts}/{MEMORY_LIVENESS_MAX_ATTEMPTS}),跳过 handler "
            f"直接补写 done。Why: 上一轮 append_done 可能 IO 失败留 pending,"
            f"避免毒 op 重复执行 + 副作用重放。"
        )
        try:
            await outbox.aappend_done(name, op_id)
        except Exception as de:
            logger.warning(
                f"[Outbox] {name}/{op_type}/{op_id}: dead-letter "
                f"append_done 仍失败(保持 pending 等下次重放再补 done): {de}"
            )
        return
    # Only the replay path throttles; the semaphore is released in ``finally``.
    acquired = False
    if sem is not None:
        await sem.acquire()
        acquired = True
    try:
        try:
            await handler(name, payload)
        except Exception as e:
            try:
                await outbox.aappend_attempt(name, op_id)
                attempt_persisted = True
            except Exception as ae:
                attempt_persisted = False
                logger.warning(
                    f"[Outbox] {name}/{op_type}/{op_id}: append_attempt 失败: {ae}"
                )
            # Codex P1: dead-letter must not be triggered by an unpersisted "+1".
            # Suppose this aappend_attempt failed and a following aappend_done
            # succeeded. After a restart, disk would hold only prior_attempts
            # attempt rows plus one done row. The op would be lost for good
            # while the record looks like "done after only N-1 failures",
            # violating the "give up only after ≥ N failures" contract. When the
            # attempt was not persisted, treat this failure as transient: keep
            # the op pending (the next retry records another attempt) and skip
            # the dead-letter decision.
            if not attempt_persisted:
                logger.warning(
                    f"[Outbox] {name}/{op_type}/{op_id} 执行失败(attempt 持久化"
                    f"失败,按 transient 保留 pending 等下次重放): {e}"
                )
                return
            total_attempts = prior_attempts + 1
            if total_attempts >= MEMORY_LIVENESS_MAX_ATTEMPTS:
                logger.warning(
                    f"[Outbox] {name}/{op_type}/{op_id}: handler 累计失败 "
                    f"{total_attempts} 次 ≥ {MEMORY_LIVENESS_MAX_ATTEMPTS},"
                    f"dead-letter 放弃该 op(最近一次失败: {e})。"
                    f"Why: liveness 兜底,避免毒 payload 让重启 replay 永远卡住 + "
                    f"compact 永久阻塞。"
                )
                try:
                    await outbox.aappend_done(name, op_id)
                except Exception as de:
                    logger.warning(
                        f"[Outbox] {name}/{op_type}/{op_id}: dead-letter "
                        f"append_done 失败: {de}"
                    )
            else:
                logger.warning(
                    f"[Outbox] {name}/{op_type}/{op_id} 执行失败(保持 pending,"
                    f"attempts={total_attempts}/{MEMORY_LIVENESS_MAX_ATTEMPTS}): {e}"
                )
            return
        try:
            await outbox.aappend_done(name, op_id)
        except Exception as e:
            # A failed append_done is not fatal: the op is replayed on the next
            # startup and the handler is idempotent.
            logger.warning(f"[Outbox] {name}/{op_type}/{op_id}: append_done 失败: {e}")
    finally:
        if acquired and sem is not None:
            sem.release()
async def _spawn_outbox_post_turn_signals(lanlan_name: str, messages: list) -> asyncio.Task:
    """Register the per-turn signals job in the outbox, then spawn it.

    "per-turn signals" = counter bump (feeds the batch loop's count) + repeat
    sniffing + check_feedback + OFF-mode Stage-1 fallback; see
    ``_run_post_turn_signals``.

    The registered payload holds the whole turn serialized with
    ``messages_to_dict``, so it can be replayed after a restart. If the outbox
    append fails, the job degrades to a one-shot in-memory task (the
    pre-refactor behavior) that cannot be replayed.
    """
    from utils.llm_client import messages_to_dict
    payload = {'messages': messages_to_dict(messages)}
    try:
        op_id = await outbox.aappend_pending(lanlan_name, OP_POST_TURN_SIGNALS, payload)
    except Exception as e:
        # An outbox write failure must not block the main flow.
        logger.warning(
            f"[Outbox] {lanlan_name}: append_pending 失败,降级为内存任务: "
            f"{type(e).__name__}: {e}"
        )
        job = _run_post_turn_signals(messages, lanlan_name)
    else:
        op = {'op_id': op_id, 'type': OP_POST_TURN_SIGNALS, 'payload': payload}
        job = _run_outbox_op(lanlan_name, op)
    return _spawn_background_task(job)
def _scan_outbox_character_dirs(memory_dir) -> set[str]:
    """Return the names of ``memory_dir`` subdirectories that contain ``outbox.ndjson``.

    Blocking filesystem work (one ``listdir`` plus an ``isfile`` per entry).
    Async callers must run it through ``asyncio.to_thread``.
    """
    found: set[str] = set()
    if memory_dir and os.path.isdir(memory_dir):
        for entry in os.listdir(memory_dir):
            if os.path.isfile(os.path.join(memory_dir, entry, 'outbox.ndjson')):
                found.add(entry)
    return found


async def _replay_pending_outbox() -> list[asyncio.Task]:
    """Scan the outbox at startup and re-run unfinished ops; return the spawned Tasks.

    Returning the Tasks lets the caller (or a test) await all of them, instead
    of relying on a weak ``_BACKGROUND_TASKS`` snapshot plus
    ``asyncio.sleep(0)``.

    Scan scope = character names in the current config ∪ ``memory_dir``
    subdirectories containing ``outbox.ndjson``. Scanning only the config
    would miss characters that were used before and later removed from the
    config but still have pending ops; those ops would never be replayed.
    The directory scan runs in a worker thread so it does not block the event
    loop. Characters with no pending ops get an opportunistic compaction.
    """
    global _replay_semaphore
    spawned: list[asyncio.Task] = []
    names: set[str] = set()
    try:
        character_data = await _config_manager.aload_characters()
        names.update(character_data.get('猫娘', {}).keys())
    except Exception as e:
        logger.warning(f"[Outbox] 启动补跑:加载角色列表失败: {e}")
        # Even if the config fails to load, still fall back to the on-disk
        # scan: crash recovery must keep working after config changes.
    try:
        memory_dir = _config_manager.memory_dir
        names.update(await asyncio.to_thread(_scan_outbox_character_dirs, memory_dir))
    except Exception as e:
        logger.warning(f"[Outbox] 启动补跑:扫描 memory_dir 失败: {e}")
    if not names:
        return spawned
    # The Semaphore must be built inside the running event loop, not at module level.
    if _replay_semaphore is None:
        _replay_semaphore = asyncio.Semaphore(_REPLAY_CONCURRENCY)
    for name in sorted(names):
        try:
            pending = await outbox.apending_ops(name)
        except Exception as e:
            logger.warning(f"[Outbox] {name}: 读取 pending ops 失败: {e}")
            continue
        if not pending:
            # Opportunistic compaction: the file may have accumulated many done
            # rows. Failure only means space is not reclaimed, so it is logged
            # at debug level.
            try:
                dropped = await outbox.amaybe_compact(name)
                if dropped:
                    logger.info(f"[Outbox] {name}: compact 丢弃 {dropped} 行")
            except Exception as e:
                logger.debug(f"[Outbox] {name}: 机会性 compact 失败(可忽略): {e}")
            continue
        logger.info(f"[Outbox] {name}: 补跑 {len(pending)} 条未完成 op")
        for op in pending:
            spawned.append(
                _spawn_background_task(_run_outbox_op(name, op, _replay_semaphore))
            )
    return spawned
@app.post("/shutdown")
async def shutdown_memory_server():
    """Handle a shutdown signal from main_server.

    When ``enable_shutdown`` is false the request is refused with a
    ``shutdown_disabled`` status. Otherwise ``shutdown_event`` is set.
    """
    if not enable_shutdown:
        logger.warning("收到关闭信号,但当前模式不允许响应退出请求")
        return {"status": "shutdown_disabled", "message": "当前模式不允许响应退出请求"}
    try:
        logger.info("收到来自main_server的关闭信号")
        shutdown_event.set()
    except Exception as e:
        logger.error(f"处理关闭信号时出错: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "shutdown_signal_received"}
REBUTTAL_CHECK_INTERVAL = 180 # seconds (3 minutes)
REBUTTAL_FIRST_RUN_LOOKBACK_HOURS = 1 # fallback look-back window on first start / after a clock rollback
# Drain pattern: handle at most N user messages per pass so the prompt does not
# blow up for high-frequency users. The rest waits for the next pass (the
# cursor advances to the N-th message's timestamp, so nothing is dropped).
REBUTTAL_DRAIN_BATCH_LIMIT = 20
# Hard cap on rows read per SQL query: bounds memory so the 1h fallback cannot
# pull in the whole table. 200 rows usually contain 50-100 user messages,
# enough for several drains.
REBUTTAL_SQL_ROW_LIMIT = 200
def _coerce_db_ts(ts) -> datetime | None:
"""归一化 SQL 行里的 timestamp 字段为 **naive** datetime。
SQLAlchemy + SQLite 在某些 driver 配置下返回字符串而非 datetime;与
memory/timeindex.py:get_last_conversation_time 同款归一化。返回 None
表示无法解析(caller 应跳过此行而不是把 None 写进 cursor)。
若解析出 TZ-aware datetime(import / migration 路径写入 "...+00:00"
之类),强制 `replace(tzinfo=None)` 转 naive——本文件所有 cursor /
比较都按 naive 语义工作(last_b_check_ts / last_a_msg_ts / facts.json
`created_at` 全是 naive `datetime.now().isoformat()`),aware 跟 naive
比较会抛 TypeError 让 caller 永久哑火(Codex P1+P2 round-7/8 on PR
#1408 双侧 case)。
"""
if isinstance(ts, datetime):
result = ts
elif isinstance(ts, str):
try:
result = datetime.fromisoformat(ts)
except ValueError:
try:
result = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
return None
else:
return None
if result.tzinfo is not None:
result = result.replace(tzinfo=None)
return result
def _extract_user_messages_with_ts_from_rows(rows: list) -> list[tuple[str, datetime]]:
    """Extract ``(user_message_text, timestamp)`` tuples from time-indexed SQL rows.

    Args:
        rows: ``[(timestamp, session_id, message_json), ...]`` ordered by
            timestamp ascending. ``message_json`` is the JSON string stored by
            langchain's ``SQLChatMessageHistory``; an already-decoded object is
            also accepted. Its ``data.content`` may be a str or a list of
            ``{type, text}`` parts.

    Returns:
        Tuples in input order (ascending by timestamp), so callers can
        advance a cursor from the last item's timestamp. Timestamps are
        normalised to ``datetime`` via ``_coerce_db_ts`` (the SQL driver may
        return strings); rows whose timestamp cannot be parsed are skipped.
        For list content, every non-blank text part yields its own tuple
        sharing the row's timestamp.

    Malformed rows (invalid JSON, a non-dict ``data`` field, non-string text
    parts) are skipped rather than raising, so one corrupt row cannot abort
    the whole extraction.
    """
    out: list[tuple[str, datetime]] = []
    for ts_raw, _, msg_json in rows:
        ts = _coerce_db_ts(ts_raw)
        if ts is None:
            continue
        try:
            msg = json.loads(msg_json) if isinstance(msg_json, str) else msg_json
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(msg, dict) or msg.get('type') != 'human':
            continue
        # `data` may be null / non-dict in corrupt rows; `.get` on it would
        # raise AttributeError, which the JSON error handler does not cover.
        data = msg.get('data')
        if not isinstance(data, dict):
            continue
        content = data.get('content', '')
        if isinstance(content, str):
            if content.strip():
                out.append((content, ts))
        elif isinstance(content, list):
            for part in content:
                if not isinstance(part, dict) or part.get('type') != 'text':
                    continue
                text_val = part.get('text', '')
                # Guard against `"text": null` or other non-string payloads.
                if isinstance(text_val, str) and text_val.strip():
                    out.append((text_val, ts))
    return out
def _extract_user_messages_from_rows(rows: list) -> list[str]:
    """Extract user message texts from time-indexed SQL rows (legacy text-only view).

    Args:
        rows: ``[(timestamp, session_id, message_json), ...]``. ``message_json``
            is a JSON string (or an already-decoded object) whose
            ``data.content`` is either a str or a list of ``{type, text}`` parts.

    Returns:
        Non-blank texts of ``human`` messages, in input order. For list
        content, each non-blank text part is emitted as its own entry.

    Malformed rows (invalid JSON, a non-dict ``data`` field, non-string text
    parts) are skipped rather than raising.
    """
    user_msgs: list[str] = []
    for _, _, msg_json in rows:
        try:
            msg = json.loads(msg_json) if isinstance(msg_json, str) else msg_json
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(msg, dict) or msg.get('type') != 'human':
            continue
        # A null / non-dict `data` would make `.get` raise AttributeError.
        data = msg.get('data')
        if not isinstance(data, dict):
            continue
        content = data.get('content', '')
        if isinstance(content, str):
            if content.strip():
                user_msgs.append(content)
        elif isinstance(content, list):
            for part in content:
                if not isinstance(part, dict) or part.get('type') != 'text':
                    continue
                text = part.get('text', '')
                # Skip `"text": null` and other non-string payloads.
                if isinstance(text, str) and text.strip():
                    user_msgs.append(text)
    return user_msgs
def _extract_role_tagged_messages_from_rows(rows: list) -> list[dict]:
    """Full-message extraction for path B: keep both ``human`` and ``ai`` messages.

    Unlike ``_extract_user_messages_from_rows``, this:
      * accepts ``type`` in ``{'human', 'ai'}`` rather than only ``human``;
      * returns ``[{'type': 'human'|'ai', 'data': {'content': str}}, ...]``
        rather than a plain list of strings, so the result can be fed
        straight into ``convert_to_messages``.

    Keeping the role lets ``FactStore._format_conversation`` render speakers
    by type, which the path B prompt uses to attribute facts
    (user_observation vs ai_disclosure).

    Lesson from PR #1399: callers should pass the returned list directly to
    ``convert_to_messages(message_dicts)``. Do **not** wrap it with
    ``json.dumps``; ``convert_to_messages`` only accepts a list, and a str is
    silently turned into ``[]``.

    List content (``[{type: 'text', text: ...}, ...]``) is joined into a
    single string; non-text parts and non-string ``text`` values are
    ignored. Messages whose resulting text is blank are dropped. Malformed
    rows (invalid JSON, a non-dict ``data`` field) are skipped rather than
    raising.
    """
    out: list[dict] = []
    for _, _, msg_json in rows:
        try:
            msg = json.loads(msg_json) if isinstance(msg_json, str) else msg_json
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(msg, dict):
            continue
        msg_type = msg.get('type')
        if msg_type not in ('human', 'ai'):
            continue
        # A null / non-dict `data` would make `.get` raise AttributeError.
        data = msg.get('data')
        if not isinstance(data, dict):
            continue
        content = data.get('content', '')
        # Path B does not need per-part structure, so list content is
        # flattened into one string.
        if isinstance(content, str):
            text = content
        elif isinstance(content, list):
            pieces: list[str] = []
            for part in content:
                if not isinstance(part, dict) or part.get('type') != 'text':
                    continue
                piece = part.get('text', '')
                # A non-string `text` used to make ''.join raise and drop the
                # whole message; skip just the bad part instead.
                if isinstance(piece, str):
                    pieces.append(piece)
            text = ''.join(pieces)
        else:
            continue
        if not text.strip():
            continue
        out.append({'type': msg_type, 'data': {'content': text}})
    return out
def _trim_to_user_msg_bracket(message_dicts: list[dict]) -> list[dict]:
    """Keep only the messages from the first to the last ``human`` message, inclusive.

    Product rationale (guarding against the "cheap layer"):
      * AI content **before** the first user message is proactive probing
        the user has not yet confirmed.
      * AI content **after** the last user message is a monologue the user
        has not yet responded to.

    Neither should be persisted as a fact. Only AI content sandwiched
    between user messages implies the user saw that context, and so
    qualifies for path B to pick up as an ai_disclosure fact.

    Returns ``[]`` when there is no human message (the caller treats this as
    an AI-only window and skips it). With a single human message the bracket
    collapses to just that message, which is still valid.
    """
    is_human = [entry.get('type') == 'human' for entry in message_dicts]
    if True not in is_human:
        return []
    start = is_human.index(True)
    # Index just past the last human message, found by searching from the end.
    stop = len(is_human) - is_human[::-1].index(True)
    return message_dicts[start:stop]
async def _resolve_rebuttal_start_time(name: str, now: datetime):
"""决定 rebuttal_loop 本轮查询的起始时间。
优先级:
1. 持久化的 CURSOR_REBUTTAL_CHECKED_UNTIL
2. 兜底回扫窗口(首次启动 / cursor 文件缺失)
3. 时钟回拨保护:cursor > now 视为脏数据,走兜底并**立刻重写**游标
rollback 分支立即覆写游标的原因:若只在主循环 success branch 才覆写,
遇上 LLM 持续失败 + 时钟回拨,主循环每轮都会命中 fallback 并告警,
但游标永远停留在未来时间,无法自愈;这里直接写回 fallback 打破死循环。
写 fallback 而非 now:若写 now,本 tick 的 LLM 调用若失败,
窗口 `[fallback, now]` 的消息会因下轮 cursor 已推进到 now 而被跳过;
写 fallback 则保持重试语义——主循环 success branch 再把 cursor 推进到 now。
独立成函数便于单测验证。
"""
cursor = await cursor_store.aget_cursor(name, CURSOR_REBUTTAL_CHECKED_UNTIL)
fallback = now - timedelta(hours=REBUTTAL_FIRST_RUN_LOOKBACK_HOURS)
if cursor is None:
# 首次启动:把 fallback 落盘锚定。否则 LLM 连续失败时,下轮