Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 124 additions & 4 deletions app/memory_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ async def _periodic_idle_maintenance_loop():
)
try:
# 传空消息列表仅触发压缩逻辑
await recent_history_manager.update_history([], name, detailed=True)
await recent_history_manager.update_history([], name, detailed=True, on_compress_done=_on_compress_done)
logger.info(f"[IdleMaint] {name}: 历史记录压缩完成")
except Exception as e:
logger.warning(f"[IdleMaint] {name}: 历史记录压缩失败: {e}")
Expand Down Expand Up @@ -3348,6 +3348,126 @@ async def _record_review_failure(lanlan_name: str, snapshot: list) -> int:
return state['review_fail_attempts']


# ── best-effort 后台压缩(主路径 compress 失败时兜底)─────────────────────
# 真根因:主路径压缩走 LLM 耗时数秒~数十秒,限流抖动 / 偶发失败 → #1629 跳过
# 保留完整历史、下轮重试。但若持续失败,历史一直压不掉、越积越多。这里在主路径
# 压缩失败时起一个受保护的一次性后台任务尽力压(基于快照、不被对话打断;压完用
# fingerprint 对齐合并回写)。主路径某轮成功 → cancel 在跑的后台。失败退避复用
# review 的 Gate 6 模式,防 summary 模型持续故障时每轮起一个注定失败的任务空烧。
compress_backup_tasks: dict[str, asyncio.Task] = {}


async def _record_compress_backup_failure(lanlan_name: str, snapshot: list) -> int:
"""记一次后台压缩失败到退避计数并返回累计次数。输入 fingerprint 变了先归零
(每段积压各享独立预算,不跨输入累积),与 _record_review_failure 对偶。"""
from memory.recent import build_review_fingerprint
state = _maint_state.setdefault(lanlan_name, {})
cur_fp = build_review_fingerprint(snapshot)
if state.get('compress_backup_fail_fp') != cur_fp:
state['compress_backup_fail_attempts'] = 0
state['compress_backup_fail_attempts'] = (state.get('compress_backup_fail_attempts', 0) or 0) + 1
state['compress_backup_fail_fp'] = cur_fp
await _asave_maint_state()
return state['compress_backup_fail_attempts']


async def _clear_compress_backup_failure(lanlan_name: str) -> None:
"""清后台压缩失败退避计数(主路径成功 / 后台成功或白做时调)。"""
state = _maint_state.setdefault(lanlan_name, {})
if state.get('compress_backup_fail_attempts') or state.get('compress_backup_fail_fp'):
state['compress_backup_fail_attempts'] = 0
state['compress_backup_fail_fp'] = None
await _asave_maint_state()


async def _run_backup_compress(lanlan_name: str, snapshot: list, detailed: bool):
"""后台跑 best-effort 压缩:compress 在锁外(LLM 耗时,不阻塞其它端点),
merge 在 _get_settle_lock 内(快,串行化对该角色 history 的写)。"""
try:
# 1) 压缩(锁外)。compress_history 内部按输入大小自动分段,避免输入过大超时。
try:
result = await recent_history_manager.compress_history(snapshot, lanlan_name, detailed)
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(f"[CompressBackup] {lanlan_name} 后台压缩抛异常,按失败处理: {e}")
result = None
if result is None:
attempts = await _record_compress_backup_failure(lanlan_name, snapshot)
logger.info(f"[CompressBackup] {lanlan_name} 后台压缩失败,退避计数 → {attempts}")
# best-effort 也没压成 → 实在不行才丢:若历史仍超硬上限,裁剪最旧未压缩
# 原文兜底(锁内串行化写)。暂时性失败时后台会成功、走不到这里。
async with _get_settle_lock(lanlan_name):
await recent_history_manager.enforce_hard_cap(lanlan_name)
return
# 2) 合并写回(锁内,快)。merge_backup_memo 用 fingerprint 对齐,积压已被
# 主路径压掉 / 被清空就返回 'moot' 丢弃(白做)。
async with _get_settle_lock(lanlan_name):
status = await recent_history_manager.merge_backup_memo(lanlan_name, snapshot, result[0])
Comment on lines +3405 to +3406
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Serialize backup merges with /process writes

