perf:提高A_memorix的导入表现,优化后台任务#1752
Conversation
- 将 paragraph ngram 索引改为随段落写入、删除、复活路径增量维护 - sparse 检索加载与 fallback 不再触发全量 ngram 回填,索引未就绪时跳过 ngram fallback - 补充 paragraph ngram 增量维护测试,覆盖软删、物理删除、原子删除与读路径回归
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
Walkthrough本PR为段落ngram倒排索引与Web导入流程引入增量维护与超时配置管理:MetadataStore新增ngram就绪检查与增量维护接口,全量回填改进性能指标记录,所有段落生命周期操作维护计数一致性;Web导入统一集中超时配置、调整锁粒度实现嵌入并行、改进关系向量失败时的状态追踪;BM25改为响应式检查就绪状态而非主动回填;新增相应测试验证增量维护与并发语义。 Changes段落ngram索引增量维护与Web导入超时/并发优化
预估代码审查工作量🎯 4 (Complex) | ⏱️ ~60 分钟 Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
该 PR 旨在提升 A_Memorix 的 Web Import 导入性能与后台任务稳定性:通过减少写入锁持有时间、为导入流程提供更细粒度的超时配置,并将段落 ngram 索引从“读路径全量回填”调整为“写路径增量维护 + 读路径仅检查就绪”。
Changes:
- 新增
web.import.timeout.*配置项(LLM 调用/子进程轮询与终止/转换预检超时),并同步到 schema 与文档。 - 重构 Web Import 写入路径:拆分 storage lock 粒度、补充向量写入降级/回填逻辑、将若干固定
wait_for超时改为可配置。 - MetadataStore 增加 paragraph ngram 增量维护与就绪检查;SparseBM25 读路径不再触发全量回填;补充对应测试覆盖。
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/config/official_configs.py | 增加 AMemorixWebImportTimeoutConfig 并挂载到 web.import.timeout 配置树 |
| src/config/config.py | 配置版本号更新 |
| src/A_memorix/QUICK_START.md | 文档示例增加 [web.import.timeout] |
| src/A_memorix/core/utils/web_import_manager.py | Web Import 写入/向量化/子进程等待逻辑重构,新增超时配置读取 |
| src/A_memorix/core/storage/metadata_store.py | paragraph ngram 索引增量维护、元信息记录与删除/恢复路径对齐 |
| src/A_memorix/core/retrieval/sparse_bm25.py | 读路径仅检查 ngram 索引是否就绪,不再触发全量回填 |
| src/A_memorix/config_schema.json | 新增 web.import.timeout 配置分组定义与展示顺序调整 |
| src/A_memorix/CONFIG_REFERENCE.md | 配置参考补充 web.import.timeout.* 说明 |
| pytests/A_memorix_test/test_web_import_manager_payloads.py | 增加并发与降级相关测试用例 |
| pytests/A_memorix_test/test_paragraph_ngram_incremental.py | 新增 ngram 增量维护与“读路径不回填”测试 |
Comments suppressed due to low confidence (1)
src/A_memorix/core/utils/web_import_manager.py:3457
_add_entity_with_vector在持有_storage_lock时直接执行hash_value in self.plugin.vector_store,并在后续无条件使用self.plugin.embedding_manager.encode/self.plugin.vector_store.add。但_ensure_embedding_runtime_ready()在allow_metadata_only_write开启时允许vector_store/embedding_manager缺失继续执行(self-check 返回runtime_components_missing也不会 raise)。此时这里会触发TypeError/AttributeError中断整个导入。建议像_write_paragraph_vector_or_enqueue一样先判空:组件缺失且允许 metadata-only 时仅写 metadata/graph 并跳过实体向量写入。
async with self._storage_lock:
hash_value = self.plugin.metadata_store.add_entity(name=name_token, source_paragraph=source_paragraph)
self.plugin.graph_store.add_nodes([name_token])
vector_exists = hash_value in self.plugin.vector_store
if not vector_exists:
try:
if self._is_embedding_degraded():
raise RuntimeError("embedding_degraded")
emb = await self.plugin.embedding_manager.encode(name_token)
self.plugin.vector_store.add(emb.reshape(1, -1), [hash_value])
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -497,9 +569,20 @@ async def _write_paragraph_vector_or_enqueue( | |||
| "detail": "embedding_degraded", | |||
| } | |||
|
|
|||
| if token in self.plugin.vector_store: | |||
| return { | |||
| "success": True, | |||
| "vector_written": True, | |||
| "queued": False, | |||
| "warning": "", | |||
| "detail": "vector_already_exists", | |||
| } | |||
|
|
|||
| try: | |||
| emb = await self.plugin.embedding_manager.encode(content) | |||
| self.plugin.vector_store.add(emb.reshape(1, -1), [paragraph_hash]) | |||
| emb = await self.plugin.embedding_manager.encode(text) | |||
| if getattr(emb, "ndim", 1) == 1: | |||
| emb = emb.reshape(1, -1) | |||
| self.plugin.vector_store.add(emb, [token]) | |||
| return { | |||
| "success": True, | |||
| "vector_written": True, | |||
| @@ -510,13 +593,13 @@ async def _write_paragraph_vector_or_enqueue( | |||
| except Exception as exc: | |||
| if not self._allow_metadata_only_write(): | |||
| raise | |||
| self._enqueue_paragraph_backfill(paragraph_hash, error=str(exc)) | |||
| await self._enqueue_paragraph_backfill_locked(token, error=str(exc)) | |||
| return { | |||
| "success": True, | |||
| "vector_written": False, | |||
| "queued": True, | |||
| "warning": "vector_degraded_write", | |||
| "detail": str(exc), | |||
| "detail": f"{str(context or 'paragraph')} vector write failed: {exc}", | |||
| } | |||
|
|
|||
| async with self._storage_lock: | ||
| rel_hash = self.plugin.metadata_store.add_relation( | ||
| subject=subject_token, | ||
| predicate=predicate_token, | ||
| obj=object_token, | ||
| confidence=1.0, | ||
| source_paragraph=source_paragraph, | ||
| write_vector=write_vector, | ||
| ) | ||
| return result.hash_value | ||
|
|
||
| rel_hash = self.plugin.metadata_store.add_relation( | ||
| subject=subject_token, | ||
| predicate=predicate_token, | ||
| obj=object_token, | ||
| source_paragraph=source_paragraph, | ||
| confidence=1.0, | ||
| ) | ||
| self.plugin.graph_store.add_edges([(subject_token, object_token)], relation_hashes=[rel_hash]) | ||
| self.plugin.graph_store.add_edges([(subject_token, object_token)], relation_hashes=[rel_hash]) | ||
| if not write_vector: | ||
| try: | ||
| self.plugin.metadata_store.set_relation_vector_state(rel_hash, "none") | ||
| except Exception: | ||
| pass | ||
| return rel_hash | ||
| vector_exists = rel_hash in self.plugin.vector_store | ||
| self.plugin.metadata_store.set_relation_vector_state(rel_hash, "ready" if vector_exists else "pending") | ||
|
|
||
| if vector_exists: | ||
| return rel_hash | ||
|
|
||
| try: | ||
| self.plugin.metadata_store.set_relation_vector_state(rel_hash, "none") | ||
| except Exception: | ||
| pass | ||
| relation_service = getattr(self.plugin, "relation_write_service", None) | ||
| if relation_service is not None: | ||
| vector_text = relation_service.build_relation_vector_text(subject_token, predicate_token, object_token) | ||
| else: | ||
| vector_text = f"{subject_token} {predicate_token} {object_token}\n{subject_token}和{object_token}的关系是{predicate_token}" | ||
| emb = await self.plugin.embedding_manager.encode(vector_text) | ||
| self.plugin.vector_store.add(emb.reshape(1, -1), [rel_hash]) | ||
| await self._set_relation_vector_state_locked(rel_hash, "ready") |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/A_memorix/core/storage/metadata_store.py (1)
7-19: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win导入分组顺序不符合项目规范
当前导入顺序与仓库规则不一致,建议按“
from ... import ...组在前、import ...组在后”,并保持组内字母序,同时本地模块放在第三方/标准库之后。As per coding guidelines, “
**/*.py: Import from standard libraries and third-party libraries should follow this order: (1)from ... import ...syntax imports first, (2) directimport ...syntax imports second.”🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/A_memorix/core/storage/metadata_store.py` around lines 7 - 19, Reorder the import block in metadata_store.py so all "from ... import ..." lines come before plain "import ..." lines, and place local project imports (those from src. or ..) after standard-library/third-party imports; specifically put "from datetime import datetime", "from pathlib import Path", and "from typing import Any, Dict, List, Optional, Sequence, Tuple, Union" and the other from-style imports (e.g., "from src.common.logger import get_logger", "from ..utils.hash import compute_hash, normalize_text", "from ..utils.time_parser import normalize_time_meta") before the plain imports, then list plain imports (json, pickle, re, sqlite3, time, uuid) alphabetically within their group; ensure group ordering and alphabetical ordering are applied consistently.src/A_memorix/core/utils/web_import_manager.py (1)
2481-2494:⚠️ Potential issue | 🟠 Major | ⚡ Quick win预检超时/异常时应终止并回收 probe 子进程
src/A_memorix/core/utils/web_import_manager.py中asyncio.wait_for(probe.communicate(), ...)一旦超时或communicate()抛错就直接return False,但未终止/回收probe,可能导致后台子进程残留并占用资源。失败返回前应在对应异常/超时分支里调用现有的进程终止与wait回收逻辑(或将清理放到finally中),确保子进程一定被结束并释放句柄。🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/A_memorix/core/utils/web_import_manager.py` around lines 2481 - 2494, The probe subprocess created by asyncio.create_subprocess_exec (variable probe) isn't cleaned up on asyncio.wait_for timeout or other exceptions; update the try/except to ensure probe is terminated and awaited before returning (or move cleanup into a finally block). Specifically, after creating probe and before any early return in the except/timeout branches, call probe.kill() (or probe.terminate() as appropriate) and await probe.wait() to reap the process, guarding with a check that probe is not None (handle creation failure separately); handle asyncio.TimeoutError distinctly if desired and use the same cleanup logic, and ensure stdout/stderr pipes are released by awaiting communicate/wait as part of the cleanup.
🧹 Nitpick comments (1)
pytests/A_memorix_test/test_web_import_manager_payloads.py (1)
22-23: ⚡ Quick win新增测试里的泛型注解请统一成
typing版本。这几处新增注解用了
list[...]/tuple[...],和仓库的 Python 注解规范不一致。这里统一改成List[...]/Tuple[...]/Optional[...]更稳妥。As per coding guidelines, "For parameterized generics, use type annotations from the
typingmodule to specify the types of generic parameters, such asList[int]for a list of integers orDict[str, Any]for a dictionary with string keys and any-type values."Also applies to: 73-75, 85-90, 106-110
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pytests/A_memorix_test/test_web_import_manager_payloads.py` around lines 22 - 23, The new test uses PEP 585-style built-in generics (e.g., list[...] / tuple[...]) which conflicts with our repo typing convention; update the annotations for paragraph_backfills and relation_vector_states (and the other new annotated variables in this test file) to use typing module generics instead—import List, Tuple, Optional (and Any if needed) from typing and change list[...] -> List[...], tuple[...] -> Tuple[...], and use Optional[...] for nullable types so signatures like paragraph_backfills: List[Tuple[str, str]] and relation_vector_states: List[Tuple[str, str, Optional[str], bool]] follow the repo standard.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pytests/A_memorix_test/test_paragraph_ngram_incremental.py`:
- Around line 98-106: The test's finally block only closes the metadata store
but not the SparseBM25Index connection created by ensure_loaded(); update the
finally to first check for the index instance and call index.unload() (or
index.unload() only if ensure_loaded() returned true/if index is not None)
before calling store.close() to explicitly release the index resources and avoid
file/connection locks between tests.
In `@src/A_memorix/core/storage/metadata_store.py`:
- Around line 1639-1646: The is_paragraph_ngram_ready function currently
converts row[0] to int but only catches sqlite3.OperationalError; update
is_paragraph_ngram_ready to also catch ValueError and TypeError (same fallback
as _get_paragraph_ngram_n_if_ready) and return False when conversion fails,
ensuring non-numeric paragraph_count metadata does not propagate an exception to
callers.
In `@src/A_memorix/core/utils/web_import_manager.py`:
- Around line 572-585: The _write_paragraph_vector_or_enqueue function has a
check-then-act race: it tests "if token in self.plugin.vector_store" then does
await self.plugin.embedding_manager.encode(...) and
self.plugin.vector_store.add(...), which can cause duplicate writes under
concurrency; fix by making the add atomic or serializing per-token (e.g.,
acquire a per-token asyncio.Lock stored on the manager before
checking/encoding/adding) and/or make add idempotent by catching duplicate-ID
errors from self.plugin.vector_store.add (e.g., catch ValueError or the
store-specific duplicate exception and treat it as "vector_already_exists"), and
ensure any subprocess or enqueue cleanup logic runs in the same
locked/exception-handling scope so state remains consistent.
- Around line 3504-3516: The except ValueError: handler currently always marks
the relation vector as "ready" even when VectorStore.add failed (e.g., dimension
mismatch); change it so that on ValueError you check whether the write actually
succeeded (for example test rel_hash membership via rel_hash in
self.plugin.vector_store or another explicit success check after
self.plugin.vector_store.add) and only call
self._set_relation_vector_state_locked(rel_hash, "ready") if the vector is
confirmed present; otherwise call
self._set_relation_vector_state_locked(rel_hash, "failed", error=str(exc),
bump_retry=True) (or leave as "pending") so failed writes increment retry and
preserve the error. Ensure you reference the same symbols:
relation_service/build_relation_vector_text,
self.plugin.embedding_manager.encode, self.plugin.vector_store.add, rel_hash and
self._set_relation_vector_state_locked.
---
Outside diff comments:
In `@src/A_memorix/core/storage/metadata_store.py`:
- Around line 7-19: Reorder the import block in metadata_store.py so all "from
... import ..." lines come before plain "import ..." lines, and place local
project imports (those from src. or ..) after standard-library/third-party
imports; specifically put "from datetime import datetime", "from pathlib import
Path", and "from typing import Any, Dict, List, Optional, Sequence, Tuple,
Union" and the other from-style imports (e.g., "from src.common.logger import
get_logger", "from ..utils.hash import compute_hash, normalize_text", "from
..utils.time_parser import normalize_time_meta") before the plain imports, then
list plain imports (json, pickle, re, sqlite3, time, uuid) alphabetically within
their group; ensure group ordering and alphabetical ordering are applied
consistently.
In `@src/A_memorix/core/utils/web_import_manager.py`:
- Around line 2481-2494: The probe subprocess created by
asyncio.create_subprocess_exec (variable probe) isn't cleaned up on
asyncio.wait_for timeout or other exceptions; update the try/except to ensure
probe is terminated and awaited before returning (or move cleanup into a finally
block). Specifically, after creating probe and before any early return in the
except/timeout branches, call probe.kill() (or probe.terminate() as appropriate)
and await probe.wait() to reap the process, guarding with a check that probe is
not None (handle creation failure separately); handle asyncio.TimeoutError
distinctly if desired and use the same cleanup logic, and ensure stdout/stderr
pipes are released by awaiting communicate/wait as part of the cleanup.
---
Nitpick comments:
In `@pytests/A_memorix_test/test_web_import_manager_payloads.py`:
- Around line 22-23: The new test uses PEP 585-style built-in generics (e.g.,
list[...] / tuple[...]) which conflicts with our repo typing convention; update
the annotations for paragraph_backfills and relation_vector_states (and the
other new annotated variables in this test file) to use typing module generics
instead—import List, Tuple, Optional (and Any if needed) from typing and change
list[...] -> List[...], tuple[...] -> Tuple[...], and use Optional[...] for
nullable types so signatures like paragraph_backfills: List[Tuple[str, str]] and
relation_vector_states: List[Tuple[str, str, Optional[str], bool]] follow the
repo standard.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 94d7e969-98df-4dbe-9306-2c1dc08fad05
📒 Files selected for processing (10)
pytests/A_memorix_test/test_paragraph_ngram_incremental.pypytests/A_memorix_test/test_web_import_manager_payloads.pysrc/A_memorix/CONFIG_REFERENCE.mdsrc/A_memorix/QUICK_START.mdsrc/A_memorix/config_schema.jsonsrc/A_memorix/core/retrieval/sparse_bm25.pysrc/A_memorix/core/storage/metadata_store.pysrc/A_memorix/core/utils/web_import_manager.pysrc/config/config.pysrc/config/official_configs.py
zh-CN目标翻译作为常规 GitHub 编辑面;常规翻译以 Crowdin ->l10n_*PR 回流为准,详见docs/i18n.md请填写以下内容
(删除掉中括号内的空格,并替换为小写的x)
main分支 禁止修改,请确认本次提交的分支 不是main分支src/A_memorix,我确认已阅读src/A_memorix/MODIFICATION_POLICY.md,不涉及则无需勾选其他信息
Summary by CodeRabbit
发布说明