Skip to content

Commit cbe7aca

Browse files
committed
fix(kb): address PR xorbitsai#202 review — harden replace_chunks and add ingestion lock
- Reject empty replace_scope to prevent unbounded deletion (comment 1)
- Use merge_insert instead of table.add for idempotent upsert (comment 2)
- Validate chunk_id presence before write to fail fast (comment 3)
- Fix docstring to match insert-before-delete order (comment 4)
- Cascade-delete orphaned embeddings rows after chunk replacement (comment 5)
- Allow replace_chunks with empty records to clean stale data (comment 6)
- Add per-document threading lock in process_document to prevent concurrent chunk replacement races (comment 7)
- Update tests for merge_insert and add ingestion lock tests
1 parent 5f2b867 commit cbe7aca

6 files changed

Lines changed: 260 additions & 28 deletions

File tree

src/xagent/core/tools/core/RAG_tools/chunk/chunk_document.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,10 @@ def chunk_document(
8888
Chunk parsed paragraphs and write to chunks table.
8989
9090
Concurrency note:
91-
This function uses **last-write-wins** semantics. If two concurrent
92-
calls target the same (collection, doc_id, parse_hash) with different
93-
parameters, the last one to reach ``_write_chunks_to_db`` will replace
94-
the other's output. Callers requiring mutual exclusion should hold a
95-
collection-level lock before invoking this function.
91+
This function uses **last-write-wins** semantics. The ingestion
92+
pipeline serialises calls per (collection, source_path) via
93+
``_get_ingestion_lock``, so concurrent chunk replacement races are
94+
prevented at the pipeline level.
9695
9796
Args:
9897
collection: Collection name for data isolation
@@ -512,9 +511,6 @@ def _write_chunks_to_db(
512511
}
513512
rows.append(row)
514513

515-
if not rows:
516-
return False
517-
518514
vector_store = get_vector_index_store()
519515
vector_store.replace_chunks(
520516
rows,
@@ -527,6 +523,9 @@ def _write_chunks_to_db(
527523
is_admin=is_admin,
528524
)
529525

526+
if not rows:
527+
return False
528+
530529
logger.info(
531530
"Chunk records written to database: doc_id=%s, parse_hash=%s, config_hash=%s",
532531
doc_id,

src/xagent/core/tools/core/RAG_tools/pipelines/document_ingestion.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import asyncio
66
import logging
77
import os
8+
import threading
89
import time
910
from contextlib import contextmanager
1011
from pathlib import Path
@@ -62,6 +63,24 @@
6263

6364
logger = logging.getLogger(__name__)
6465

66+
# Per-document ingestion lock keyed by (collection, source_path).
67+
# Prevents concurrent ingestion of the same document which can cause
68+
# data loss in the chunk replacement flow (see PR #202 comment 7).
69+
_ingestion_locks: Dict[tuple, threading.Lock] = {}
70+
_ingestion_locks_guard = threading.Lock()
71+
72+
73+
def _get_ingestion_lock(collection: str, source_path: str) -> threading.Lock:
74+
"""Get or create a per-document threading lock."""
75+
key = (collection, source_path)
76+
if key in _ingestion_locks:
77+
return _ingestion_locks[key]
78+
with _ingestion_locks_guard:
79+
if key not in _ingestion_locks:
80+
_ingestion_locks[key] = threading.Lock()
81+
return _ingestion_locks[key]
82+
83+
6584
_SPREADSHEET_EXTENSIONS = {".xlsx", ".xls", ".csv"}
6685
_SPREADSHEET_CHUNK_SIZE_TOKENS = 512
6786
_SPREADSHEET_CHUNK_OVERLAP_TOKENS = 64
@@ -469,7 +488,33 @@ def process_document(
469488
inputs will reuse existing records when possible.
470489
- Downstream API layers should surface `result.failed_step` and
471490
`result.warnings` to callers for better observability.
491+
- A per-document lock serialises concurrent calls for the same
492+
(collection, source_path) to prevent chunk replacement races.
472493
"""
494+
lock = _get_ingestion_lock(collection, source_path)
495+
with lock:
496+
return _process_document_impl(
497+
collection,
498+
source_path,
499+
config=config,
500+
progress_manager=progress_manager,
501+
user_id=user_id,
502+
is_admin=is_admin,
503+
file_id=file_id,
504+
)
505+
506+
507+
def _process_document_impl(
508+
collection: str,
509+
source_path: str,
510+
*,
511+
config: Optional[IngestionConfig] = None,
512+
progress_manager: Optional[ProgressManager] = None,
513+
user_id: Optional[int] = None,
514+
is_admin: bool = False,
515+
file_id: Optional[str] = None,
516+
) -> IngestionResult:
517+
"""Internal implementation of process_document (runs under per-document lock)."""
473518
cfg = _apply_spreadsheet_ingestion_safeguards(
474519
coerce_ingestion_config(config),
475520
source_path,

src/xagent/core/tools/core/RAG_tools/storage/contracts.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -722,19 +722,31 @@ def replace_chunks(
722722
user_id: Optional[int] = None,
723723
is_admin: bool = False,
724724
) -> None:
725-
"""Replace chunk records within a scope (delete old + insert new).
725+
"""Replace chunk records within a scope (insert new, then delete old).
726726
727-
Deletes all existing chunk rows matching *replace_scope* (and tenancy
728-
filters), then inserts *records*. This guarantees that re-chunking with
729-
different parameters does not leave stale rows from a previous
730-
configuration (issue #199).
727+
Inserts *records* first via idempotent merge_insert, then deletes rows
728+
matching *replace_scope* that do not belong to the new generation.
729+
This insert-before-delete order avoids data loss if the process crashes
730+
between the two operations (worst case: brief duplicate data, not zero
731+
data). Guarantees that re-chunking with different parameters does not
732+
leave stale rows from a previous configuration (issue #199).
733+
734+
After updating the chunks table, cascade-deletes orphaned rows from
735+
all ``embeddings_*`` tables whose chunk_id no longer belongs to the
736+
new generation, preventing stale embeddings from appearing in search
737+
results.
731738
732739
Args:
733-
records: New chunk records to insert after deletion.
734-
replace_scope: Filter dict (e.g. collection, doc_id, parse_hash)
735-
identifying rows to delete before inserting.
740+
records: New chunk records to insert. Each record must contain a
741+
``chunk_id`` field.
742+
replace_scope: Non-empty filter dict (e.g. collection, doc_id,
743+
parse_hash) identifying the scope of rows to replace.
736744
user_id: Optional user ID for multi-tenancy scoped deletion.
737745
is_admin: Whether the caller can operate across tenants.
746+
747+
Raises:
748+
ValueError: If *replace_scope* is empty or contains disallowed
749+
keys, or if any record is missing ``chunk_id``.
738750
"""
739751

740752
@abstractmethod

src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,26 +1460,42 @@ def replace_chunks(
14601460
user_id: Optional[int] = None,
14611461
is_admin: bool = False,
14621462
) -> None:
1463-
"""Replace chunk records within a scope (delete old + insert new).
1463+
"""Replace chunk records within a scope (insert new, then delete old).
14641464
1465-
Safety: inserts new records first, then deletes rows that do NOT belong
1466-
to the new generation. This avoids data loss if the process crashes
1465+
Inserts *records* first via idempotent merge_insert, then deletes rows
1466+
matching *replace_scope* that do not belong to the new generation.
1467+
This insert-before-delete order avoids data loss if the process crashes
14671468
between the two operations (worst case: brief duplicate data, not zero
1468-
data).
1469+
data). Also cascade-deletes orphaned embedding rows from all
1470+
``embeddings_*`` tables.
14691471
"""
14701472
from ..LanceDB.schema_manager import _safe_close_table, ensure_chunks_table
14711473

1474+
if not replace_scope:
1475+
raise ValueError("replace_scope must not be empty")
1476+
14721477
for key in replace_scope:
14731478
if key not in self._REPLACE_CHUNKS_ALLOWED_KEYS:
14741479
raise ValueError(f"Invalid replace_scope column: {key}")
14751480

1481+
if records:
1482+
missing = [i for i, r in enumerate(records) if "chunk_id" not in r]
1483+
if missing:
1484+
raise ValueError(
1485+
f"records at index {missing} missing required 'chunk_id' field"
1486+
)
1487+
14761488
conn = self._get_connection()
14771489
ensure_chunks_table(conn)
14781490
table = conn.open_table("chunks")
14791491
try:
1480-
# Step 1: Insert new records (safe — idempotent add)
1492+
# Step 1: Upsert new records (merge_insert is idempotent on retry)
14811493
if records:
1482-
table.add(records)
1494+
table.merge_insert(
1495+
["collection", "doc_id", "parse_hash", "chunk_id"]
1496+
).when_matched_update_all().when_not_matched_insert_all().execute(
1497+
records
1498+
)
14831499

14841500
# Step 2: Build delete filter targeting old generations
14851501
scope_parts = [
@@ -1510,6 +1526,48 @@ def replace_chunks(
15101526
_safe_close_table(table)
15111527
self.invalidate_table_cache("chunks")
15121528

1529+
# Step 3: Cascade-delete orphaned embeddings for the same scope.
1530+
# After chunk replacement, old chunk_ids no longer exist in the chunks
1531+
# table but their embedding rows would still be searchable.
1532+
embed_scope_parts = [
1533+
f"{k} == '{escape_lancedb_string(str(v))}'"
1534+
for k, v in replace_scope.items()
1535+
if k in ("collection", "doc_id", "parse_hash")
1536+
]
1537+
if not embed_scope_parts:
1538+
return
1539+
1540+
embed_base = " AND ".join(embed_scope_parts)
1541+
embed_user = UserPermissions.get_user_filter(user_id, is_admin)
1542+
if embed_base and embed_user:
1543+
embed_filter = f"({embed_base}) AND ({embed_user})"
1544+
elif embed_user:
1545+
embed_filter = embed_user
1546+
else:
1547+
embed_filter = embed_base
1548+
1549+
if records:
1550+
new_ids = [r["chunk_id"] for r in records]
1551+
id_list = ", ".join(f"'{escape_lancedb_string(cid)}'" for cid in new_ids)
1552+
embed_filter = f"({embed_filter}) AND chunk_id NOT IN ({id_list})"
1553+
1554+
for tname in self.list_table_names():
1555+
if not tname.startswith("embeddings_"):
1556+
continue
1557+
try:
1558+
etable = conn.open_table(tname)
1559+
try:
1560+
etable.delete(embed_filter)
1561+
finally:
1562+
_safe_close_table(etable)
1563+
except Exception: # noqa: BLE001
1564+
logger.warning(
1565+
"Failed to cascade-delete embeddings from %s",
1566+
tname,
1567+
exc_info=True,
1568+
)
1569+
self.invalidate_table_cache()
1570+
15131571
def upsert_embeddings(self, model_tag: str, records: List[Dict[str, Any]]) -> None:
15141572
"""Upsert embedding records to LanceDB with fallback pattern.
15151573

tests/core/tools/core/RAG_tools/pipelines/test_document_ingestion.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from __future__ import annotations
44

5+
import threading
6+
import time
57
from typing import Dict, List, Union
68

79
import pytest
@@ -972,3 +974,117 @@ def test_process_document_rejects_absolute_paths_outside_allowed_dir(
972974
or "permission" in message_lower
973975
or "denied" in message_lower
974976
), f"Unexpected error message for {abs_path}: {result.message}"
977+
978+
979+
# ---------------------------------------------------------------------------
980+
# Ingestion lock tests (PR #202 comment 7)
981+
# ---------------------------------------------------------------------------
982+
983+
984+
def test_get_ingestion_lock_returns_same_instance() -> None:
985+
"""_get_ingestion_lock returns the same Lock for the same key."""
986+
document_ingestion._ingestion_locks.clear()
987+
lock_a = document_ingestion._get_ingestion_lock("col", "/tmp/a.pdf")
988+
lock_b = document_ingestion._get_ingestion_lock("col", "/tmp/a.pdf")
989+
assert lock_a is lock_b
990+
991+
992+
def test_get_ingestion_lock_different_keys() -> None:
993+
"""Different (collection, source_path) pairs get independent locks."""
994+
document_ingestion._ingestion_locks.clear()
995+
lock_a = document_ingestion._get_ingestion_lock("col", "/tmp/a.pdf")
996+
lock_b = document_ingestion._get_ingestion_lock("col", "/tmp/b.pdf")
997+
lock_c = document_ingestion._get_ingestion_lock("other", "/tmp/a.pdf")
998+
assert lock_a is not lock_b
999+
assert lock_a is not lock_c
1000+
1001+
1002+
def test_ingestion_lock_serialises_same_document(
1003+
monkeypatch: pytest.MonkeyPatch,
1004+
) -> None:
1005+
"""Two concurrent process_document calls for the same document run serially."""
1006+
document_ingestion._ingestion_locks.clear()
1007+
1008+
execution_log: List[str] = []
1009+
original_impl = document_ingestion._process_document_impl
1010+
1011+
def _slow_impl(*args: object, **kwargs: object) -> IngestionResult:
1012+
execution_log.append("enter")
1013+
# Without the lock, both threads would be here simultaneously
1014+
time.sleep(0.1)
1015+
execution_log.append("exit")
1016+
return original_impl(*args, **kwargs)
1017+
1018+
_patch_pipeline_dependencies(monkeypatch)
1019+
monkeypatch.setattr(document_ingestion, "_process_document_impl", _slow_impl)
1020+
1021+
results: List[IngestionResult] = [None, None] # type: ignore[list-item]
1022+
1023+
def _run(idx: int) -> None:
1024+
results[idx] = document_ingestion.process_document(
1025+
collection="demo",
1026+
source_path="/tmp/doc.pdf",
1027+
config=IngestionConfig(),
1028+
)
1029+
1030+
t1 = threading.Thread(target=_run, args=(0,))
1031+
t2 = threading.Thread(target=_run, args=(1,))
1032+
t1.start()
1033+
t2.start()
1034+
t1.join(timeout=10)
1035+
t2.join(timeout=10)
1036+
1037+
# With serialisation the pattern must be [enter, exit, enter, exit],
1038+
# never [enter, enter, ...] which would indicate overlap.
1039+
assert len(execution_log) == 4
1040+
assert execution_log == ["enter", "exit", "enter", "exit"]
1041+
1042+
1043+
def test_ingestion_lock_allows_different_documents_concurrently(
1044+
monkeypatch: pytest.MonkeyPatch,
1045+
) -> None:
1046+
"""Calls for different documents are NOT serialised (they use independent locks)."""
1047+
document_ingestion._ingestion_locks.clear()
1048+
1049+
entered = threading.Event()
1050+
gate = threading.Event()
1051+
1052+
original_impl = document_ingestion._process_document_impl
1053+
1054+
def _blocking_impl(*args: object, **kwargs: object) -> IngestionResult:
1055+
entered.set()
1056+
gate.wait(timeout=5)
1057+
return original_impl(*args, **kwargs)
1058+
1059+
_patch_pipeline_dependencies(monkeypatch)
1060+
monkeypatch.setattr(document_ingestion, "_process_document_impl", _blocking_impl)
1061+
1062+
def _run_a() -> None:
1063+
document_ingestion.process_document(
1064+
collection="demo",
1065+
source_path="/tmp/a.pdf",
1066+
config=IngestionConfig(),
1067+
)
1068+
1069+
def _run_b() -> None:
1070+
document_ingestion.process_document(
1071+
collection="demo",
1072+
source_path="/tmp/b.pdf",
1073+
config=IngestionConfig(),
1074+
)
1075+
1076+
t1 = threading.Thread(target=_run_a)
1077+
t2 = threading.Thread(target=_run_b)
1078+
t1.start()
1079+
# Wait for thread 1 to enter _blocking_impl
1080+
assert entered.wait(timeout=5)
1081+
entered.clear()
1082+
1083+
t2.start()
1084+
# Thread 2 should also enter _blocking_impl (different document, no lock contention)
1085+
both_entered = entered.wait(timeout=2)
1086+
gate.set() # Release both threads
1087+
t1.join(timeout=5)
1088+
t2.join(timeout=5)
1089+
1090+
assert both_entered, "Different documents should not block each other"

0 commit comments

Comments
 (0)