@sagarjariwala333 commented Nov 12, 2025

Summary

Fixes concurrent write corruption in LocalFileStore.mset() by implementing queue-based write serialization with atomic file operations.

Closes #9337

Problem

When multiple concurrent mset() calls write to the same key, the underlying fs.writeFile() operations can interleave, corrupting the cache files. This commonly occurs when using CacheBackedEmbeddings with LocalFileStore for persistent embedding caching.

Reproduction

// 100 concurrent calls writing to same key
await Promise.all(
  Array.from({ length: 100 }, () => 
    store.mset([["key", data]])
  )
);
// Result: Corrupted file with mixed data

Symptoms

  • JSON parse errors when reading cache
  • Corrupted file content like: {"chunkId":998,"data":"0vCRv..."}rY\"}04Fsfa...
  • Cache misses due to invalid data
  • Unreliable embedding caching

Solution

Implemented a queue-based write serialization approach combined with atomic writes:

1. Write Queue (Prevents Race Conditions)

  • Added writeQueues Map to track pending writes per key
  • Writes to the same key are serialized (queued)
  • Writes to different keys remain concurrent (maintains performance)
  • Automatic cleanup prevents memory leaks
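
The chaining idea can be sketched in isolation like this. Note the `WriteQueue` class name is hypothetical; the PR implements the same pattern as a private `queueWrite()` method on `LocalFileStore`:

```typescript
// Standalone sketch of per-key write serialization via promise chaining.
// Writes to one key are appended to that key's chain; different keys
// have independent chains and therefore stay concurrent.
class WriteQueue {
  private queues = new Map<string, Promise<void>>();

  run(key: string, task: () => Promise<void>): Promise<void> {
    const prev = this.queues.get(key) ?? Promise.resolve();
    // Swallow the previous task's error so one failed write
    // does not poison every later write to the same key.
    const next = prev.catch(() => {}).then(task);
    this.queues.set(key, next);
    // Cleanup: drop the map entry once this write settles,
    // but only if no newer write has been chained after it.
    next
      .catch(() => {})
      .finally(() => {
        if (this.queues.get(key) === next) this.queues.delete(key);
      });
    return next;
  }
}
```

Because each `run()` call chains onto the previous promise for that key, tasks for one key execute strictly in submission order, while the map-entry cleanup keeps the queue from growing without bound.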

2. Atomic Writes (Prevents Partial Corruption)

  • Write to temporary file first (.tmp extension)
  • Atomically rename to final destination
  • Prevents partial writes if process crashes
  • Cleanup orphaned temp files on initialization
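
A minimal sketch of the write-then-rename step, with an assumed helper name `atomicWrite` (the PR's actual version lives inside `setFileContent()`):

```typescript
import { promises as fs } from "node:fs";
import { randomUUID } from "node:crypto";

// Write the full payload to a uniquely named temp file, then swap it
// into place. fs.rename() is atomic on most local filesystems, so
// readers only ever see the old file or the complete new one.
async function atomicWrite(
  finalPath: string,
  content: Uint8Array | string
): Promise<void> {
  const tempPath = `${finalPath}.${randomUUID()}.tmp`;
  try {
    await fs.writeFile(tempPath, content);
    await fs.rename(tempPath, finalPath);
  } catch (error) {
    // Best-effort cleanup of the temp file; ignore if it never got created.
    await fs.unlink(tempPath).catch(() => {});
    throw error;
  }
}
```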

Changes

Core Implementation

Files Modified:

  • libs/langchain-classic/src/storage/file_system.ts
  • libs/langchain/src/storage/file_system.ts

Key Changes:

  1. Added writeQueues: Map<string, Promise<void>> property
  2. Added queueWrite() method for serialization
  3. Updated setFileContent() to use atomic write-then-rename
  4. Updated mset() to deduplicate and queue writes
  5. Updated fromPath() to cleanup orphaned temp files

Code Example

// Before (Buggy)
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
  await Promise.all(
    keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
  );
}

// After (Fixed)
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
  // 1. Deduplicate keys
  const uniqueEntries = new Map<string, Uint8Array>();
  for (const [key, value] of keyValuePairs) {
    uniqueEntries.set(key, value);
  }
  
  // 2. Queue writes per key
  await Promise.all(
    Array.from(uniqueEntries.entries()).map(([key, value]) =>
      this.queueWrite(key, value)
    )
  );
}

Breaking Changes

None - This is 100% backward compatible.

  • ✅ No API changes
  • ✅ Same method signatures
  • ✅ Same behavior (except no corruption)
  • ✅ No new dependencies

Verification

// Your existing code works as-is
const store = await LocalFileStore.fromPath("./cache");
await store.mset([["key", data]]);
const result = await store.mget(["key"]);
// No corruption, even with concurrent operations ✅
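
The guarantee can also be illustrated with a self-contained sketch that replays the reproduction case against the two techniques combined (a single promise chain standing in for one key's queue, plus write-then-rename; the names here are illustrative, not from the PR):

```typescript
import { promises as fs } from "node:fs";
import { randomUUID } from "node:crypto";
import * as os from "node:os";
import * as path from "node:path";

// Fire 50 concurrent writes at a single "key" (one file). Serialized
// through one promise chain with atomic renames, the file must end up
// holding exactly one complete payload: the last write wins.
async function stressOneKey(): Promise<string> {
  const finalPath = path.join(os.tmpdir(), `stress-${randomUUID()}.json`);
  let tail: Promise<void> = Promise.resolve(); // one chain = one key

  const write = (payload: string): Promise<void> => {
    tail = tail.then(async () => {
      const tempPath = `${finalPath}.${randomUUID()}.tmp`;
      await fs.writeFile(tempPath, payload);
      await fs.rename(tempPath, finalPath);
    });
    return tail;
  };

  await Promise.all(
    Array.from({ length: 50 }, (_, i) =>
      write(JSON.stringify({ chunkId: i, data: "x".repeat(1024) }))
    )
  );

  const contents = await fs.readFile(finalPath, "utf8");
  await fs.unlink(finalPath);
  return contents;
}
```

With the unqueued `fs.writeFile` approach, the same loop is what produces the mixed-content corruption shown in the Symptoms section.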

Technical Details

Why Queue-Based Approach?

  1. No dependencies: Pure TypeScript/Node.js solution
  2. Simple logic: Easy to understand and maintain
  3. Automatic cleanup: Prevents memory leaks
  4. Maintains performance: Different keys still concurrent
  5. Deterministic: Last write wins (predictable behavior)

Why Atomic Writes?

  1. Crash safety: Prevents partial writes if process crashes
  2. OS-level atomicity: fs.rename() is atomic on most filesystems
  3. Multi-process behavior: safer across processes, though not a full cross-process lock
  4. No partial data: Either old file exists or new one does

Real-World Impact

Who This Affects

  • Users of CacheBackedEmbeddings with LocalFileStore
  • Applications caching embeddings to disk
  • RAG applications with persistent caching
  • Any code with concurrent mset() calls

Before Fix

❌ Concurrent writes → File corruption
❌ JSON parse errors
❌ Cache misses
❌ Unreliable caching

After Fix

✅ Concurrent writes → No corruption
✅ Valid JSON always
✅ Reliable cache hits
✅ Production ready

Credits

  • Issue Reporter: @Josh-Engle
  • Solution: Queue-based write serialization + atomic writes

changeset-bot commented Nov 12, 2025

⚠️ No Changeset found

Latest commit: 919fc02

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@hntrl (Member) left a comment:

Thanks @sagarjariwala333! One question about your approach:

Comment on lines +93 to +114

const finalPath = this.getFullPath(key);
const tempPath = `${finalPath}.${randomUUID()}.tmp`;

try {
  // Write to temporary file first
  await fs.writeFile(tempPath, content);

  // Atomically rename to final destination
  // On most filesystems, rename is atomic - either the old file exists or the new one does
  await fs.rename(tempPath, finalPath);
} catch (error) {
  // Clean up temporary file if it exists
  try {
    await fs.unlink(tempPath);
  } catch {
    // Ignore cleanup errors - file might not exist
  }

  throw new Error(
    `Error writing file at path: ${finalPath}.\nError: ${JSON.stringify(
      error
    )}`
  );
}

What's the purpose of writing a temporary file if the write operations are queued?

@sagarjariwala333 (Author) replied Nov 14, 2025:

Reason for writing to a temporary file first

Writing to a temporary file and then renaming it ensures the final file is only replaced once the new data is fully written. If the process crashes mid-write, only the temp file is affected and the original file stays intact.

Why rename is used

The rename() operation is atomic on most filesystems, which gives us transaction-like semantics: either the entire write succeeds and the file is replaced atomically, or nothing changes at all.

There is never a state where the final file is partially written or corrupted.

What if the rename operation fails

While rare, rename() can fail due to permission errors, file locks, disk I/O issues, or filesystem differences. If rename fails:

  • the original file stays untouched, so integrity is preserved
  • the new data is not applied
  • the error can safely propagate, because the operation behaves like a failed transaction

Since the write either fully applies or not at all, the failure mode is predictable and safe.


Development

Successfully merging this pull request may close these issues.

Concurrent LocalFileStore mset writes can corrupt chunked embeddings cache