This lock only protects callers that also use _get_settle_lock, but /process still writes recent history without that lock at app/memory_server.py:3937. If a backup merge reaches the awaited disk-write section while a /process update reloads and writes the same recent.json, the last writer can drop either the newly generated memo or the just-added turn, so the backup merge is not actually protected during active chat.

Useful? React with 👍 / 👎.

if status == 'failed':
# 合并落盘失败 → 没真正写成功,bump 退避(不清),下次再试。
attempts = await _record_compress_backup_failure(lanlan_name, snapshot)
logger.info(f"[CompressBackup] {lanlan_name} 后台压缩合并落盘失败,退避计数 → {attempts}")
return
# 'merged' 或 'moot' 都说明这段积压已处理 / 已过时,清退避计数。
await _clear_compress_backup_failure(lanlan_name)
logger.info(f"[CompressBackup] {lanlan_name} 后台压缩完成:{status}")
except asyncio.CancelledError:
logger.info(f"[CompressBackup] {lanlan_name} 后台压缩被取消(主路径已成功)")
except Exception as e:
logger.error(f"[CompressBackup] {lanlan_name} 后台压缩后处理出错: {e}")
Comment on lines +3403 to +3418
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

把 merge/回写阶段的异常也计入退避喵

现在只有 compress_history() 失败才会 bump compress_backup_fail_attempts。如果 merge_backup_memo() 或后面的 _clear_compress_backup_failure() 持续抛错,这个任务会直接退出且不记失败;下一次主路径压缩失败又会对同一份 snapshot 重新起后台压缩,等于把这套 Gate6 退避绕过去了喵。这样磁盘/merge 侧的持续故障还是会反复空烧 summary 调用喵。

