Skip to content

Commit 80b7d17

Browse files
committed
性能优化:优化 A_Memorix WebImport 并发导入
1 parent 10211f0 commit 80b7d17

7 files changed

Lines changed: 594 additions & 130 deletions

File tree

pytests/A_memorix_test/test_web_import_manager_payloads.py

Lines changed: 147 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pathlib import Path
22
from types import SimpleNamespace
33

4+
import asyncio
45
import numpy as np
56
import pytest
67

@@ -18,6 +19,8 @@ def __init__(self) -> None:
1819
self.paragraphs: list[dict[str, object]] = []
1920
self.entities: list[str] = []
2021
self.relations: list[tuple[str, str, str]] = []
22+
self.paragraph_backfills: list[tuple[str, str]] = []
23+
self.relation_vector_states: list[tuple[str, str, str | None, bool]] = []
2124

2225
def add_paragraph(self, **kwargs):
2326
self.paragraphs.append(dict(kwargs))
@@ -33,8 +36,17 @@ def add_relation(self, *, subject: str, predicate: str, obj: str, **kwargs) -> s
3336
self.relations.append((subject, predicate, obj))
3437
return f"relation-{len(self.relations)}"
3538

36-
def set_relation_vector_state(self, rel_hash: str, state: str) -> None:
37-
del rel_hash, state
39+
def set_relation_vector_state(
40+
self,
41+
rel_hash: str,
42+
state: str,
43+
error: str | None = None,
44+
bump_retry: bool = False,
45+
) -> None:
46+
self.relation_vector_states.append((rel_hash, state, error, bump_retry))
47+
48+
def enqueue_paragraph_vector_backfill(self, paragraph_hash: str, *, error: str = "") -> None:
49+
self.paragraph_backfills.append((paragraph_hash, error))
3850

