Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 75 additions & 7 deletions libs/langchain-classic/src/storage/file_system.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as fs from "node:fs/promises";
import * as path from "node:path";
import { randomUUID } from "node:crypto";
import { BaseStore } from "@langchain/core/stores";

/**
Expand Down Expand Up @@ -37,6 +38,12 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {

rootPath: string;

/**
* Map to track ongoing write operations per key.
* This ensures that concurrent writes to the same key are serialized.
*/
private writeQueues = new Map<string, Promise<void>>();

constructor(fields: { rootPath: string }) {
super(fields);
this.rootPath = fields.rootPath;
Expand Down Expand Up @@ -75,17 +82,36 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
}

/**
* Writes the given key-value pairs to the file at the given path.
* @param fileContent An object with the key-value pairs to be written to the file.
* Writes the given key-value pairs to the file at the given path using atomic write.
* This method writes to a temporary file first, then atomically renames it to the
* final destination. This prevents partial writes and corruption if the process
* crashes during the write operation.
* @param content The content to write to the file.
* @param key The key identifying the file.
*/
private async setFileContent(content: Uint8Array, key: string) {
const finalPath = this.getFullPath(key);
const tempPath = `${finalPath}.${randomUUID()}.tmp`;

try {
await fs.writeFile(this.getFullPath(key), content);
// Write to temporary file first
await fs.writeFile(tempPath, content);

// Atomically rename to final destination
// On most filesystems, rename is atomic - either the old file exists or the new one does
await fs.rename(tempPath, finalPath);
} catch (error) {
// Clean up temporary file if it exists
try {
await fs.unlink(tempPath);
} catch {
// Ignore cleanup errors - file might not exist
}

throw new Error(
`Error writing file at path: ${this.getFullPath(
key
)}.\nError: ${JSON.stringify(error)}`
`Error writing file at path: ${finalPath}.\nError: ${JSON.stringify(
error
)}`
Comment on lines +93 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of writing a temporary file if the write operations are queued?

Copy link
Author

@sagarjariwala333 sagarjariwala333 Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason for writing to a temporary file first

Writing to a temporary file and then renaming it ensures the final file is only replaced once the new data is fully written. If the process crashes mid-write, only the temp file is affected and the original file stays intact.

Why rename is used

The rename() operation is atomic on most filesystems.
This gives us transaction-like semantics:
either the entire write succeeds and the file is replaced atomically,
or nothing changes at all.

There is never a state where the final file is partially written or corrupted.

What if the rename operation fails

While rare, rename() can fail due to permission errors, file locks, disk I/O issues, or filesystem differences.
If rename fails:
the original file stays untouched (so integrity is preserved)
the new data is not applied
we can safely propagate the error, because the operation behaves like a failed transaction

Since the write either fully applies or not at all, the failure mode is predictable and safe.

);
}
}
Expand Down Expand Up @@ -135,14 +161,55 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
return values;
}

/**
* Queues a write operation for a specific key to ensure serialization.
* If there's already a pending write for this key, the new write will
* wait for it to complete before executing.
* @param key The key to write to.
* @param value The value to write.
* @returns Promise that resolves when the write is complete.
*/
private async queueWrite(key: string, value: Uint8Array): Promise<void> {
// Get the existing queue for this key, or start with a resolved promise
const existingQueue = this.writeQueues.get(key) || Promise.resolve();

// Chain the new write operation after the existing queue
const writePromise = existingQueue
.then(() => this.setFileContent(value, key))
.finally(() => {
// Clean up the queue entry if this is still the current promise
// This prevents memory leaks from accumulating promises
if (this.writeQueues.get(key) === writePromise) {
this.writeQueues.delete(key);
}
});

// Store the new promise as the current queue for this key
this.writeQueues.set(key, writePromise);

return writePromise;
}

/**
* Sets the values for the given keys in the store.
* This method handles concurrent writes safely by:
* 1. Deduplicating keys within the same batch (last value wins)
* 2. Serializing writes to the same key across different mset() calls
* @param keyValuePairs Array of key-value pairs to set in the store.
* @returns Promise that resolves when all key-value pairs have been set.
*/
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
// Deduplicate keys within this batch - last value wins
const uniqueEntries = new Map<string, Uint8Array>();
for (const [key, value] of keyValuePairs) {
uniqueEntries.set(key, value);
}