😼 可参考的修法喵
-        async with _get_settle_lock(lanlan_name):
-            status = await recent_history_manager.merge_backup_memo(lanlan_name, snapshot, result[0])
-        # 'merged' 或 'moot' 都说明这段积压已处理 / 已过时,清退避计数。
-        await _clear_compress_backup_failure(lanlan_name)
-        logger.info(f"[CompressBackup] {lanlan_name} 后台压缩完成:{status}")
+        try:
+            async with _get_settle_lock(lanlan_name):
+                status = await recent_history_manager.merge_backup_memo(
+                    lanlan_name, snapshot, result[0]
+                )
+            # 'merged' 或 'moot' 都说明这段积压已处理 / 已过时,清退避计数。
+            await _clear_compress_backup_failure(lanlan_name)
+            logger.info(f"[CompressBackup] {lanlan_name} 后台压缩完成:{status}")
+        except Exception as e:
+            attempts = await _record_compress_backup_failure(lanlan_name, snapshot)
+            logger.warning(
+                f"[CompressBackup] {lanlan_name} 后台压缩后处理失败,"
+                f"退避计数 → {attempts}: {e}"
+            )
+            return
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/memory_server.py` around lines 3399 - 3409, compress/merge-stage
exceptions from recent_history_manager.merge_backup_memo and the subsequent
_clear_compress_backup_failure are currently not counted toward
compress_backup_fail_attempts; wrap the merge+clear steps in their own
try/except that, on any Exception (but not asyncio.CancelledError), calls the
same failure-counter helper used when compress_history() fails to bump
compress_backup_fail_attempts for lanlan_name (reuse the existing helper that
increments compress_backup_fail_attempts), then re-raise or log consistently;
keep the existing asyncio.CancelledError handling separate and ensure you
reference recent_history_manager.merge_backup_memo,
_clear_compress_backup_failure, compress_history(), and
compress_backup_fail_attempts when making the change.

finally:
cur = asyncio.current_task()
if compress_backup_tasks.get(lanlan_name) is cur:
compress_backup_tasks.pop(lanlan_name, None)


async def _on_compress_done(lanlan_name: str, snapshot: list, ok: bool, detailed: bool):
"""update_history 压缩结束回调(recent.py 注入)。
ok=True(主路径压成功)→ cancel 在跑的后台兜底 + 清退避;
ok=False(主路径压失败)→ 起一个受保护的后台兜底压缩(若无在跑、未被退避挡)。

本回调只 spawn / cancel task,不 await 后台 LLM——它可能在 _get_settle_lock
内被调(/renew、/settle),绝不能阻塞。"""
if ok:
task = compress_backup_tasks.get(lanlan_name)
if task is not None and not task.done():
task.cancel()
await _clear_compress_backup_failure(lanlan_name)
return
# ok=False:主路径压缩失败 → 起后台兜底
if not snapshot:
return
existing = compress_backup_tasks.get(lanlan_name)
if existing is not None and not existing.done():
return # in-flight:同角色已有后台压缩在跑,不重复起
# 失败退避(Gate 6 模式):连续失败 ≥ N 且输入未变 → dead-letter,不再起,
# 防 summary 模型持续故障时每轮都起一个注定失败的后台任务空烧。
from config import MEMORY_LIVENESS_MAX_ATTEMPTS
from memory.recent import build_review_fingerprint
state = _maint_state.setdefault(lanlan_name, {})
fail_attempts = state.get('compress_backup_fail_attempts', 0) or 0
if fail_attempts >= MEMORY_LIVENESS_MAX_ATTEMPTS:
cur_fp = build_review_fingerprint(snapshot)
if state.get('compress_backup_fail_fp') == cur_fp:
logger.debug(
f"[CompressBackup] {lanlan_name} 失败退避 dead-letter"
f"(连续失败 {fail_attempts} 次且输入未变),跳过"
)
# dead-letter:后台已救不回 → 此时才裁剪兜底(实在不行才丢)。不 acquire
# settle lock:本回调可能已在 /renew·/settle 的锁内被调(重入会死锁);
# enforce_hard_cap 是 best-effort 写。
await recent_history_manager.enforce_hard_cap(lanlan_name)
return
# 输入变了 → 旧计数过期,复位放行
state['compress_backup_fail_attempts'] = 0
state['compress_backup_fail_fp'] = None
await _asave_maint_state()
task = _spawn_background_task(_run_backup_compress(lanlan_name, list(snapshot), detailed))
Comment on lines +3425 to +3466
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

这个回调还在临界区里做写盘喵

注释里说这里“只 spawn / cancel task”,但 ok=True 分支会 await _clear_compress_backup_failure(),退避复位分支也会 await _asave_maint_state()/renew/settle 这两条路径都是在 _get_settle_lock() 内调用 update_history() 的,所以这些 await 会把 idle_maintenance_state.json 的写盘绑进用户请求的串行窗口里,慢盘时会直接拖长请求并额外阻塞同角色的 /new_dialog 喵。这里最好只改内存状态,然后 fire-and-forget 持久化喵。

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/memory_server.py` around lines 3416 - 3453, _on_compress_done currently
performs blocking awaits (_clear_compress_backup_failure and _asave_maint_state)
while running inside the settle/renew critical section; change it to only mutate
in-memory _maint_state and fire-and-forget the persistence: in the ok=True
branch update/reset the in-memory compress-backup failure counters in
_maint_state (do not await _clear_compress_backup_failure) and spawn a
background task to call _clear_compress_backup_failure (or a small wrapper that
does the save) via _spawn_background_task; likewise, when resetting fail
counters after input change set state['compress_backup_fail_attempts']=0 and
state['compress_backup_fail_fp']=None and call _spawn_background_task to run
_asave_maint_state (do not await); keep spawn/cancel logic for
_run_backup_compress as-is so no awaits happen inside the critical path.

compress_backup_tasks[lanlan_name] = task
logger.info(f"[CompressBackup] {lanlan_name} 主路径压缩失败,已起后台兜底压缩任务")


