Commit 4b349b6

wehos (Hongzhi Wen) and claude authored
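fix(memory): liveness dead-letter fallback for 7 sites of terminal LLM failure (#1409) (#1412)

* fix(memory): liveness dead-letter fallback for 7 sites of terminal LLM failure (#1409)

Poisoned input (safety filter / output that never parses / prompt too long, etc.) makes the LLM call ``_allm_call_with_retries`` exhaust its retries every time → the caller does not move the progress marker → the same input is sent to the LLM again next round and fails the same way → that character's pipeline is stuck forever. Issue #1409 diagnosed 7 gaps with this same root cause. This change adds one per-input attempt counter with ``MEMORY_LIVENESS_MAX_ATTEMPTS=5``; past the threshold the marker is force-advanced or the input is dropped as a dead letter, mirroring the existing ``_bump_fact_recheck_attempts`` pattern used for schema rechecks.

Sites fixed:
- 0a/0b: signal extract path A/B repeatedly hits ``FactExtractionFailed`` / ``aextract_facts_with_known_pool returning None`` at the same cursor → force-advance the cursor (in-memory)
- 1: the rebuttal loop repeatedly gets ``check_feedback_for_confirmed returning None`` at the same cursor → force-advance and persist ``CURSOR_REBUTTAL_CHECKED_UNTIL`` (in-memory counter, persisted cursor)
- 2: ``PersonaManager.resolve_corrections`` batch LLM failure → bump the persisted ``resolve_attempts`` on every entry in the batch; past the threshold, drop the entry from the queue as a dead letter
- 3: ``FactDedupResolver.aresolve``, same as above (``resolve_attempts`` persisted on the pair)
- 4: refine cluster LLM failure → bump the persisted ``refine_attempts`` on non-fact members; entries at the limit are filtered out at the next cluster gather; apply_refine_actions resets the counter to 0 on a successful stamp
- 7: outbox handler raises permanently → an append_attempt line is persisted; once the total reaches the limit, append_done is written as a dead letter, which also unblocks the permanently blocked compact

Adds ``config.MEMORY_LIVENESS_MAX_ATTEMPTS=5``. The existing protections used as reference (``_bump_fact_recheck_attempts`` + ``_apromote_with_merge`` dead-letter) are unchanged. ``recent.review_history`` / ``synthesize_reflections`` / the embedding worker either evolve naturally or are local services; they do not belong to this class and are not included.

Tests: ``tests/unit/test_memory_liveness_dead_letter.py``, 17 cases covering every site: the marker does not move after N-1 failures, the dead-letter fires at N, and the success path clears the counter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(tests): remove 3 redundancies that lint flagged in test_memory_liveness_dead_letter

- Remove `import os` / `import tempfile` (unreferenced in the file)
- `op_id = outbox.append_pending(...)` in test_run_outbox_op_dead_letters_at_threshold is never used afterwards (the pending op is re-read through outbox.pending_ops), so the return value is no longer captured

The "unused global" warning for `MEMORY_LIVENESS_MAX_ATTEMPTS` in config is left alone: the constant is imported across modules in 4 places (persona / fact_dedup / refine driver / memory_server). CodeQL only scans module-local scope and cannot see cross-module references, so this is a false positive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(memory): 3 review-feedback fixes (Codex P1 + CodeRabbit P1×2)

1. Outbox `_run_outbox_op` dead-letter now guards against a failed attempt write (Codex P1)
- Old logic: when `aappend_attempt` raised, the code still incremented `total_attempts = prior+1` and could immediately call `aappend_done`, although the attempt was never persisted → after a restart the disk shows only prior_attempts (=N-1) attempts plus 1 done → the op is lost for good while the record says "done after only N-1 failures", violating the contract of giving up only after ≥ N failures
- Fix: a failed attempt write is treated as transient; the op stays pending for the next replay
- Regression test added: simulate N-1 attempts plus an injected IOError and verify that no dead-letter happens

2. Site 0a cursor_key uses start_time instead of the literal 'cold' (CodeRabbit P1)
- The old `cursor_key = state.get('last_check_ts') or 'cold'` put every cold-start failure across rounds into one bucket; on the Nth failure the cursor was force-advanced to the current time, dead-lettering the normal messages that had arrived in the meantime
- Fix: `cursor_key = start_time.isoformat(...)`. With a cursor, start_time == cursor (_signal_check_window_start returns the cursor directly); on a cold start, start_time differs every round → no false aggregation

3. dedup + corrections: "LLM returned a list but 0 items were consumed" also counts as an attempt (CodeRabbit P1 + counterpart)
- Old logic: the LLM output is a valid list but every action is invalid (unknown action / invalid index / format error), `_aapply_decisions` / `_apply_correction_results` return 0 → the queue is left as is → the same poisoned batch at the queue head is fed again next time → stuck forever
- Fix: both dedup and corrections call `_abump_*_attempts_and_dead_letter` to count an attempt when `processed_keys` is empty / `resolved=0` (CodeRabbit only raised dedup; corrections is fixed as its counterpart)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(memory): 2 CodeRabbit P1 round-2 fixes

1. _run_outbox_op short-circuits ops that are already dead-lettered
- Edge case: the previous round pushed attempts to N and aappend_done failed → the op stays pending → the restart replay runs the handler once more. For a non-idempotent handler (the outbox contract requires idempotence but does not enforce it) that is a real duplicate side effect
- Fix: on entry, `prior_attempts >= MAX` short-circuits and writes the missing append_done; the handler never runs again. MEMORY_LIVENESS_MAX_ATTEMPTS / prior_attempts are hoisted to the top of the function, removing the duplicate import / computation in the except block
- Regression test added, test_run_outbox_op_short_circuits_when_already_dead_letter: the mock handler errors if it runs, verifying that the short-circuit path never reaches the handler

2. _abump_refine_attempts (reflection) filters terminal statuses before saving
- `_aload_reflections_full` loads the full set, terminal entries included; if that set is passed to `asave_reflections` → `_prepare_save_reflections` treats active_ids as "the set that should stay alive", so every on-disk entry with the same id hits `continue` and is not archived → old promoted/denied entries never get archived
- Fix: filter REFLECTION_TERMINAL_STATUSES before saving, the same way arecord_mentions / aupdate_suppressions / _aauto_promote_stale_locked do. The persona side has no archive flow and does not need this

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(memory): the refine exception path does not count refine_attempts (Codex P1 round-3)

Inside `_resolve_cluster`, `await apply_fn(...)` (manager-side persistence) is not wrapped in try. If apply_fn raises (cloudsave maintenance mode / atomic_write IO / lock contention), the exception bubbles up to the `except Exception as e:` block in `refine_pass` and is counted as a cluster liveness failure. An apply failure is a transient disk/lock problem unrelated to the cluster content; it should not accumulate refine_attempts on entries and trigger an unnecessary dead-letter.

Fix: refine_pass calls failure_fn only when `_resolve_cluster` explicitly returns False (= empty LLM output / parse failure / non-list, i.e. persistent problems tied to the cluster content). The exception path only does failed++ and warns; it does not bump.

LLM network transients (ainvoke raising TimeoutError, etc.) are likewise classified as transient and not bumped. Persistent LLM problems are already covered by the return-False path.

Regression test added, `test_refine_pass_exception_does_not_invoke_failure_fn`: mock `_resolve_cluster` to raise RuntimeError and verify that failure_fn is not called.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(refine): sync the FailureFn contract notes with the previous round's implementation

42edea7 changed the implementation (the exception path neither calls failure_fn nor counts refine_attempts), but the FailureFn type comment and the refine_pass docstring still said "called on a False return or on an exception", which no longer matched. A later manager-side implementer could easily follow the old contract and count exceptions into refine_attempts again. Doc-only change to align with the code. CodeRabbit Minor feedback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(memory): dirty-value fallback when reading liveness attempt fields (Codex P2 round-4)

Patterns such as `int(d.get('refine_attempts', 0) or 0)` raise ValueError/TypeError when a manual edit / legacy data / migration noise has written dirty values such as `""` / `"unknown"` / list / dict. That takes down the upstream list comprehension (candidate gather / batch selection) and with it the whole refine pass / resolve loop: the liveness fallback itself becomes a new liveness gap.

Adds the shared helper `memory.facts.safe_int_field` (next to the existing `safe_importance`), which returns default for any dirty value instead of raising. All 9 call sites now use it:
- memory/fact_dedup.py: 2 sites (resolve_attempts gather + bump)
- memory/persona.py: 3 sites (resolve_attempts gather/bump + refine_attempts bump)
- memory/reflection.py: 1 site (refine_attempts bump, function-local import)
- app/memory_server.py: 3 sites (outbox _attempt_count + the candidate gather filter in 2 refine driver functions)

Regression test added, `test_safe_int_field_handles_dirty_values`, covering '', 'unknown', 'high', list, dict and similar dirty values, to make sure the fallback never raises.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Hongzhi Wen <cartabio.coder1@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>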
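
The in-memory variant described for sites 0a/0b and 1 can be sketched as a counter keyed by the cursor value. This is a minimal illustration, not the code from `app/memory_server.py` (that diff is not rendered on this page); the class name `CursorAttemptTracker` and its methods are hypothetical, and only the threshold of 5 comes from the commit message.

```python
from dataclasses import dataclass, field

MAX_ATTEMPTS = 5  # same value as MEMORY_LIVENESS_MAX_ATTEMPTS in the commit


@dataclass
class CursorAttemptTracker:
    """Hypothetical in-memory attempt counter, keyed by cursor value."""

    attempts: dict[str, int] = field(default_factory=dict)

    def record_failure(self, cursor_key: str) -> bool:
        """Count one terminal failure for this cursor.

        Returns True when the caller should force-advance the cursor.
        """
        count = self.attempts.get(cursor_key, 0) + 1
        if count >= MAX_ATTEMPTS:
            # Threshold reached: forget the counter and tell the caller to move on.
            self.attempts.pop(cursor_key, None)
            return True
        self.attempts[cursor_key] = count
        return False

    def record_success(self, cursor_key: str) -> None:
        """A successful round clears the counter for this cursor."""
        self.attempts.pop(cursor_key, None)


tracker = CursorAttemptTracker()
outcomes = [tracker.record_failure("2024-01-01T00:00:00") for _ in range(MAX_ATTEMPTS)]
print(outcomes)  # [False, False, False, False, True]
```

Keying by the cursor value rather than a fixed literal is what the later `cursor_key = start_time.isoformat(...)` fix relies on: unrelated windows do not share a bucket.
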
1 parent 21677b3 commit 4b349b6

9 files changed

Lines changed: 1498 additions & 22 deletions

app/memory_server.py

Lines changed: 289 additions & 13 deletions
Large diffs are not rendered by default.

config/__init__.py

Lines changed: 12 additions & 0 deletions
@@ -1110,6 +1110,18 @@ def translate_value(val):
 - Entries that hit the threshold keep schema_version<2 (no silent version bump to whitewash them) but are excluded
   by the filter, so the loop hands their slots to other v1 entries. Devs can read logger.debug to see the backlog."""
 
+MEMORY_LIVENESS_MAX_ATTEMPTS = 5
+"""Unified cap: after N terminal LLM failures, force-advance the progress marker or dead-letter the input.
+- Applies to: every background path of the form "same input + no counter + permanent LLM failure → stuck forever".
+  That covers signal extraction path A/B, rebuttal feedback, persona
+  corrections resolve, fact dedup resolve, refine cluster, and the outbox handler.
+- Approach: follow the `MEMORY_RECHECK_MAX_ATTEMPTS` pattern (schema recheck dead-letter) and
+  stop "the same cursor / queue head / cluster_hash / op hitting the LLM over and over", so that
+  a poisoned window or payload cannot silence the whole pipeline.
+- Failure means a terminal outcome: the LLM returns None / raises / the handler raises / parsing fails.
+- 5 matches `MEMORY_RECHECK_MAX_ATTEMPTS`: at 40s per round this is a window of roughly 3 minutes,
+  enough to ride out occasional transient failures; anything beyond that is real poison."""
+
 # ---- Memory: followup picker (memory/reflection.py) ─
 REFLECTION_FOLLOWUP_WEIGHTED = True
 """Whether followup candidates for proactive chat are sampled at random weighted by evidence_score.

memory/fact_dedup.py

Lines changed: 86 additions & 1 deletion
@@ -43,6 +43,7 @@
 from typing import TYPE_CHECKING
 
 from memory.embeddings import cosine_similarity
+from memory.facts import safe_int_field
 from utils.cloudsave_runtime import MaintenanceModeError, assert_cloudsave_writable
 from utils.file_utils import (
     atomic_write_json_async,
@@ -375,6 +376,7 @@ async def aresolve(self, name: str) -> int:
             return await self._aresolve_locked(name)
 
     async def _aresolve_locked(self, name: str) -> int:
+        from config import MEMORY_LIVENESS_MAX_ATTEMPTS
         from config.prompts.prompts_memory import get_fact_dedup_prompt
         from utils.language_utils import get_global_language
         from utils.llm_client import create_chat_llm
@@ -384,7 +386,18 @@ async def _aresolve_locked(self, name: str) -> int:
         if not pending:
             return 0
 
-        batch = pending[:FACT_DEDUP_BATCH_LIMIT]
+        # Liveness: skip dead-letter pairs that already reached MEMORY_LIVENESS_MAX_ATTEMPTS
+        # (defensive: _abump_dedup_attempts_and_dead_letter_locked deletes an entry from the queue as soon
+        # as it hits the threshold, so on the normal path no entry with attempts ≥ MAX remains).
+        batch: list[dict] = []
+        for it in pending:
+            if safe_int_field(it, 'resolve_attempts') >= MEMORY_LIVENESS_MAX_ATTEMPTS:
+                continue
+            batch.append(it)
+            if len(batch) >= FACT_DEDUP_BATCH_LIMIT:
+                break
+        if not batch:
+            return 0
         pairs_text = "\n".join(
             f"[{i}] candidate: {item.get('candidate_text', '')}"
             f" | existing: {item.get('existing_text', '')}"
@@ -421,15 +434,38 @@ async def _aresolve_locked(self, name: str) -> int:
                     "[FactDedup] %s: LLM returned a non-array (%s), skipping this round",
                     name, type(results).__name__,
                 )
+                # A parse failure also counts as an attempt (same input → same parse failure);
+                # handled the same way as the Exception branch.
+                await self._abump_dedup_attempts_and_dead_letter_locked(name, batch)
                 return 0
         except Exception as e:
             logger.warning("[FactDedup] %s: LLM call failed: %s", name, e)
+            # Liveness fallback: bump resolve_attempts on every pair in this batch; entries that reach
+            # MEMORY_LIVENESS_MAX_ATTEMPTS are dropped from the queue as dead letters.
+            # Otherwise a poisoned pair (safety filter / prompt too long / output that never
+            # parses) stays at the queue head and dedup is stuck forever. The caller (aresolve) already
+            # holds _get_alock, so this uses the _locked variant and does not acquire it again.
+            await self._abump_dedup_attempts_and_dead_letter_locked(name, batch)
             return 0
 
         applied, processed_keys = await self._aapply_decisions(
             name, batch, results,
         )
 
+        # CodeRabbit: the LLM returned a list but ``_aapply_decisions`` consumed no pair
+        # (every action was rejected: unknown action / missing index / invalid
+        # format, etc.), so processed_keys is empty → the ``remaining`` filter below
+        # deletes nothing → the same batch at the queue head is fed to the LLM next tick with the same garbage →
+        # stuck forever. Count one attempt (same handling as LLM Exception / non-list).
+        if not processed_keys:
+            logger.warning(
+                "[FactDedup] %s: all %d actions in the LLM output are invalid (unknown action / "
+                "invalid index / conflict), no pair in the batch was consumed, counted as a failed attempt",
+                name, len(results),
+            )
+            await self._abump_dedup_attempts_and_dead_letter_locked(name, batch)
+            return 0
+
         # Read-modify-write the queue so concurrent enqueue calls
         # that landed during the LLM call survive; same shape as
         # PersonaManager._resolve_corrections_locked's processed-keys
@@ -463,6 +499,55 @@ async def _aresolve_locked(self, name: str) -> int:
             )
         return applied
 
+    async def _abump_dedup_attempts_and_dead_letter_locked(
+        self, name: str, batch_items: list[dict],
+    ) -> None:
+        """Liveness fallback for aresolve LLM failures (caller MUST hold _get_alock).
+
+        Bumps ``resolve_attempts`` on every pending pair in the batch; a pair whose total reaches
+        ``MEMORY_LIVENESS_MAX_ATTEMPTS`` is deleted from the queue with a WARN.
+
+        Why: a poisoned pair (LLM output never parses / safety filter / prompt too long) makes
+        the queue head send the same prompt into the same failure on every tick → the whole dedup pipeline
+        for that character is stuck forever. The caller already holds _get_alock, so there is no async with here;
+        the same rule applies to ``_aapply_decisions`` / ``aload_pending`` /
+        ``_asave_pending`` in ``_aresolve_locked``, which all run inside the lock.
+        """
+        from config import MEMORY_LIVENESS_MAX_ATTEMPTS
+        if not batch_items:
+            return
+        bumped_keys = {
+            (it.get('candidate_id'), it.get('existing_id')) for it in batch_items
+        }
+        bumped_keys.discard((None, None))
+        if not bumped_keys:
+            return
+        current = await self.aload_pending(name)
+        kept: list[dict] = []
+        dropped = 0
+        for it in current:
+            key = (it.get('candidate_id'), it.get('existing_id'))
+            if key in bumped_keys:
+                new_attempts = safe_int_field(it, 'resolve_attempts') + 1
+                if new_attempts >= MEMORY_LIVENESS_MAX_ATTEMPTS:
+                    dropped += 1
+                    logger.warning(
+                        "[FactDedup] %s: dead-letter pair (%s, %s), resolve failed %d times ≥ %d, dropping",
+                        name, key[0], key[1], new_attempts, MEMORY_LIVENESS_MAX_ATTEMPTS,
+                    )
+                    continue
+                it['resolve_attempts'] = new_attempts
+            kept.append(it)
+        if not await self._asave_pending(name, kept):
+            logger.debug(
+                "[FactDedup] %s: maintenance mode, skipped writing dedup attempts to disk", name,
+            )
+        elif dropped:
+            logger.info(
+                "[FactDedup] %s: dead-lettered %d dedup pairs, %d entries left in the queue",
+                name, dropped, len(kept),
+            )
+
     # Whitelist of action vocabulary the LLM may return. Anything
     # outside this set (case mismatch, trailing whitespace, localised
     # synonym) is treated as malformed and the queue entry is
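
The bump-then-drop behaviour of `_abump_dedup_attempts_and_dead_letter_locked` can be tried without locks or disk IO. The sketch below is a simplified, synchronous stand-in under stated assumptions: `bump_and_dead_letter` and `pair_key` are hypothetical names, the queue is a plain list instead of the persisted pending file, and counters are assumed to be clean ints (the real method goes through `safe_int_field`).

```python
MAX_ATTEMPTS = 5  # same value as MEMORY_LIVENESS_MAX_ATTEMPTS


def pair_key(item: dict) -> tuple:
    """Identify a dedup pair the way the diff does: (candidate_id, existing_id)."""
    return (item.get("candidate_id"), item.get("existing_id"))


def bump_and_dead_letter(queue: list[dict], failed_batch: list[dict],
                         max_attempts: int = MAX_ATTEMPTS) -> tuple[list[dict], list[dict]]:
    """Return (kept, dropped) after counting one failed attempt for the batch."""
    failed_keys = {pair_key(it) for it in failed_batch}
    failed_keys.discard((None, None))  # items without ids cannot be matched
    kept: list[dict] = []
    dropped: list[dict] = []
    for item in queue:
        item = dict(item)  # copy so the caller's queue is not mutated
        if pair_key(item) in failed_keys:
            attempts = item.get("resolve_attempts", 0) + 1
            if attempts >= max_attempts:
                dropped.append(item)  # dead letter: leaves the queue
                continue
            item["resolve_attempts"] = attempts
        kept.append(item)
    return kept, dropped


queue = [
    {"candidate_id": "c1", "existing_id": "e1"},  # poisoned pair
    {"candidate_id": "c2", "existing_id": "e2"},  # healthy pair, never in a failed batch
]
poisoned = [queue[0]]
for _ in range(MAX_ATTEMPTS):
    queue, dropped = bump_and_dead_letter(queue, poisoned)

print([it["candidate_id"] for it in queue])    # ['c2']
print([it["candidate_id"] for it in dropped])  # ['c1']
```

Re-reading the queue and matching by key, instead of writing back the batch that was sent to the LLM, is what lets entries enqueued during the LLM call survive the save.
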

memory/facts.py

Lines changed: 23 additions & 0 deletions
@@ -74,6 +74,29 @@ def safe_importance(f: dict, default: int = 5) -> int:
         return default
 
 
+def safe_int_field(d: dict, key: str, default: int = 0) -> int:
+    """Defensively coerce ``d[key]`` to int (Codex P2 on PR #1412).
+
+    Liveness attempt counters (``refine_attempts`` / ``resolve_attempts`` /
+    ``_attempt_count``) are all read from dict fields deserialized from JSON / ndjson.
+    Once a manual edit / legacy data / migration noise writes a dirty value such as ``""`` / ``"unknown"``
+    / list / dict, the original ``int(d.get(key, 0) or 0)`` raises ValueError /
+    TypeError and takes down the whole list comprehension (candidate gather) → that pass
+    fails forever → the liveness fallback itself becomes a new liveness gap.
+
+    Difference from ``safe_importance``: this helper treats ``0`` / ``"0"`` as valid and returns 0
+    (an attempt count of 0 is legitimate) instead of falling back to default. ``safe_importance``
+    falls back to default on every falsy value, which is importance-specific semantics.
+    """
+    try:
+        val = d.get(key)
+        if val is None:
+            return default
+        return int(val)
+    except (ValueError, TypeError):
+        return default
+
+
 class FactExtractionFailed(RuntimeError):
     """Stage-1 LLM call exhausted retries (RFC §3.4.2, final paragraph).
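
To see what `safe_int_field` returns for the dirty values the commit message lists, the helper can be exercised in isolation. The function body below is copied from the diff above; the sample dictionaries are made up for illustration.

```python
def safe_int_field(d: dict, key: str, default: int = 0) -> int:
    """Copy of the helper from the diff: coerce d[key] to int, never raise on dirty data."""
    try:
        val = d.get(key)
        if val is None:
            return default
        return int(val)
    except (ValueError, TypeError):
        return default


samples = [
    {},                               # missing key     -> default
    {"resolve_attempts": None},       # explicit null   -> default
    {"resolve_attempts": 0},          # legitimate zero -> 0
    {"resolve_attempts": "3"},        # numeric string  -> 3
    {"resolve_attempts": ""},         # ValueError      -> default
    {"resolve_attempts": "unknown"},  # ValueError      -> default
    {"resolve_attempts": [1, 2]},     # TypeError       -> default
    {"resolve_attempts": {"n": 1}},   # TypeError       -> default
]
values = [safe_int_field(s, "resolve_attempts") for s in samples]
print(values)  # [0, 0, 0, 3, 0, 0, 0, 0]
```

One input the `except (ValueError, TypeError)` clause does not cover is a float infinity: `int(float("inf"))` raises `OverflowError`. Whether such a value can reach these fields in practice is not something this page shows.
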
memory/outbox.py

Lines changed: 54 additions & 4 deletions
@@ -133,6 +133,24 @@ def append_done(self, name: str, op_id: str) -> None:
         with self._get_lock(name):
             self._write_line(self._outbox_path(name), line)
 
+    def append_attempt(self, name: str, op_id: str) -> None:
+        """Record one failed handler attempt (Site 7 liveness fallback).
+
+        A scan sums the attempts per op_id; when the caller (memory_server._run_outbox_op)
+        sees the total reach ``MEMORY_LIVENESS_MAX_ATTEMPTS`` it calls append_done and
+        gives the op up as a dead letter. Otherwise a poisoned op (its payload makes the handler raise forever)
+        is rerun on every restart and never leaves pending → ``compact`` is blocked forever →
+        outbox.ndjson grows linearly.
+        """
+        record = {
+            'op_id': op_id,
+            'status': 'attempt',
+            'ts': datetime.now().isoformat(),
+        }
+        line = json.dumps(record, ensure_ascii=False)
+        with self._get_lock(name):
+            self._write_line(self._outbox_path(name), line)
+
     # ── scan ────────────────────────────────────────────────────
 
     def _read_all_records(self, path: str) -> list[dict]:
@@ -154,12 +172,19 @@ def _read_all_records(self, path: str) -> list[dict]:
         return records
 
     def pending_ops(self, name: str) -> list[dict]:
-        """Return the op records that are pending and have no matching done (in registration order)."""
+        """Return the op records that are pending and have no matching done (in registration order).
+
+        Every returned record carries a non-persisted field ``_attempt_count`` (int):
+        the number of ``status='attempt'`` lines counted during the scan. The caller uses it to check the
+        dead-letter threshold. The returned dicts are fresh instances JSON-loaded by ``_read_all_records`` in this pass,
+        so attaching ``_attempt_count`` does not pollute the pending lines on disk.
+        """
         path = self._outbox_path(name)
         with self._get_lock(name):
             records = self._read_all_records(path)
 
         pending: dict[str, dict] = {}
+        attempts: dict[str, int] = {}
         for rec in records:
             op_id = rec.get('op_id')
             status = rec.get('status')
@@ -172,20 +197,32 @@ def pending_ops(self, name: str) -> list[dict]:
                 pending[op_id] = rec
             elif status == 'done':
                 pending.pop(op_id, None)
+                attempts.pop(op_id, None)
+            elif status == 'attempt':
+                attempts[op_id] = attempts.get(op_id, 0) + 1
+        for op_id, rec in pending.items():
+            rec['_attempt_count'] = attempts.get(op_id, 0)
         return list(pending.values())
 
     # ── compact ─────────────────────────────────────────────────
 
     def compact(self, name: str) -> int:
-        """Rewrite outbox.ndjson, keeping only the unfinished pending lines. Returns the number of dropped lines.
+        """Rewrite outbox.ndjson, keeping only the unfinished pending lines plus their attempt lines.
+        Returns the number of dropped lines.
 
         The file is replaced atomically through atomic_write_text. An append blocked by the lock during compact
         continues against the new file once the rename is done.
+
+        Attempt lines (Site 7 liveness): the attempt lines of a still-pending op
+        are kept (the attempt count decides when to dead-letter; dropping them would reset
+        the counter to zero after a restart); for a done op its attempt lines are dropped as well (after done,
+        nobody reads the attempt count any more).
         """
         path = self._outbox_path(name)
         with self._get_lock(name):
             records = self._read_all_records(path)
             pending: dict[str, dict] = {}
+            attempts_by_op: dict[str, list[dict]] = {}
             for rec in records:
                 op_id = rec.get('op_id')
                 status = rec.get('status')
@@ -195,9 +232,19 @@ def compact(self, name: str) -> int:
                     pending[op_id] = rec
                 elif status == 'done':
                     pending.pop(op_id, None)
+                    attempts_by_op.pop(op_id, None)
+                elif status == 'attempt':
+                    attempts_by_op.setdefault(op_id, []).append(rec)
+
+            kept_records: list[dict] = []
+            for rec in pending.values():
+                kept_records.append(rec)
+            for op_id, attempt_recs in attempts_by_op.items():
+                if op_id in pending:
+                    kept_records.extend(attempt_recs)
 
             total_lines = len(records)
-            kept = len(pending)
+            kept = len(kept_records)
             if total_lines == kept:
                 return 0  # nothing to drop, avoid pointless IO
 
@@ -206,7 +253,7 @@ def compact(self, name: str) -> int:
                 atomic_write_text(path, '', encoding='utf-8')
             else:
                 body = '\n'.join(
-                    json.dumps(r, ensure_ascii=False) for r in pending.values()
+                    json.dumps(r, ensure_ascii=False) for r in kept_records
                 ) + '\n'
                 atomic_write_text(path, body, encoding='utf-8')
             return total_lines - kept
@@ -237,6 +284,9 @@ async def aappend_pending(self, name: str, op_type: str, payload: dict) -> str:
     async def aappend_done(self, name: str, op_id: str) -> None:
         await asyncio.to_thread(self.append_done, name, op_id)
 
+    async def aappend_attempt(self, name: str, op_id: str) -> None:
+        await asyncio.to_thread(self.append_attempt, name, op_id)
+
     async def apending_ops(self, name: str) -> list[dict]:
         return await asyncio.to_thread(self.pending_ops, name)
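
The attempt accounting in `pending_ops` can be reproduced on an in-memory list of records. The sketch below follows the scan loop from the diff; `scan_pending` and `should_dead_letter` are hypothetical names, and the threshold check only approximates what the commit message says `_run_outbox_op` does, since the `app/memory_server.py` diff is not rendered here.

```python
MAX_ATTEMPTS = 5  # same value as MEMORY_LIVENESS_MAX_ATTEMPTS


def scan_pending(records: list[dict]) -> list[dict]:
    """Replay outbox lines in order and return pending ops with `_attempt_count` attached."""
    pending: dict[str, dict] = {}
    attempts: dict[str, int] = {}
    for rec in records:
        op_id = rec.get("op_id")
        status = rec.get("status")
        if not op_id:
            continue
        if status == "pending":
            pending[op_id] = dict(rec)  # copy, so the input records stay untouched
        elif status == "done":
            pending.pop(op_id, None)
            attempts.pop(op_id, None)
        elif status == "attempt":
            attempts[op_id] = attempts.get(op_id, 0) + 1
    for op_id, rec in pending.items():
        rec["_attempt_count"] = attempts.get(op_id, 0)
    return list(pending.values())


def should_dead_letter(op: dict, max_attempts: int = MAX_ATTEMPTS) -> bool:
    """Assumed caller-side check: give up once the recorded attempts reach the cap."""
    return op["_attempt_count"] >= max_attempts


records = (
    [{"op_id": "a", "status": "pending"}]
    + [{"op_id": "a", "status": "attempt"}] * 5   # poisoned op: five recorded failures
    + [{"op_id": "b", "status": "pending"}, {"op_id": "b", "status": "attempt"}]
    + [{"op_id": "c", "status": "pending"}, {"op_id": "c", "status": "done"}]
)
ops = scan_pending(records)
print([(op["op_id"], op["_attempt_count"], should_dead_letter(op)) for op in ops])
# [('a', 5, True), ('b', 1, False)]
```

Because the count is derived from persisted `attempt` lines on every scan, it survives a restart, which is why `compact` keeps those lines for ops that are still pending.
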