
feat: batch Neo4j graph writes with UNWIND queries#2816

Closed
ndcorder wants to merge 1 commit into HKUDS:main from ndcorder:feat/batch-neo4j-writes

Conversation


@ndcorder ndcorder commented Mar 21, 2026

Currently, every entity and relationship gets its own Neo4j session, transaction, and MERGE query during the merge phase. For a document producing 80 entities and 120 relations, that is 200+ round-trips.

This adds batch_upsert_nodes/batch_upsert_edges to the graph storage interface with a default loop fallback so other backends aren't affected. Neo4JStorage overrides them with UNWIND-based Cypher. The merge phase in operate.py now collects results and writes them in bulk instead of one-at-a-time.

Should help with #1387, #1957, #1648, #2264.

Replace individual upsert_node/upsert_edge calls with batched UNWIND
queries during the merge phase. Nodes are grouped by entity_type,
large batches chunked at 500.

Also adds has_nodes_batch for bulk existence checks.
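For context, a minimal sketch of what the UNWIND batching described above could look like (identifiers, query shape, and the `properties` field are my assumptions for illustration, not the PR's actual code). Nodes are grouped by `entity_type` so each query can use a fixed label, and each group is chunked at 500 rows so one round-trip replaces hundreds:

```python
from itertools import islice

BATCH_SIZE = 500  # chunk size from the commit message

def chunked(items, size=BATCH_SIZE):
    """Yield successive chunks of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

def build_batch_upsert_queries(nodes):
    """Group nodes by entity_type and emit (cypher, params) pairs.

    Each emitted query merges a whole chunk in one round-trip instead of
    issuing len(chunk) individual MERGE queries.
    """
    by_type = {}
    for node in nodes:
        by_type.setdefault(node["entity_type"], []).append(node)
    for entity_type, group in by_type.items():
        for chunk in chunked(group):
            query = (
                f"UNWIND $rows AS row "
                f"MERGE (n:`{entity_type}` {{entity_id: row.entity_id}}) "
                f"SET n += row.properties"
            )
            yield query, {"rows": chunk}
```

Each `(query, params)` pair would then be executed in a single transaction via the Neo4j driver, e.g. `session.run(query, rows=params["rows"])`.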
@ndcorder ndcorder force-pushed the feat/batch-neo4j-writes branch from 7d6dc48 to 4f6ba51 on March 21, 2026 at 09:35
@danielaskdd
Collaborator

Thanks for your interest in LightRAG and your contributions. However, I noticed a critical concurrency issue (a race condition) introduced by deferring the graph writes.

In the current implementation of lightrag/operate.py, the _locked_process_entity_name (and similarly _locked_process_edges) function acquires an application-level distributed lock using async with get_storage_keyed_lock:

async with get_storage_keyed_lock([entity_name], namespace=namespace, enable_logging=False):
    entity_data = await _merge_nodes_then_upsert(...) 
    # This reads the old node from DB, merges data, but DOES NOT write it yet!
# Lock is released here!

By moving the upsert_node call out of the loop and into a single batch at the end of the phase (after all locks are released):

# Batch-write all entity nodes to the graph in one call
if nodes_to_upsert:
    await knowledge_graph_inst.batch_upsert_nodes(nodes_to_upsert)

This breaks the "Read-Modify-Write" atomic cycle:

  1. Task A acquires the lock for Apple, reads from the DB (finds it missing or gets old state), calculates the merged state, and releases the lock.
  2. Task B acquires the lock for Apple, reads from the DB (still finds it missing/old state, because Task A hasn't executed the batch upsert yet!), calculates the merged state, and releases the lock.
  3. Both tasks append their result to their respective nodes_to_upsert lists.
  4. When batch_upsert_nodes is finally called, Task B will simply overwrite Task A's result (or trigger a duplication/constraint error).

This could lead to severe data loss and inconsistencies when multiple documents are processed concurrently and contain the same entities/edges.
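The interleaving above can be reproduced with a toy asyncio simulation (illustrative names only, not the actual LightRAG code). A correct read-modify-write would end with both descriptions on the node; with the deferred batch write, the second task overwrites the first:

```python
import asyncio

async def main():
    db = {}                   # stands in for the graph store
    lock = asyncio.Lock()     # stands in for the keyed storage lock
    nodes_to_upsert = []      # deferred batch, as in the PR

    async def merge_task(description):
        async with lock:
            # Read and merge under the lock, but the write is deferred...
            old = db.get("Apple", {"descriptions": []})
            merged = {"descriptions": old["descriptions"] + [description]}
        # ...lock is already released here, before the write happens,
        # so the read-modify-write cycle is broken.
        nodes_to_upsert.append(merged)

    await asyncio.gather(merge_task("from doc A"), merge_task("from doc B"))

    # Batch write at the end of the phase: last writer wins.
    for merged in nodes_to_upsert:
        db["Apple"] = merged

    return db["Apple"]["descriptions"]

result = asyncio.run(main())
# Both tasks read the same empty state, so one description is lost:
# len(result) is 1, not 2.
```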

To maintain concurrency safety while still benefiting from batching, we might need to rely entirely on DB-level atomic merge operations (e.g., using ON CREATE SET and ON MATCH SET in Cypher) and skip the application-level read before the write (though this might break compatibility with other GraphDB implementations).
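A hedged sketch of what pushing the merge into the database could look like (the query shape and the `<SEP>` concatenation are illustrative assumptions, not LightRAG's actual merge semantics). Because Neo4j evaluates the ON MATCH clause inside the transaction, the old value is read and updated atomically, with no application-level read beforehand:

```python
def atomic_merge_query(entity_type):
    """Build an UNWIND batch query where Neo4j itself performs the
    read-modify-write: new nodes get the incoming description, existing
    nodes have it appended atomically inside the transaction."""
    return (
        f"UNWIND $rows AS row "
        f"MERGE (n:`{entity_type}` {{entity_id: row.entity_id}}) "
        f"ON CREATE SET n.description = row.description "
        f"ON MATCH SET n.description = "
        f"n.description + '<SEP>' + row.description"
    )
```

The trade-off named above still applies: backends without an equivalent of ON CREATE SET / ON MATCH SET would need a different strategy.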

This PR will not be merged until you find a way to work it out. Thanks!
