Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion lightrag/api/routers/document_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,9 @@ async def background_delete_documents(
total_docs = len(doc_ids)
successful_deletions = []
failed_deletions = []
# Aggregate rebuild targets across all deletions so we only rebuild once
all_entities_to_rebuild: dict[str, list] = {}
all_relationships_to_rebuild: dict[tuple, list] = {}

# Double-check pipeline status before proceeding
async with pipeline_status_lock:
Expand Down Expand Up @@ -1906,13 +1909,22 @@ async def background_delete_documents(
file_path = "#"
try:
result = await rag.adelete_by_doc_id(
doc_id, delete_llm_cache=delete_llm_cache
doc_id,
delete_llm_cache=delete_llm_cache,
skip_rebuild=True,
)
file_path = (
getattr(result, "file_path", "-") if "result" in locals() else "-"
)
if result.status == "success":
successful_deletions.append(doc_id)
# Collect deferred rebuild targets
if result.entities_to_rebuild:
all_entities_to_rebuild.update(result.entities_to_rebuild)
if result.relationships_to_rebuild:
all_relationships_to_rebuild.update(
result.relationships_to_rebuild
)
success_msg = (
f"Document deleted {i}/{total_docs}: {doc_id}[{file_path}]"
)
Expand Down Expand Up @@ -2049,6 +2061,98 @@ async def background_delete_documents(
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)

# Single deferred rebuild for all affected entities/relations
rebuild_ok = True
if all_entities_to_rebuild or all_relationships_to_rebuild:
from dataclasses import asdict

from lightrag.operate import rebuild_knowledge_from_chunks

# --- Issue 1: filter out entities/relations that were already fully
# deleted by a later per-doc call. When documents share entities,
# an earlier doc might tag an entity for "rebuild" while a later doc
# ends up deleting it entirely. Trying to rebuild a deleted node is
# wasteful and may error out.
pruned_entities = 0
for ent_name in list(all_entities_to_rebuild):
if not await rag.chunk_entity_relation_graph.has_node(ent_name):
del all_entities_to_rebuild[ent_name]
pruned_entities += 1
pruned_relations = 0
for edge_key in list(all_relationships_to_rebuild):
src, tgt = edge_key
if not await rag.chunk_entity_relation_graph.has_edge(src, tgt):
del all_relationships_to_rebuild[edge_key]
pruned_relations += 1
if pruned_entities or pruned_relations:
prune_msg = (
f"Pruned {pruned_entities} entities and {pruned_relations} "
f"relations that were already deleted by overlapping docs"
)
logger.info(prune_msg)
async with pipeline_status_lock:
pipeline_status["history_messages"].append(prune_msg)

if all_entities_to_rebuild or all_relationships_to_rebuild:
rebuild_msg = (
f"Rebuilding knowledge graph: {len(all_entities_to_rebuild)} entities, "
f"{len(all_relationships_to_rebuild)} relations"
)
logger.info(rebuild_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = rebuild_msg
pipeline_status["history_messages"].append(rebuild_msg)
try:
await rebuild_knowledge_from_chunks(
entities_to_rebuild=all_entities_to_rebuild,
relationships_to_rebuild=all_relationships_to_rebuild,
knowledge_graph_inst=rag.chunk_entity_relation_graph,
entities_vdb=rag.entities_vdb,
relationships_vdb=rag.relationships_vdb,
text_chunks_storage=rag.text_chunks,
llm_response_cache=rag.llm_response_cache,
global_config=asdict(rag),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=rag.entity_chunks,
relation_chunks_storage=rag.relation_chunks,
)
await rag._insert_done()
except Exception as rebuild_err:
rebuild_ok = False
rebuild_error_msg = (
f"Failed to rebuild knowledge graph after batch deletion: {rebuild_err}"
)
logger.error(rebuild_error_msg)
logger.error(traceback.format_exc())
async with pipeline_status_lock:
pipeline_status["latest_message"] = rebuild_error_msg
pipeline_status["history_messages"].append(rebuild_error_msg)

# --- Issue 2: only finalize doc_status removal after rebuild succeeds.
# adelete_by_doc_id(skip_rebuild=True) intentionally keeps doc_status
# alive so a failed rebuild doesn't lose track of the documents.
if rebuild_ok and successful_deletions:
try:
await rag.doc_status.delete(successful_deletions)
except Exception as status_err:
logger.error(
f"Failed to clean up doc_status for {len(successful_deletions)} docs: {status_err}"
)
async with pipeline_status_lock:
pipeline_status["history_messages"].append(
f"Warning: doc_status cleanup failed: {status_err}"
)
elif not rebuild_ok:
keep_msg = (
f"Keeping doc_status for {len(successful_deletions)} docs "
f"because rebuild failed — re-trigger deletion to retry"
)
Comment on lines +2146 to +2150

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Mark deferred rebuild failures as failed deletions

In the rebuild-failure branch, the code logs a warning but never reclassifies the previously successful_deletions as failed. The finally block then emits a completion summary from these counters, so users can see a fully successful batch even though doc cleanup was intentionally withheld due to rebuild failure; this makes operational status inaccurate and can hide that manual retry is still required.

Useful? React with 👍 / 👎.

logger.warning(keep_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = keep_msg
pipeline_status["history_messages"].append(keep_msg)

except Exception as e:
error_msg = f"Critical error during batch deletion: {str(e)}"
logger.error(error_msg)
Expand Down
3 changes: 3 additions & 0 deletions lightrag/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,9 @@ class DeletionResult:
message: str
status_code: int = 200
file_path: str | None = None
# Populated when skip_rebuild=True so the caller can do a single deferred rebuild
entities_to_rebuild: Dict[str, list] | None = None
relationships_to_rebuild: Dict[Any, list] | None = None


# Unified Query Result Data Structures for Reference List Support
Expand Down
76 changes: 52 additions & 24 deletions lightrag/lightrag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3137,7 +3137,10 @@ async def aget_docs_by_ids(
return found_statuses

async def adelete_by_doc_id(
self, doc_id: str, delete_llm_cache: bool = False
self,
doc_id: str,
delete_llm_cache: bool = False,
skip_rebuild: bool = False,
) -> DeletionResult:
"""Delete a document and all its related data, including chunks, graph elements.

Expand Down Expand Up @@ -3170,6 +3173,10 @@ async def adelete_by_doc_id(
doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results
associated with the document. Defaults to False.
skip_rebuild (bool): When True, skip the per-document KG rebuild step.
The caller is responsible for performing a single deferred rebuild
using the entities/relationships returned in the DeletionResult.
Used by batch deletion to avoid N redundant rebuilds. Defaults to False.

Returns:
DeletionResult: An object containing the outcome of the deletion process.
Expand All @@ -3178,6 +3185,8 @@ async def adelete_by_doc_id(
- `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available.
- `entities_to_rebuild` (dict | None): Populated when skip_rebuild=True.
- `relationships_to_rebuild` (dict | None): Populated when skip_rebuild=True.
"""
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
Expand Down Expand Up @@ -3860,27 +3869,39 @@ async def adelete_by_doc_id(
raise Exception(f"Failed to persist pre-rebuild changes: {e}") from e

# 8. Rebuild entities and relationships from remaining chunks
# When skip_rebuild is set (batch deletion), we hand the targets back
# to the caller so it can do one combined rebuild at the end.
if entities_to_rebuild or relationships_to_rebuild:
try:
deletion_stage = "rebuild_knowledge_graph"
await rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks_storage=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
if skip_rebuild:
logger.info(
"Skipping per-doc rebuild (skip_rebuild=True), "
"%d entities / %d relations deferred",
len(entities_to_rebuild),
len(relationships_to_rebuild),
)
else:
try:
deletion_stage = "rebuild_knowledge_graph"
await rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks_storage=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
)

except Exception as e:
logger.error(f"Failed to rebuild knowledge from chunks: {e}")
raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
except Exception as e:
logger.error(f"Failed to rebuild knowledge from chunks: {e}")
raise Exception(
f"Failed to rebuild knowledge graph: {e}"
) from e

# 9. Delete LLM cache while the document status still exists so a failure
# remains retryable via the same doc_id.
Expand Down Expand Up @@ -3940,26 +3961,33 @@ async def adelete_by_doc_id(
) from e

# 11. Delete original document and status.
# doc_status is deleted first so that if full_docs.delete fails, a retry
# finds no doc_status record and treats the document as already gone,
# rather than finding a doc_status that points to a missing full_docs entry.
# doc_status is deleted first so that if full_docs.delete fails, a retry
# finds no doc_status record and treats the document as already gone.
# When skip_rebuild is set the caller handles a deferred rebuild that may
# fail -- keep doc_status alive so the user can re-trigger deletion later.
try:
deletion_stage = "delete_doc_entries"
in_final_delete_stage = True
await self.doc_status.delete([doc_id])
if not skip_rebuild:
await self.doc_status.delete([doc_id])
await self.full_docs.delete([doc_id])
Comment on lines +3971 to 3973

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve rebuild metadata when deferring cleanup

When skip_rebuild=True, this path skips doc_status deletion but still proceeds after stage 10, where full_entities/full_relations are removed before returning success. If the later deferred rebuild fails in background_delete_documents, a retry cannot reconstruct entities_to_rebuild/relationships_to_rebuild because the per-doc graph metadata is already gone, so the promised “re-trigger deletion to retry” flow cannot actually repair the KG and can leave stale summaries indefinitely.

Useful? React with 👍 / 👎.

except Exception as e:
logger.error(f"Failed to delete document and status: {e}")
raise Exception(f"Failed to delete document and status: {e}") from e

deletion_fully_completed = True
return DeletionResult(

result = DeletionResult(
status="success",
doc_id=doc_id,
message=log_message,
status_code=200,
file_path=file_path,
)
if skip_rebuild:
result.entities_to_rebuild = entities_to_rebuild
result.relationships_to_rebuild = relationships_to_rebuild
return result

except Exception as e:
original_exception = e
Expand Down
Loading
Loading