3951
def get_live_paragraphs_by_source(self, source: str):
4052
return [
@@ -58,32 +70,60 @@ def add_edges(self, edges, relation_hashes=None):
5870

5971

6072
class _DummyVectorStore:
73+
def __init__(self) -> None:
74+
self.ids: list[str] = []
75+
6176
def __contains__(self, item: str) -> bool:
62-
del item
63-
return False
77+
return item in self.ids
6478

6579
def add(self, vectors, ids):
66-
del vectors, ids
80+
del vectors
81+
self.ids.extend(list(ids))
6782

6883

6984
class _DummyEmbeddingManager:
85+
def __init__(self, *, delay: float = 0.0, fail_for: str = "") -> None:
86+
self.delay = delay
87+
self.fail_for = fail_for
88+
self.inflight = 0
89+
self.max_inflight = 0
90+
self.calls: list[str] = []
91+
7092
async def encode(self, text: str) -> np.ndarray:
71-
del text
93+
self.calls.append(text)
94+
self.inflight += 1
95+
self.max_inflight = max(self.max_inflight, self.inflight)
96+
try:
97+
if self.delay:
98+
await asyncio.sleep(self.delay)
99+
if self.fail_for and self.fail_for in text:
100+
raise RuntimeError("embedding failed")
101+
finally:
102+
self.inflight -= 1
72103
return np.ones(4, dtype=np.float32)
73104

74105

75-
def _build_manager() -> tuple[ImportTaskManager, _DummyMetadataStore]:
106+
def _build_manager(
107+
*,
108+
embedding_manager: _DummyEmbeddingManager | None = None,
109+
relation_vectorization_enabled: bool = False,
110+
) -> tuple[ImportTaskManager, _DummyMetadataStore]:
76111
metadata_store = _DummyMetadataStore()
112+
config = {
113+
"retrieval.relation_vectorization": {
114+
"enabled": relation_vectorization_enabled,
115+
"write_on_import": relation_vectorization_enabled,
116+
}
117+
}
77118
plugin = SimpleNamespace(
78119
metadata_store=metadata_store,
79120
graph_store=_DummyGraphStore(),
80121
vector_store=_DummyVectorStore(),
81-
embedding_manager=_DummyEmbeddingManager(),
122+
embedding_manager=embedding_manager or _DummyEmbeddingManager(),
82123
relation_write_service=None,
83-
get_config=lambda key, default=None: default,
124+
get_config=lambda key, default=None: config.get(key, default),
84125
_is_embedding_degraded=lambda: False,
85126
_allow_metadata_only_write=lambda: True,
86-
write_paragraph_vector_or_enqueue=None,
87127
)
88128
manager = ImportTaskManager(plugin)
89129
return manager, metadata_store
@@ -260,3 +300,100 @@ async def test_persist_processed_chunk_skips_invalid_nested_items() -> None:
260300
assert len(metadata_store.paragraphs) == 1
261301
assert set(metadata_store.entities) >= {"Alice", "地图"}
262302
assert metadata_store.relations == [("Alice", "持有", "地图")]
303+
304+
305+
@pytest.mark.asyncio
306+
async def test_persist_processed_chunk_does_not_hold_storage_lock_during_embedding() -> None:
307+
embedding_manager = _DummyEmbeddingManager(delay=0.05)
308+
manager, metadata_store = _build_manager(embedding_manager=embedding_manager)
309+
file_record = SimpleNamespace(source_path="", source_kind="paste", name="demo.txt")
310+
311+
await asyncio.gather(
312+
manager._persist_processed_chunk(
313+
file_record,
314+
ProcessedChunk(
315+
type=KnowledgeType.FACTUAL,
316+
source=SourceInfo(file="demo.txt", offset_start=0, offset_end=4),
317+
chunk=ChunkContext(chunk_id="chunk-1", index=0, text="第一段事实"),
318+
data={},
319+
),
320+
),
321+
manager._persist_processed_chunk(
322+
file_record,
323+
ProcessedChunk(
324+
type=KnowledgeType.FACTUAL,
325+
source=SourceInfo(file="demo.txt", offset_start=5, offset_end=9),
326+
chunk=ChunkContext(chunk_id="chunk-2", index=1, text="第二段事实"),
327+
data={},
328+
),
329+
),
330+
)
331+
332+
assert len(metadata_store.paragraphs) == 2
333+
assert embedding_manager.max_inflight == 2
334+
335+
336+
@pytest.mark.asyncio
337+
async def test_relation_vector_failure_keeps_metadata_and_marks_failed() -> None:
338+
manager, metadata_store = _build_manager(
339+
embedding_manager=_DummyEmbeddingManager(fail_for="关系是持有"),
340+
relation_vectorization_enabled=True,
341+
)
342+
343+
relation_hash = await manager._add_relation("Alice", "持有", "地图", source_paragraph="paragraph-1")
344+
345+
assert relation_hash == "relation-1"
346+
assert metadata_store.relations == [("Alice", "持有", "地图")]
347+
assert ("relation-1", "pending", None, False) in metadata_store.relation_vector_states
348+
assert metadata_store.relation_vector_states[-1] == ("relation-1", "failed", "embedding failed", True)
349+
350+
351+
@pytest.mark.asyncio
352+
async def test_high_concurrency_persist_processed_chunks_keep_all_writes_consistent() -> None:
353+
chunk_count = 60
354+
relations_per_chunk = 2
355+
entities_per_chunk = 5
356+
embedding_manager = _DummyEmbeddingManager(delay=0.001)
357+
manager, metadata_store = _build_manager(
358+
embedding_manager=embedding_manager,
359+
relation_vectorization_enabled=True,
360+
)
361+
file_record = SimpleNamespace(source_path="", source_kind="paste", name="stress.txt")
362+
363+
async def persist(index: int) -> None:
364+
await manager._persist_processed_chunk(
365+
file_record,
366+
ProcessedChunk(
367+
type=KnowledgeType.FACTUAL,
368+
source=SourceInfo(file="stress.txt", offset_start=index * 10, offset_end=index * 10 + 9),
369+
chunk=ChunkContext(chunk_id=f"chunk-{index}", index=index, text=f"第 {index} 段高并发事实"),
370+
data={
371+
"triples": [
372+
{"subject": f"subject-{index}-a", "predicate": "关联", "object": f"object-{index}-a"},
373+
],
374+
"relations": [
375+
{"subject": f"subject-{index}-b", "predicate": "包含", "object": f"object-{index}-b"},
376+
],
377+
"entities": [f"marker-{index}"],
378+
},
379+
),
380+
)
381+
382+
await asyncio.wait_for(
383+
asyncio.gather(*(persist(index) for index in range(chunk_count))),
384+
timeout=15,
385+
)
386+
387+
vector_ids = set(manager.plugin.vector_store.ids)
388+
ready_states = [state for _, state, _, _ in metadata_store.relation_vector_states if state == "ready"]
389+
failed_states = [state for _, state, _, _ in metadata_store.relation_vector_states if state == "failed"]
390+
391+
assert len(metadata_store.paragraphs) == chunk_count
392+
assert len(metadata_store.relations) == chunk_count * relations_per_chunk
393+
assert len(manager.plugin.graph_store.edges) == chunk_count * relations_per_chunk
394+
assert len({paragraph["source"] for paragraph in metadata_store.paragraphs}) == 1
395+
assert len(vector_ids) == chunk_count * (1 + entities_per_chunk + relations_per_chunk)
396+
assert len(ready_states) == chunk_count * relations_per_chunk
397+
assert failed_states == []
398+
assert metadata_store.paragraph_backfills == []
399+
assert embedding_manager.max_inflight > 1

src/A_memorix/CONFIG_REFERENCE.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ max_paste_chars = 200000
107107
default_file_concurrency = 2
108108
default_chunk_concurrency = 4
109109

110+
[web.import.timeout]
111+
llm_call_seconds = 240
112+
process_poll_seconds = 1
113+
process_terminate_seconds = 5
114+
process_kill_seconds = 3
115+
convert_preflight_seconds = 20
116+
110117
[web.tuning]
111118
enabled = true
112119
max_queue_size = 8
@@ -121,7 +128,7 @@ default_sample_size = 24
121128

122129
- 长期记忆控制台:适合修改高频项,例如 embedding、检索、Episode、人物画像、导入与调优的常用开关。
123130
- 原始 TOML:适合复制整份配置、批量调整参数,或修改未在可视化表单中展示的高级项。
124-
- raw-only 高级项仍包括:`retrieval.fusion.*`、`retrieval.search.relation_intent.*`、`retrieval.search.graph_recall.*`、`retrieval.search.posterior_graph.*`、`retrieval.aggregate.*`、`memory.orphan.*`、`advanced.extraction_model`、`web.import.llm_retry.*`、`web.import.path_aliases`、`web.import.convert.*`、`web.tuning.llm_retry.*`、`web.tuning.eval_query_timeout_seconds`
131+
- raw-only 高级项仍包括:`retrieval.fusion.*`、`retrieval.search.relation_intent.*`、`retrieval.search.graph_recall.*`、`retrieval.search.posterior_graph.*`、`retrieval.aggregate.*`、`memory.orphan.*`、`advanced.extraction_model`、`web.import.llm_retry.*`、`web.import.timeout.*`、`web.import.path_aliases`、`web.import.convert.*`、`web.tuning.llm_retry.*`、`web.tuning.eval_query_timeout_seconds`
125132

126133
## 1. 存储与嵌入
127134

@@ -339,6 +346,14 @@ chats = ["group:123", "user:456", "stream:abc"]
339346
- `web.import.max_chunk_concurrency` (默认 `12`)
340347
- `web.import.poll_interval_ms` (默认 `1000`)
341348

349+
### 超时
350+
351+
- `web.import.timeout.llm_call_seconds` (默认 `240`,`0` 表示不额外限制)
352+
- `web.import.timeout.process_poll_seconds` (默认 `1`)
353+
- `web.import.timeout.process_terminate_seconds` (默认 `5`)
354+
- `web.import.timeout.process_kill_seconds` (默认 `3`)
355+
- `web.import.timeout.convert_preflight_seconds` (默认 `20`)
356+
342357
### 重试与路径
343358

344359
- `web.import.llm_retry.max_attempts` (默认 `4`)

src/A_memorix/QUICK_START.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ max_paste_chars = 200000
132132
default_file_concurrency = 2
133133
default_chunk_concurrency = 4
134134

135+
[web.import.timeout]
136+
llm_call_seconds = 240
137+
process_poll_seconds = 1
138+
process_terminate_seconds = 5
139+
process_kill_seconds = 3
140+
convert_preflight_seconds = 20
141+
135142
[web.tuning]
136143
enabled = true
137144
max_queue_size = 8

src/A_memorix/config_schema.json

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"sections": [
5757
"advanced",
5858
"web.import",
59+
"web.import.timeout",
5960
"web.tuning"
6061
],
6162
"order": 5
@@ -1275,7 +1276,7 @@
12751276
"description": "控制检索调优任务的队列与默认策略。",
12761277
"icon": null,
12771278
"collapsed": false,
1278-
"order": 15,
1279+
"order": 16,
12791280
"fields": {
12801281
"enabled": {
12811282
"name": "enabled",
@@ -1396,6 +1397,101 @@
13961397
"choices": null
13971398
}
13981399
}
1400+
},
1401+
"web.import.timeout": {
1402+
"name": "web.import.timeout",
1403+
"title": "导入超时",
1404+
"description": "控制 Web Import 中 LLM 调用、迁移/转换子进程与预检步骤的等待时间。",
1405+
"icon": null,
1406+
"collapsed": false,
1407+
"order": 15,
1408+
"fields": {
1409+
"llm_call_seconds": {
1410+
"name": "llm_call_seconds",
1411+
"type": "number",
1412+
"default": 240,
1413+
"description": "单次 LLM 抽取调用超时时间,0 表示不额外限制。",
1414+
"label": "LLM 单次调用超时(秒)",
1415+
"ui_type": "number",
1416+
"required": false,
1417+
"hidden": false,
1418+
"disabled": false,
1419+
"order": 1,
1420+
"hint": "大量导入时用于避免单个上游请求长时间挂起。",
1421+
"min": 0,
1422+
"max": 3600,
1423+
"step": 1,
1424+
"choices": null
1425+
},
1426+
"process_poll_seconds": {
1427+
"name": "process_poll_seconds",
1428+
"type": "number",
1429+
"default": 1,
1430+
"description": "迁移或转换子进程状态轮询等待时间。",
1431+
"label": "子进程轮询等待(秒)",
1432+
"ui_type": "number",
1433+
"required": false,
1434+
"hidden": false,
1435+
"disabled": false,
1436+
"order": 2,
1437+
"hint": "值越小取消响应越快,但轮询更频繁。",
1438+
"min": 0.1,
1439+
"max": 60,
1440+
"step": 0.1,
1441+
"choices": null
1442+
},
1443+
"process_terminate_seconds": {
1444+
"name": "process_terminate_seconds",
1445+
"type": "number",
1446+
"default": 5,
1447+
"description": "取消任务时等待子进程正常终止的时间。",
1448+
"label": "子进程终止等待(秒)",
1449+
"ui_type": "number",
1450+
"required": false,
1451+
"hidden": false,
1452+
"disabled": false,
1453+
"order": 3,
1454+
"hint": "超时后会尝试强制结束子进程。",
1455+
"min": 0.1,
1456+
"max": 120,
1457+
"step": 0.1,
1458+
"choices": null
1459+
},
1460+
"process_kill_seconds": {
1461+
"name": "process_kill_seconds",
1462+
"type": "number",
1463+
"default": 3,
1464+
"description": "强制结束子进程后的等待时间。",
1465+
"label": "子进程强杀等待(秒)",
1466+
"ui_type": "number",
1467+
"required": false,
1468+
"hidden": false,
1469+
"disabled": false,
1470+
"order": 4,
1471+
"hint": "用于取消任务时回收转换/迁移子进程。",
1472+
"min": 0.1,
1473+
"max": 120,
1474+
"step": 0.1,
1475+
"choices": null
1476+
},
1477+
"convert_preflight_seconds": {
1478+
"name": "convert_preflight_seconds",
1479+
"type": "number",
1480+
"default": 20,
1481+
"description": "LPMM 转换依赖预检的超时时间。",
1482+
"label": "转换预检超时(秒)",
1483+
"ui_type": "number",
1484+
"required": false,
1485+
"hidden": false,
1486+
"disabled": false,
1487+
"order": 5,
1488+
"hint": "依赖环境较慢时可以适当调大。",
1489+
"min": 0.1,
1490+
"max": 600,
1491+
"step": 0.1,
1492+
"choices": null
1493+
}
1494+
}
13991495
}
14001496
}
14011497
}

0 commit comments

Comments
 (0)