Skip to content

Commit 5f2b867

Browse files
committed
fix(kb): replace stale chunk rows on re-chunk (issue #199)
When users re-chunk a document with different parameters, the old chunk rows (different config_hash) must be removed. Introduce `replace_chunks()` in the storage abstraction layer, which inserts the new rows first and then deletes the old generation (crash-safe: the worst case is brief duplicate data, never data loss).

Also:
- Add scope-key whitelist validation to prevent filter injection
- Add concurrency note (last-write-wins) to chunk_document docstring
- Add 4 unit tests for replace_chunks edge cases
- Make separator-vs-default integration test deterministic (tmp_path)

Refs: #199
1 parent f03074a commit 5f2b867

5 files changed

Lines changed: 277 additions & 26 deletions

File tree

src/xagent/core/tools/core/RAG_tools/chunk/chunk_document.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ def chunk_document(
8787
"""
8888
Chunk parsed paragraphs and write to chunks table.
8989
90+
Concurrency note:
91+
This function uses **last-write-wins** semantics. If two concurrent
92+
calls target the same (collection, doc_id, parse_hash) with different
93+
parameters, the last one to reach ``_write_chunks_to_db`` will replace
94+
the other's output. Callers requiring mutual exclusion should hold a
95+
collection-level lock before invoking this function.
96+
9097
Args:
9198
collection: Collection name for data isolation
9299
doc_id: Document ID whose parsed result to chunk
@@ -474,7 +481,12 @@ def _write_chunks_to_db(
474481
user_id: Optional[int] = None,
475482
is_admin: bool = False,
476483
) -> bool:
477-
"""Write chunk records to database using abstraction layer."""
484+
"""Write chunk records to database using abstraction layer.
485+
486+
Inserts the new rows and then removes any other existing rows for the same
487+
document parse (collection, doc_id, parse_hash) and tenancy scope, so a new
488+
``config_hash`` does not leave stale chunks from a previous configuration (issue #199).
489+
"""
478490
try:
479491
rows = []
480492
for chunk in chunks:
@@ -503,9 +515,17 @@ def _write_chunks_to_db(
503515
if not rows:
504516
return False
505517

506-
# Use abstraction layer for upsert
507518
vector_store = get_vector_index_store()
508-
vector_store.upsert_chunks(rows)
519+
vector_store.replace_chunks(
520+
rows,
521+
replace_scope={
522+
"collection": collection,
523+
"doc_id": doc_id,
524+
"parse_hash": parse_hash,
525+
},
526+
user_id=user_id,
527+
is_admin=is_admin,
528+
)
509529

510530
logger.info(
511531
"Chunk records written to database: doc_id=%s, parse_hash=%s, config_hash=%s",

src/xagent/core/tools/core/RAG_tools/storage/contracts.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,30 @@ def upsert_chunks(self, records: List[Dict[str, Any]]) -> None:
713713
records: List of chunk record dictionaries to upsert.
714714
"""
715715

716+
@abstractmethod
717+
def replace_chunks(
718+
self,
719+
records: List[Dict[str, Any]],
720+
*,
721+
replace_scope: Dict[str, Any],
722+
user_id: Optional[int] = None,
723+
is_admin: bool = False,
724+
) -> None:
725+
"""Replace chunk records within a scope (delete old + insert new).
726+
727+
Inserts *records*, then deletes every other chunk row matching
728+
*replace_scope* (and tenancy filters). This guarantees that re-chunking with
729+
different parameters does not leave stale rows from a previous
730+
configuration (issue #199).
731+
732+
Args:
733+
records: New chunk records that make up the replacement generation.
734+
replace_scope: Filter dict (e.g. collection, doc_id, parse_hash)
735+
identifying the rows to delete once the new records are written.
736+
user_id: Optional user ID for multi-tenancy scoped deletion.
737+
is_admin: Whether the caller can operate across tenants.
738+
"""
739+
716740
@abstractmethod
717741
def upsert_embeddings(self, model_tag: str, records: List[Dict[str, Any]]) -> None:
718742
"""Upsert embedding records (sync).

src/xagent/core/tools/core/RAG_tools/storage/lancedb_stores.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,6 +1448,68 @@ def upsert_chunks(self, records: List[Dict[str, Any]]) -> None:
14481448
_safe_close_table(table)
14491449
self.invalidate_table_cache("chunks")
14501450

1451+
_REPLACE_CHUNKS_ALLOWED_KEYS: frozenset = frozenset(
1452+
{"collection", "doc_id", "parse_hash"}
1453+
)
1454+
1455+
def replace_chunks(
1456+
self,
1457+
records: List[Dict[str, Any]],
1458+
*,
1459+
replace_scope: Dict[str, Any],
1460+
user_id: Optional[int] = None,
1461+
is_admin: bool = False,
1462+
) -> None:
1463+
"""Replace chunk records within a scope (delete old + insert new).
1464+
1465+
Safety: inserts new records first, then deletes rows that do NOT belong
1466+
to the new generation. This avoids data loss if the process crashes
1467+
between the two operations (worst case: brief duplicate data, not zero
1468+
data).
1469+
"""
1470+
from ..LanceDB.schema_manager import _safe_close_table, ensure_chunks_table
1471+
1472+
for key in replace_scope:
1473+
if key not in self._REPLACE_CHUNKS_ALLOWED_KEYS:
1474+
raise ValueError(f"Invalid replace_scope column: {key}")
1475+
1476+
conn = self._get_connection()
1477+
ensure_chunks_table(conn)
1478+
table = conn.open_table("chunks")
1479+
try:
1480+
# Step 1: Insert new records first (add appends rows; a crash after this step leaves duplicates, never an empty scope)
1481+
if records:
1482+
table.add(records)
1483+
1484+
# Step 2: Build delete filter targeting old generations
1485+
scope_parts = [
1486+
f"{k} == '{escape_lancedb_string(str(v))}'"
1487+
for k, v in replace_scope.items()
1488+
]
1489+
base_filter = " AND ".join(scope_parts)
1490+
1491+
user_filter = UserPermissions.get_user_filter(user_id, is_admin)
1492+
if base_filter and user_filter:
1493+
delete_expr = f"({base_filter}) AND ({user_filter})"
1494+
elif user_filter:
1495+
delete_expr = user_filter
1496+
else:
1497+
delete_expr = base_filter
1498+
1499+
# Exclude newly inserted chunk_ids from deletion
1500+
if records and delete_expr:
1501+
new_ids = [r["chunk_id"] for r in records]
1502+
id_list = ", ".join(
1503+
f"'{escape_lancedb_string(cid)}'" for cid in new_ids
1504+
)
1505+
delete_expr = f"({delete_expr}) AND chunk_id NOT IN ({id_list})"
1506+
1507+
if delete_expr:
1508+
table.delete(delete_expr)
1509+
finally:
1510+
_safe_close_table(table)
1511+
self.invalidate_table_cache("chunks")
1512+
14511513
def upsert_embeddings(self, model_tag: str, records: List[Dict[str, Any]]) -> None:
14521514
"""Upsert embedding records to LanceDB with fallback pattern.
14531515

tests/core/tools/core/RAG_tools/chunk/test_chunk_document.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,7 @@ def test_chunk_config_hash_idempotency(
810810
def test_chunk_separators_create_new_version(
811811
self, temp_lancedb_dir, test_collection, test_doc_id
812812
):
813-
"""Test that different separators create new version."""
813+
"""Re-chunking with different separators replaces prior rows (single config_hash)."""
814814
# Step 1: Register and parse document
815815
txt_path = "tests/resources/test_files/test.txt"
816816
register_document(
@@ -852,11 +852,9 @@ def test_chunk_separators_create_new_version(
852852
user_id=1,
853853
)
854854

855-
# Both should write (different versions)
856855
assert chunk_result1["created"] is True
857856
assert chunk_result2["created"] is True
858857

859-
# Verify database has two different config_hash versions
860858
from xagent.core.tools.core.RAG_tools.storage.factory import (
861859
get_vector_store_raw_connection,
862860
)
@@ -871,14 +869,11 @@ def test_chunk_separators_create_new_version(
871869
.to_pandas()
872870
)
873871

874-
# Should have two different config_hash values
872+
# Only the latest chunking generation is kept for this parse (issue #199).
875873
config_hashes = df["config_hash"].unique()
876-
assert len(config_hashes) == 2
877-
878-
# Each version should have > 0 rows
879-
for config_hash in config_hashes:
880-
version_rows = df[df["config_hash"] == config_hash]
881-
assert len(version_rows) > 0
874+
assert len(config_hashes) == 1
875+
assert len(df) == chunk_result2["chunk_count"]
876+
assert len(df) > 0
882877

883878
def test_chunk_recursive_custom_separators_integration(
884879
self, temp_lancedb_dir, test_collection, test_doc_id
@@ -934,13 +929,19 @@ def test_chunk_recursive_custom_separators_integration(
934929
)
935930

936931
def test_chunk_recursive_custom_separators_vs_default_different_result(
937-
self, temp_lancedb_dir, test_collection, test_doc_id
932+
self, temp_lancedb_dir, test_collection, test_doc_id, tmp_path
938933
):
939-
"""Integration: custom separators produce different chunk count or config than default."""
940-
txt_path = "tests/resources/test_files/test.txt"
934+
"""Integration: default then custom separators; DB keeps only the last generation."""
935+
# Generate deterministic long text to guarantee different chunk counts
936+
lines = [
937+
f"Line {i}: The quick brown fox jumps over the lazy dog." for i in range(20)
938+
]
939+
long_doc = tmp_path / "long_doc.txt"
940+
long_doc.write_text("\n".join(lines), encoding="utf-8")
941+
941942
register_document(
942943
collection=test_collection,
943-
source_path=txt_path,
944+
source_path=str(long_doc),
944945
doc_id=test_doc_id,
945946
user_id=1,
946947
)
@@ -953,13 +954,14 @@ def test_chunk_recursive_custom_separators_vs_default_different_result(
953954
)
954955
parse_hash = parse_result["parse_hash"]
955956

957+
# Large chunk_size → few chunks; small chunk_size with newline separator → many chunks.
956958
chunk_default = chunk_document(
957959
collection=test_collection,
958960
doc_id=test_doc_id,
959961
parse_hash=parse_hash,
960962
chunk_strategy=ChunkStrategy.RECURSIVE,
961-
chunk_size=50,
962-
chunk_overlap=10,
963+
chunk_size=500,
964+
chunk_overlap=0,
963965
separators=None,
964966
user_id=1,
965967
)
@@ -968,14 +970,16 @@ def test_chunk_recursive_custom_separators_vs_default_different_result(
968970
doc_id=test_doc_id,
969971
parse_hash=parse_hash,
970972
chunk_strategy=ChunkStrategy.RECURSIVE,
971-
chunk_size=50,
972-
chunk_overlap=10,
973-
separators=["。", "\n"],
973+
chunk_size=25,
974+
chunk_overlap=5,
975+
separators=["\n"],
974976
user_id=1,
975977
)
976978
assert chunk_default["created"] is True
977979
assert chunk_custom["created"] is True
978-
# Different separators must yield different config_hash (hence different version)
980+
assert chunk_default["chunk_count"] != chunk_custom["chunk_count"], (
981+
"Expected distinct chunking outcomes so the replacement semantics are observable"
982+
)
979983
from xagent.core.tools.core.RAG_tools.storage.factory import (
980984
get_vector_store_raw_connection,
981985
)
@@ -990,9 +994,8 @@ def test_chunk_recursive_custom_separators_vs_default_different_result(
990994
.to_pandas()
991995
)
992996
config_hashes = df["config_hash"].unique()
993-
assert len(config_hashes) == 2, (
994-
"Default and custom separators should produce two distinct chunk versions"
995-
)
997+
assert len(config_hashes) == 1
998+
assert len(df) == chunk_custom["chunk_count"]
996999

9971000
def test_chunk_row_level_hash_uniqueness(
9981001
self, temp_lancedb_dir, test_collection, test_doc_id

0 commit comments

Comments
 (0)