async def _run_review_in_background(
lanlan_name: str, snapshot: list, cancel_event: asyncio.Event,
):
Expand Down Expand Up @@ -3827,7 +3947,7 @@ async def process_conversation(request: HistoryRequest, lanlan_name: str):
if _has_human_messages(input_history):
await _aclear_review_clean(lanlan_name)
logger.info(f"[MemoryServer] 收到 {lanlan_name} 的对话历史处理请求,消息数: {len(input_history)}")
await recent_history_manager.update_history(input_history, lanlan_name)
await recent_history_manager.update_history(input_history, lanlan_name, on_compress_done=_on_compress_done)
# 旧模块已禁用(性能不足):
# await settings_manager.extract_and_update_settings(input_history, lanlan_name)
# await semantic_manager.store_conversation(uid, input_history, lanlan_name)
Expand Down Expand Up @@ -3872,7 +3992,7 @@ async def process_conversation_for_renew(request: HistoryRequest, lanlan_name: s
logger.info(f"[MemoryServer] renew: 收到 {lanlan_name} 的对话历史处理请求,消息数: {len(input_history)}")
# 首轮摘要带锁:阻塞 /new_dialog 直到摘要+时间戳写入完成
async with _get_settle_lock(lanlan_name):
await recent_history_manager.update_history(input_history, lanlan_name, detailed=True)
await recent_history_manager.update_history(input_history, lanlan_name, detailed=True, on_compress_done=_on_compress_done)
await time_manager.astore_conversation(uid, input_history, lanlan_name)

# 以下操作在锁外执行,不阻塞 /new_dialog
Expand Down Expand Up @@ -3908,7 +4028,7 @@ async def settle_conversation(request: HistoryRequest, lanlan_name: str):
async with _get_settle_lock(lanlan_name):
if input_history:
await time_manager.astore_conversation(uid, input_history, lanlan_name)
await recent_history_manager.update_history([], lanlan_name, detailed=True)
await recent_history_manager.update_history([], lanlan_name, detailed=True, on_compress_done=_on_compress_done)

if input_history:
await _spawn_outbox_post_turn_signals(lanlan_name, input_history)
Expand Down
18 changes: 18 additions & 0 deletions config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,22 @@ def translate_value(val):
- 上游:用户/AI 的原始对话文本,正常一轮 30-500 token,长贴可能数 KB。
- 截断策略:保留头尾各 250 token,中段用 "…[省略中段]…" 替换。"""

RECENT_COMPRESS_INPUT_BUDGET_TOKENS = 8000
"""后台 best-effort 压缩的单段输入 token 预算(分段阈值)。
- 用途:待压积压渲染成文本后若超过此值,compress_history 走分段
map-reduce——切成每段 ≤ 此值的小段分别压成中间摘要,再 reduce 成最终
备忘录,减小单次 LLM 输入、避免输入过大导致超时。未超此值的正常压缩
走原一次性路径,行为不变。
- 上游:积压对话渲染文本的 token 数。"""

RECENT_HARD_CAP_TOKENS = 60000
"""recent 历史的硬上限(最终兜底,平时不触发)。
- 用途:压缩持续失败(如持续 429,best-effort 后台也救不回)导致历史
一直压不掉、无限膨胀时,update_history 保留完整历史前若总 token 超过
此值,丢弃最旧的未压缩对话原文,保留近期若干条 + 备忘录,保证 prompt
有界。设得很大,只作最后防线。
- 上游:未被压缩而累积的 recent 历史 token 数。"""

# ---- Memory: reflection ----
REFLECTION_TEXT_MAX_TOKENS = 150
"""单条 reflection 文本的 soft cap。
Expand Down Expand Up @@ -2022,6 +2038,8 @@ def translate_value(val):
'RECENT_COMPRESS_THRESHOLD_ITEMS',
'RECENT_SUMMARY_MAX_TOKENS',
'RECENT_PER_MESSAGE_MAX_TOKENS',
'RECENT_COMPRESS_INPUT_BUDGET_TOKENS',
'RECENT_HARD_CAP_TOKENS',
'REFLECTION_TEXT_MAX_TOKENS',
'REFLECTION_SURFACE_TOP_K',
'REFLECTION_SYNTHESIS_FACTS_MAX',
Expand Down
Loading
Loading