// Queue all writes, ensuring serialization per key
await Promise.all(
keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
Array.from(uniqueEntries.entries()).map(([key, value]) =>
this.queueWrite(key, value)
)
);
}

Expand Down Expand Up @@ -174,6 +241,7 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
/**
* Static method for initializing the class.
* Preforms a check to see if the directory exists, and if not, creates it.
* Also cleans up any orphaned temporary files from previous crashes.
* @param path Path to the directory.
* @returns Promise that resolves to an instance of the class.
*/
Expand Down
82 changes: 75 additions & 7 deletions libs/langchain/src/storage/file_system.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as fs from "node:fs/promises";
import * as path from "node:path";
import { randomUUID } from "node:crypto";
import { BaseStore } from "@langchain/core/stores";

/**
Expand Down Expand Up @@ -37,6 +38,12 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {

rootPath: string;

/**
* Map to track ongoing write operations per key.
* This ensures that concurrent writes to the same key are serialized.
*/
private writeQueues = new Map<string, Promise<void>>();

constructor(fields: { rootPath: string }) {
super(fields);
this.rootPath = fields.rootPath;
Expand Down Expand Up @@ -75,17 +82,36 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
}

/**
* Writes the given key-value pairs to the file at the given path.
* @param fileContent An object with the key-value pairs to be written to the file.
* Writes the given key-value pairs to the file at the given path using atomic write.
* This method writes to a temporary file first, then atomically renames it to the
* final destination. This prevents partial writes and corruption if the process
* crashes during the write operation.
* @param content The content to write to the file.
* @param key The key identifying the file.
*/
private async setFileContent(content: Uint8Array, key: string) {
const finalPath = this.getFullPath(key);
const tempPath = `${finalPath}.${randomUUID()}.tmp`;

try {
await fs.writeFile(this.getFullPath(key), content);
// Write to temporary file first
await fs.writeFile(tempPath, content);

// Atomically rename to final destination
// On most filesystems, rename is atomic - either the old file exists or the new one does
await fs.rename(tempPath, finalPath);
} catch (error) {
// Clean up temporary file if it exists
try {
await fs.unlink(tempPath);
} catch {
// Ignore cleanup errors - file might not exist
}

throw new Error(
`Error writing file at path: ${this.getFullPath(
key
)}.\nError: ${JSON.stringify(error)}`
`Error writing file at path: ${finalPath}.\nError: ${JSON.stringify(
error
)}`
);
}
}
Expand Down Expand Up @@ -135,14 +161,55 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
return values;
}

/**
* Queues a write operation for a specific key to ensure serialization.
* If there's already a pending write for this key, the new write will
* wait for it to complete before executing.
* @param key The key to write to.
* @param value The value to write.
* @returns Promise that resolves when the write is complete.
*/
private async queueWrite(key: string, value: Uint8Array): Promise<void> {
// Get the existing queue for this key, or start with a resolved promise
const existingQueue = this.writeQueues.get(key) || Promise.resolve();

// Chain the new write operation after the existing queue
const writePromise = existingQueue
.then(() => this.setFileContent(value, key))
.finally(() => {
// Clean up the queue entry if this is still the current promise
// This prevents memory leaks from accumulating promises
if (this.writeQueues.get(key) === writePromise) {
this.writeQueues.delete(key);
}
});

// Store the new promise as the current queue for this key
this.writeQueues.set(key, writePromise);

return writePromise;
}

/**
* Sets the values for the given keys in the store.
* This method handles concurrent writes safely by:
* 1. Deduplicating keys within the same batch (last value wins)
* 2. Serializing writes to the same key across different mset() calls
* @param keyValuePairs Array of key-value pairs to set in the store.
* @returns Promise that resolves when all key-value pairs have been set.
*/
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
// Deduplicate keys within this batch - last value wins
const uniqueEntries = new Map<string, Uint8Array>();
for (const [key, value] of keyValuePairs) {
uniqueEntries.set(key, value);
}

// Queue all writes, ensuring serialization per key
await Promise.all(
keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
Array.from(uniqueEntries.entries()).map(([key, value]) =>
this.queueWrite(key, value)
)
);
}

Expand Down Expand Up @@ -174,6 +241,7 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
/**
* Static method for initializing the class.
* Preforms a check to see if the directory exists, and if not, creates it.
* Also cleans up any orphaned temporary files from previous crashes.
* @param path Path to the directory.
* @returns Promise that resolves to an instance of the class.
*/
Expand Down