Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 34 additions & 48 deletions libs/core/langchain_core/indexing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ def index(
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()
incremental_cleanup_source_ids: set[str] = set()

for doc_batch in _batch(batch_size, doc_iterator):
# Track original batch size before deduplication
Expand Down Expand Up @@ -494,6 +495,8 @@ def index(
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
if cleanup == "incremental":
incremental_cleanup_source_ids.add(source_id)
# Source IDs cannot be None after for loop above.
source_ids = cast("Sequence[str]", source_ids)

Expand Down Expand Up @@ -550,29 +553,20 @@ def index(
time_at_least=index_start_dt,
)

# If source IDs are provided, we can do the deletion incrementally!
if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.
# mypy isn't good enough to determine that source IDs cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
msg = (
"source_id cannot be None at this point. "
"Reached unreachable code."
)
raise AssertionError(msg)

source_ids_ = cast("Sequence[str]", source_ids)

while uids_to_delete := record_manager.list_keys(
group_ids=source_ids_, before=index_start_dt, limit=cleanup_batch_size
):
# Then delete from vector store.
_delete(destination, uids_to_delete)
# First delete from record store.
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
# Incremental cleanup: delete stale documents after all batches are processed.
# This is done after the loop (rather than per-batch) to avoid prematurely
# deleting documents from later batches that share the same source_id,
# which would cause unnecessary re-embedding on every run.
if cleanup == "incremental" and incremental_cleanup_source_ids:
incremental_source_ids_list = list(incremental_cleanup_source_ids)
while uids_to_delete := record_manager.list_keys(
group_ids=incremental_source_ids_list,
before=index_start_dt,
limit=cleanup_batch_size,
):
_delete(destination, uids_to_delete)
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)

if cleanup == "full" or (
cleanup == "scoped_full" and scoped_full_cleanup_source_ids
Expand Down Expand Up @@ -810,6 +804,7 @@ async def aindex(
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()
incremental_cleanup_source_ids: set[str] = set()

async for doc_batch in _abatch(batch_size, async_doc_iterator):
# Track original batch size before deduplication
Expand Down Expand Up @@ -844,6 +839,8 @@ async def aindex(
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
if cleanup == "incremental":
incremental_cleanup_source_ids.add(source_id)
# Source IDs cannot be None after for loop above.
source_ids = cast("Sequence[str]", source_ids)

Expand Down Expand Up @@ -899,31 +896,20 @@ async def aindex(
time_at_least=index_start_dt,
)

# If source IDs are provided, we can do the deletion incrementally!

if cleanup == "incremental":
# Get the uids of the documents that were not returned by the loader.

# mypy isn't good enough to determine that source IDs cannot be None
# here due to a check that's happening above, so we check again.
for source_id in source_ids:
if source_id is None:
msg = (
"source_id cannot be None at this point. "
"Reached unreachable code."
)
raise AssertionError(msg)

source_ids_ = cast("Sequence[str]", source_ids)

while uids_to_delete := await record_manager.alist_keys(
group_ids=source_ids_, before=index_start_dt, limit=cleanup_batch_size
):
# Then delete from vector store.
await _adelete(destination, uids_to_delete)
# First delete from record store.
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
# Incremental cleanup: delete stale documents after all batches are processed.
# This is done after the loop (rather than per-batch) to avoid prematurely
# deleting documents from later batches that share the same source_id,
# which would cause unnecessary re-embedding on every run.
if cleanup == "incremental" and incremental_cleanup_source_ids:
incremental_source_ids_list = list(incremental_cleanup_source_ids)
while uids_to_delete := await record_manager.alist_keys(
group_ids=incremental_source_ids_list,
before=index_start_dt,
limit=cleanup_batch_size,
):
await _adelete(destination, uids_to_delete)
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)

if cleanup == "full" or (
cleanup == "scoped_full" and scoped_full_cleanup_source_ids
Expand Down
242 changes: 239 additions & 3 deletions libs/core/tests/unit_tests/indexing/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1616,6 +1616,9 @@ def test_incremental_indexing_with_batch_size(
}
assert doc_texts == {"1", "2", "3", "4"}

# Re-indexing the same documents should skip all of them without
# redundant deletes and re-adds, even when batch_size < total docs
# and all docs share the same source_id. (Fixes #32612)
with patch.object(
record_manager,
"get_time",
Expand All @@ -1630,9 +1633,9 @@ def test_incremental_indexing_with_batch_size(
batch_size=2,
key_encoder="sha256",
) == {
"num_added": 2,
"num_deleted": 2,
"num_skipped": 2,
"num_added": 0,
"num_deleted": 0,
"num_skipped": 4,
"num_updated": 0,
}

Expand All @@ -1644,6 +1647,239 @@ def test_incremental_indexing_with_batch_size(
assert doc_texts == {"1", "2", "3", "4"}


def test_incremental_indexing_with_batch_size_no_redundant_work(
    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
    """Regression test for #32612.

    Re-indexing unchanged docs with batch_size < total docs and same
    source_id should not cause redundant deletes and re-adds.
    """
    documents = [
        Document(page_content=str(i), metadata={"source": "same_source"})
        for i in range(10)
    ]

    # First run: everything is new, so all 10 docs get added (batch_size=3
    # forces multiple batches sharing a single source_id).
    with patch.object(
        record_manager,
        "get_time",
        return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(),
    ):
        outcome = index(
            documents,
            record_manager,
            vector_store,
            cleanup="incremental",
            source_id_key="source",
            batch_size=3,
            key_encoder="sha256",
        )
    assert outcome == {
        "num_added": 10,
        "num_deleted": 0,
        "num_skipped": 0,
        "num_updated": 0,
    }

    # Subsequent runs on unchanged docs must be pure no-ops: every doc is
    # skipped, nothing deleted or re-added. Repeating on day 2 and day 3
    # also checks that results don't drift across runs.
    expected_noop = {
        "num_added": 0,
        "num_deleted": 0,
        "num_skipped": 10,
        "num_updated": 0,
    }
    for day in (2, 3):
        with patch.object(
            record_manager,
            "get_time",
            return_value=datetime(2021, 1, day, tzinfo=timezone.utc).timestamp(),
        ):
            outcome = index(
                documents,
                record_manager,
                vector_store,
                cleanup="incremental",
                source_id_key="source",
                batch_size=3,
                key_encoder="sha256",
            )
        assert outcome == expected_noop

    # The vector store still holds exactly the original 10 documents.
    stored_texts = {
        vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store
    }
    assert stored_texts == {str(i) for i in range(10)}


async def test_aincremental_indexing_with_batch_size_no_redundant_work(
    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
    """Async regression test for #32612.

    Re-indexing unchanged docs with batch_size < total docs and same
    source_id should not cause redundant deletes and re-adds.
    """
    documents = [
        Document(page_content=str(i), metadata={"source": "same_source"})
        for i in range(10)
    ]

    # First run: everything is new, so all 10 docs get added (batch_size=3
    # forces multiple batches sharing a single source_id).
    with patch.object(
        arecord_manager,
        "aget_time",
        return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(),
    ):
        outcome = await aindex(
            documents,
            arecord_manager,
            vector_store,
            cleanup="incremental",
            source_id_key="source",
            batch_size=3,
            key_encoder="sha256",
        )
    assert outcome == {
        "num_added": 10,
        "num_deleted": 0,
        "num_skipped": 0,
        "num_updated": 0,
    }

    # Subsequent runs on unchanged docs must be pure no-ops: every doc is
    # skipped, nothing deleted or re-added. Repeating on day 2 and day 3
    # also checks stability across runs.
    expected_noop = {
        "num_added": 0,
        "num_deleted": 0,
        "num_skipped": 10,
        "num_updated": 0,
    }
    for day in (2, 3):
        with patch.object(
            arecord_manager,
            "aget_time",
            return_value=datetime(2021, 1, day, tzinfo=timezone.utc).timestamp(),
        ):
            outcome = await aindex(
                documents,
                arecord_manager,
                vector_store,
                cleanup="incremental",
                source_id_key="source",
                batch_size=3,
                key_encoder="sha256",
            )
        assert outcome == expected_noop

    # The vector store still holds exactly the original 10 documents.
    stored_texts = {
        vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store
    }
    assert stored_texts == {str(i) for i in range(10)}


def test_incremental_indexing_stale_docs_still_cleaned_up(
    record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
    """Verify that truly stale docs are still deleted after the fix.

    When a document is updated (content changes), the old version should be
    cleaned up even with batch_size < total docs.
    """
    def _make_docs(contents: tuple[str, ...]) -> list[Document]:
        # All docs share one source so incremental cleanup groups them.
        return [
            Document(page_content=text, metadata={"source": "s1"})
            for text in contents
        ]

    # Seed the index with four docs on day 1.
    with patch.object(
        record_manager,
        "get_time",
        return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(),
    ):
        index(
            _make_docs(("a", "b", "c", "d")),
            record_manager,
            vector_store,
            cleanup="incremental",
            source_id_key="source",
            batch_size=2,
            key_encoder="sha256",
        )

    # Day 2: "c" and "d" are replaced by "e" and "f"; the stale pair must be
    # removed even though batch_size=2 splits the run into multiple batches.
    with patch.object(
        record_manager,
        "get_time",
        return_value=datetime(2021, 1, 2, tzinfo=timezone.utc).timestamp(),
    ):
        outcome = index(
            _make_docs(("a", "b", "e", "f")),
            record_manager,
            vector_store,
            cleanup="incremental",
            source_id_key="source",
            batch_size=2,
            key_encoder="sha256",
        )
    assert outcome["num_added"] == 2  # "e" and "f"
    assert outcome["num_deleted"] == 2  # "c" and "d"
    assert outcome["num_skipped"] == 2  # "a" and "b"

    stored_texts = {
        vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store
    }
    assert stored_texts == {"a", "b", "e", "f"}


def test_incremental_delete_with_batch_size(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
Expand Down
Loading