diff --git a/libs/core/langchain_core/indexing/api.py b/libs/core/langchain_core/indexing/api.py index da3c54fd63f77..7f96d85b9e13c 100644 --- a/libs/core/langchain_core/indexing/api.py +++ b/libs/core/langchain_core/indexing/api.py @@ -460,6 +460,7 @@ def index( num_updated = 0 num_deleted = 0 scoped_full_cleanup_source_ids: set[str] = set() + incremental_cleanup_source_ids: set[str] = set() for doc_batch in _batch(batch_size, doc_iterator): # Track original batch size before deduplication @@ -494,6 +495,8 @@ def index( raise ValueError(msg) if cleanup == "scoped_full": scoped_full_cleanup_source_ids.add(source_id) + if cleanup == "incremental": + incremental_cleanup_source_ids.add(source_id) # Source IDs cannot be None after for loop above. source_ids = cast("Sequence[str]", source_ids) @@ -550,29 +553,20 @@ def index( time_at_least=index_start_dt, ) - # If source IDs are provided, we can do the deletion incrementally! - if cleanup == "incremental": - # Get the uids of the documents that were not returned by the loader. - # mypy isn't good enough to determine that source IDs cannot be None - # here due to a check that's happening above, so we check again. - for source_id in source_ids: - if source_id is None: - msg = ( - "source_id cannot be None at this point. " - "Reached unreachable code." - ) - raise AssertionError(msg) - - source_ids_ = cast("Sequence[str]", source_ids) - - while uids_to_delete := record_manager.list_keys( - group_ids=source_ids_, before=index_start_dt, limit=cleanup_batch_size - ): - # Then delete from vector store. - _delete(destination, uids_to_delete) - # First delete from record store. - record_manager.delete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) + # Incremental cleanup: delete stale documents after all batches are processed. + # This is done after the loop (rather than per-batch) to avoid prematurely + # deleting documents from later batches that share the same source_id, + # which would cause unnecessary re-embedding on every run. + if cleanup == "incremental" and incremental_cleanup_source_ids: + incremental_source_ids_list = list(incremental_cleanup_source_ids) + while uids_to_delete := record_manager.list_keys( + group_ids=incremental_source_ids_list, + before=index_start_dt, + limit=cleanup_batch_size, + ): + _delete(destination, uids_to_delete) + record_manager.delete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) if cleanup == "full" or ( cleanup == "scoped_full" and scoped_full_cleanup_source_ids @@ -810,6 +804,7 @@ async def aindex( num_updated = 0 num_deleted = 0 scoped_full_cleanup_source_ids: set[str] = set() + incremental_cleanup_source_ids: set[str] = set() async for doc_batch in _abatch(batch_size, async_doc_iterator): # Track original batch size before deduplication @@ -844,6 +839,8 @@ async def aindex( raise ValueError(msg) if cleanup == "scoped_full": scoped_full_cleanup_source_ids.add(source_id) + if cleanup == "incremental": + incremental_cleanup_source_ids.add(source_id) # Source IDs cannot be None after for loop above. source_ids = cast("Sequence[str]", source_ids) @@ -899,31 +896,20 @@ async def aindex( time_at_least=index_start_dt, ) - # If source IDs are provided, we can do the deletion incrementally! - - if cleanup == "incremental": - # Get the uids of the documents that were not returned by the loader. - - # mypy isn't good enough to determine that source IDs cannot be None - # here due to a check that's happening above, so we check again. - for source_id in source_ids: - if source_id is None: - msg = ( - "source_id cannot be None at this point. " - "Reached unreachable code." - ) - raise AssertionError(msg) - - source_ids_ = cast("Sequence[str]", source_ids) - - while uids_to_delete := await record_manager.alist_keys( - group_ids=source_ids_, before=index_start_dt, limit=cleanup_batch_size - ): - # Then delete from vector store. - await _adelete(destination, uids_to_delete) - # First delete from record store. - await record_manager.adelete_keys(uids_to_delete) - num_deleted += len(uids_to_delete) + # Incremental cleanup: delete stale documents after all batches are processed. + # This is done after the loop (rather than per-batch) to avoid prematurely + # deleting documents from later batches that share the same source_id, + # which would cause unnecessary re-embedding on every run. + if cleanup == "incremental" and incremental_cleanup_source_ids: + incremental_source_ids_list = list(incremental_cleanup_source_ids) + while uids_to_delete := await record_manager.alist_keys( + group_ids=incremental_source_ids_list, + before=index_start_dt, + limit=cleanup_batch_size, + ): + await _adelete(destination, uids_to_delete) + await record_manager.adelete_keys(uids_to_delete) + num_deleted += len(uids_to_delete) if cleanup == "full" or ( cleanup == "scoped_full" and scoped_full_cleanup_source_ids diff --git a/libs/core/tests/unit_tests/indexing/test_indexing.py b/libs/core/tests/unit_tests/indexing/test_indexing.py index f598048a98ff0..ec9e1257583fe 100644 --- a/libs/core/tests/unit_tests/indexing/test_indexing.py +++ b/libs/core/tests/unit_tests/indexing/test_indexing.py @@ -1616,6 +1616,9 @@ def test_incremental_indexing_with_batch_size( } assert doc_texts == {"1", "2", "3", "4"} + # Re-indexing the same documents should skip all of them without + # redundant deletes and re-adds, even when batch_size < total docs + # and all docs share the same source_id. (Fixes #32612) with patch.object( record_manager, "get_time", @@ -1630,9 +1633,9 @@ def test_incremental_indexing_with_batch_size( batch_size=2, key_encoder="sha256", ) == { - "num_added": 2, - "num_deleted": 2, - "num_skipped": 2, + "num_added": 0, + "num_deleted": 0, + "num_skipped": 4, "num_updated": 0, } @@ -1644,6 +1647,239 @@ def test_incremental_indexing_with_batch_size( assert doc_texts == {"1", "2", "3", "4"} +def test_incremental_indexing_with_batch_size_no_redundant_work( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Regression test for #32612. + + Re-indexing unchanged docs with batch_size < total docs and same + source_id should not cause redundant deletes and re-adds. + """ + docs = [ + Document(page_content=str(i), metadata={"source": "same_source"}) + for i in range(10) + ] + + # First run: index all 10 docs with batch_size=3 + with patch.object( + record_manager, + "get_time", + return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(), + ): + result = index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 10, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + # Second run: all docs unchanged, should all be skipped + with patch.object( + record_manager, + "get_time", + return_value=datetime(2021, 1, 2, tzinfo=timezone.utc).timestamp(), + ): + result = index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 10, + "num_updated": 0, + } + + # Third run: verify stability (no drift over repeated runs) + with patch.object( + record_manager, + "get_time", + return_value=datetime(2021, 1, 3, tzinfo=timezone.utc).timestamp(), + ): + result = index( + docs, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 10, + "num_updated": 0, + } + + # All original docs still present in the vector store + doc_texts = { + vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store + } + assert doc_texts == {str(i) for i in range(10)} + + +async def test_aincremental_indexing_with_batch_size_no_redundant_work( + arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Async regression test for #32612. + + Re-indexing unchanged docs with batch_size < total docs and same + source_id should not cause redundant deletes and re-adds. + """ + docs = [ + Document(page_content=str(i), metadata={"source": "same_source"}) + for i in range(10) + ] + + # First run: index all 10 docs with batch_size=3 + with patch.object( + arecord_manager, + "aget_time", + return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(), + ): + result = await aindex( + docs, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 10, + "num_deleted": 0, + "num_skipped": 0, + "num_updated": 0, + } + + # Second run: all docs unchanged, should all be skipped + with patch.object( + arecord_manager, + "aget_time", + return_value=datetime(2021, 1, 2, tzinfo=timezone.utc).timestamp(), + ): + result = await aindex( + docs, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 10, + "num_updated": 0, + } + + # Third run: verify stability + with patch.object( + arecord_manager, + "aget_time", + return_value=datetime(2021, 1, 3, tzinfo=timezone.utc).timestamp(), + ): + result = await aindex( + docs, + arecord_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=3, + key_encoder="sha256", + ) + assert result == { + "num_added": 0, + "num_deleted": 0, + "num_skipped": 10, + "num_updated": 0, + } + + doc_texts = { + vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store + } + assert doc_texts == {str(i) for i in range(10)} + + +def test_incremental_indexing_stale_docs_still_cleaned_up( + record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore +) -> None: + """Verify that truly stale docs are still deleted after the fix. + + When a document is updated (content changes), the old version should be + cleaned up even with batch_size < total docs. + """ + docs_v1 = [ + Document(page_content="a", metadata={"source": "s1"}), + Document(page_content="b", metadata={"source": "s1"}), + Document(page_content="c", metadata={"source": "s1"}), + Document(page_content="d", metadata={"source": "s1"}), + ] + + with patch.object( + record_manager, + "get_time", + return_value=datetime(2021, 1, 1, tzinfo=timezone.utc).timestamp(), + ): + index( + docs_v1, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + key_encoder="sha256", + ) + + # Update: replace "c" and "d" with "e" and "f" + docs_v2 = [ + Document(page_content="a", metadata={"source": "s1"}), + Document(page_content="b", metadata={"source": "s1"}), + Document(page_content="e", metadata={"source": "s1"}), + Document(page_content="f", metadata={"source": "s1"}), + ] + + with patch.object( + record_manager, + "get_time", + return_value=datetime(2021, 1, 2, tzinfo=timezone.utc).timestamp(), + ): + result = index( + docs_v2, + record_manager, + vector_store, + cleanup="incremental", + source_id_key="source", + batch_size=2, + key_encoder="sha256", + ) + assert result["num_added"] == 2 # "e" and "f" + assert result["num_deleted"] == 2 # "c" and "d" + assert result["num_skipped"] == 2 # "a" and "b" + + doc_texts = { + vector_store.get_by_ids([uid])[0].page_content for uid in vector_store.store + } + assert doc_texts == {"a", "b", "e", "f"} + + def test_incremental_delete_with_batch_size( record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore ) -> None: