From 1243cc08a86c9fcb905de967b4f1ef1d2fe14385 Mon Sep 17 00:00:00 2001 From: Josh Engle Date: Wed, 12 Nov 2025 16:14:15 -0700 Subject: [PATCH 1/3] feat(langchain): implement key locking and atomic file writes in LocalFileStore --- .../src/storage/file_system.ts | 123 +++++++++++++++-- .../src/storage/tests/file_system.test.ts | 103 +++++++++++++- libs/langchain/src/storage/file_system.ts | 126 ++++++++++++++++-- .../src/storage/tests/file_system.test.ts | 110 ++++++++++++++- 4 files changed, 434 insertions(+), 28 deletions(-) diff --git a/libs/langchain-classic/src/storage/file_system.ts b/libs/langchain-classic/src/storage/file_system.ts index d5b5c2ce57a3..cdabf541043e 100644 --- a/libs/langchain-classic/src/storage/file_system.ts +++ b/libs/langchain-classic/src/storage/file_system.ts @@ -37,6 +37,8 @@ export class LocalFileStore extends BaseStore { rootPath: string; + private keyLocks: Map> = new Map(); + constructor(fields: { rootPath: string }) { super(fields); this.rootPath = fields.rootPath; @@ -79,15 +81,18 @@ export class LocalFileStore extends BaseStore { * @param fileContent An object with the key-value pairs to be written to the file. */ private async setFileContent(content: Uint8Array, key: string) { - try { - await fs.writeFile(this.getFullPath(key), content); - } catch (error) { - throw new Error( - `Error writing file at path: ${this.getFullPath( - key - )}.\nError: ${JSON.stringify(error)}` - ); - } + await this.withKeyLock(key, async () => { + const fullPath = this.getFullPath(key); + try { + await this.writeFileAtomically(content, fullPath); + } catch (error) { + throw new Error( + `Error writing file at path: ${fullPath}.\nError: ${JSON.stringify( + error + )}` + ); + } + }); } /** @@ -141,8 +146,15 @@ export class LocalFileStore extends BaseStore { * @returns Promise that resolves when all key-value pairs have been set. */ async mset(keyValuePairs: [string, Uint8Array][]): Promise { + const deduped = new Map(); + for (const [key, value] of keyValuePairs) { + deduped.set(key, value); + } + await Promise.all( - keyValuePairs.map(([key, value]) => this.setFileContent(value, key)) + Array.from(deduped.entries(), ([key, value]) => + this.setFileContent(value, key) + ) ); } @@ -152,7 +164,19 @@ export class LocalFileStore extends BaseStore { * @returns Promise that resolves when all keys have been deleted. */ async mdelete(keys: string[]): Promise { - await Promise.all(keys.map((key) => fs.unlink(this.getFullPath(key)))); + await Promise.all( + keys.map((key) => + this.withKeyLock(key, async () => { + try { + await fs.unlink(this.getFullPath(key)); + } catch (error) { + if (!error || (error as { code?: string }).code !== "ENOENT") { + throw error; + } + } + }) + ) + ); } /** @@ -162,8 +186,10 @@ export class LocalFileStore extends BaseStore { * @returns AsyncGenerator that yields keys from the store. */ async *yieldKeys(prefix?: string): AsyncGenerator { - const allFiles = await fs.readdir(this.rootPath); - const allKeys = allFiles.map((file) => file.replace(".txt", "")); + const allFiles: string[] = await fs.readdir(this.rootPath); + const allKeys = allFiles + .filter((file) => file.endsWith(".txt")) + .map((file) => file.replace(/\.txt$/, "")); for (const key of allKeys) { if (prefix === undefined || key.startsWith(prefix)) { yield key; @@ -194,6 +220,77 @@ export class LocalFileStore extends BaseStore { } } + // Clean up orphaned temp files left by interrupted atomic writes. + try { + const entries = await fs.readdir(rootPath); + await Promise.all( + entries + .filter((file) => file.endsWith(".tmp")) + .map((tempFile) => + fs.unlink(path.join(rootPath, tempFile)).catch(() => {}) + ) + ); + } catch { + // Ignore cleanup errors. + } + return new this({ rootPath }); } + + /** + * Ensures calls for the same key run sequentially by chaining promises. + * @param key Key to serialize operations for. + * @param fn Async work to execute while the lock is held. + * @returns Promise resolving with the callback result once the lock releases. + */ + private async withKeyLock(key: string, fn: () => Promise): Promise { + const previous = this.keyLocks.get(key) ?? Promise.resolve(); + const waitForPrevious = previous.catch(() => {}); + + let resolveCurrent: (() => void) | undefined; + const current = new Promise((resolve) => { + resolveCurrent = resolve; + }); + + const tail = waitForPrevious.then(() => current); + this.keyLocks.set(key, tail); + + await waitForPrevious; + try { + return await fn(); + } finally { + resolveCurrent?.(); + if (this.keyLocks.get(key) === tail) { + this.keyLocks.delete(key); + } + } + } + + private async writeFileAtomically(content: Uint8Array, fullPath: string) { + const directory = path.dirname(fullPath); + await fs.mkdir(directory, { recursive: true }); + + const tempPath = `${fullPath}.${Date.now()}-${Math.random() + .toString(16) + .slice(2)}.tmp`; + + try { + await fs.writeFile(tempPath, content); + + try { + await fs.rename(tempPath, fullPath); + } catch (renameError) { + const code = (renameError as { code?: string }).code; + if (renameError && (code === "EPERM" || code === "EACCES")) { + await fs.writeFile(fullPath, content); + await fs.unlink(tempPath).catch(() => {}); + } else { + throw renameError; + } + } + } catch (error) { + await fs.unlink(tempPath).catch(() => {}); + throw error; + } + } } diff --git a/libs/langchain-classic/src/storage/tests/file_system.test.ts b/libs/langchain-classic/src/storage/tests/file_system.test.ts index 4f5214213966..4674feedbdc0 100644 --- a/libs/langchain-classic/src/storage/tests/file_system.test.ts +++ b/libs/langchain-classic/src/storage/tests/file_system.test.ts @@ -2,10 +2,24 @@ import fs from "node:fs"; import path from "node:path"; import os from "node:os"; -import { describe, expect, test } from "vitest"; +import { describe, expect, test, vi } from "vitest"; import { LocalFileStore } from "../file_system.js"; +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve: (value: T | PromiseLike) => resolve(value), + reject: (reason?: unknown) => reject(reason), + }; +} + describe("LocalFileStore", () => { const keys = ["key1", "key2"]; const tempDir = fs.mkdtempSync( @@ -32,6 +46,93 @@ describe("LocalFileStore", () => { ]); }); + test("LocalFileStore uses last value for duplicate keys in mset", async () => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const store = await LocalFileStore.fromPath(tempDir); + const key = "duplicate-key"; + await store.mset([ + [key, encoder.encode("first")], + [key, encoder.encode("second")], + ]); + const [value] = await store.mget([key]); + expect(value).toBeDefined(); + expect(decoder.decode(value!)).toBe("second"); + await store.mdelete([key]); + }); + + test("LocalFileStore queues writes for the same key while a lock is held", async () => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const store = await LocalFileStore.fromPath(tempDir); + const key = "locked-key"; + + const prototype = Object.getPrototypeOf(store) as { + writeFileAtomically: ( + this: LocalFileStore, + content: Uint8Array, + fullPath: string + ) => Promise; + }; + type WriteFileArgs = [Uint8Array, string]; + const originalWriteFileAtomically = prototype.writeFileAtomically; + const firstWriteGate = createDeferred(); + const writeFileSpy = vi + .spyOn(prototype, "writeFileAtomically") + .mockImplementationOnce(async function (this: LocalFileStore, ...args: WriteFileArgs) { + await firstWriteGate.promise; + return originalWriteFileAtomically.apply(this, args); + }) + .mockImplementation(function (this: LocalFileStore, ...args: WriteFileArgs) { + return originalWriteFileAtomically.apply(this, args); + }); + + try { + const firstWrite = store.mset([[key, encoder.encode("first")]]); + + await expect.poll(() => writeFileSpy.mock.calls.length).toBe(1); + + const secondWrite = store.mset([[key, encoder.encode("second")]]); + + await new Promise((resolve) => setTimeout(resolve, 25)); + + expect(writeFileSpy.mock.calls.length).toBe(1); + + firstWriteGate.resolve(); + + await Promise.all([firstWrite, secondWrite]); + + expect(writeFileSpy.mock.calls.length).toBe(2); + + const [value] = await store.mget([key]); + expect(value).toBeDefined(); + expect(decoder.decode(value!)).toBe("second"); + + const { keyLocks } = store as unknown as { + keyLocks: Map>; + }; + expect(keyLocks.size).toBe(0); + } finally { + writeFileSpy.mockRestore(); + await store.mdelete([key]); + } + }); + + test("LocalFileStore removes orphaned temp files during initialization", async () => { + const cleanupDir = fs.mkdtempSync( + path.join(os.tmpdir(), "file_system_classic_cleanup") + ); + const orphanFile = path.join(cleanupDir, "orphan.tmp"); + fs.writeFileSync(orphanFile, "stale"); + + await LocalFileStore.fromPath(cleanupDir); + + const remaining = await fs.promises.readdir(cleanupDir); + expect(remaining).not.toContain("orphan.tmp"); + + await fs.promises.rm(cleanupDir, { recursive: true, force: true }); + }); + test("LocalFileStore can delete values", async () => { const encoder = new TextEncoder(); const store = await LocalFileStore.fromPath(tempDir); diff --git a/libs/langchain/src/storage/file_system.ts b/libs/langchain/src/storage/file_system.ts index d5b5c2ce57a3..94077c0d34b1 100644 --- a/libs/langchain/src/storage/file_system.ts +++ b/libs/langchain/src/storage/file_system.ts @@ -37,6 +37,8 @@ export class LocalFileStore extends BaseStore { rootPath: string; + private keyLocks: Map> = new Map(); + constructor(fields: { rootPath: string }) { super(fields); this.rootPath = fields.rootPath; @@ -79,15 +81,18 @@ export class LocalFileStore extends BaseStore { * @param fileContent An object with the key-value pairs to be written to the file. */ private async setFileContent(content: Uint8Array, key: string) { - try { - await fs.writeFile(this.getFullPath(key), content); - } catch (error) { - throw new Error( - `Error writing file at path: ${this.getFullPath( - key - )}.\nError: ${JSON.stringify(error)}` - ); - } + await this.withKeyLock(key, async () => { + const fullPath = this.getFullPath(key); + try { + await this.writeFileAtomically(content, fullPath); + } catch (error) { + throw new Error( + `Error writing file at path: ${fullPath}.\nError: ${JSON.stringify( + error + )}` + ); + } + }); } /** @@ -141,8 +146,15 @@ export class LocalFileStore extends BaseStore { * @returns Promise that resolves when all key-value pairs have been set. */ async mset(keyValuePairs: [string, Uint8Array][]): Promise { + const deduped = new Map(); + for (const [key, value] of keyValuePairs) { + deduped.set(key, value); + } + await Promise.all( - keyValuePairs.map(([key, value]) => this.setFileContent(value, key)) + Array.from(deduped.entries(), ([key, value]) => + this.setFileContent(value, key) + ) ); } @@ -152,7 +164,20 @@ export class LocalFileStore extends BaseStore { * @returns Promise that resolves when all keys have been deleted. */ async mdelete(keys: string[]): Promise { - await Promise.all(keys.map((key) => fs.unlink(this.getFullPath(key)))); + await Promise.all( + keys.map((key) => + this.withKeyLock(key, async () => { + try { + await fs.unlink(this.getFullPath(key)); + } catch (error) { + // Ignore missing files so deletes remain idempotent. + if (!error || (error as { code?: string }).code !== "ENOENT") { + throw error; + } + } + }) + ) + ); } /** @@ -162,8 +187,10 @@ export class LocalFileStore extends BaseStore { * @returns AsyncGenerator that yields keys from the store. */ async *yieldKeys(prefix?: string): AsyncGenerator { - const allFiles = await fs.readdir(this.rootPath); - const allKeys = allFiles.map((file) => file.replace(".txt", "")); + const allFiles: string[] = await fs.readdir(this.rootPath); + const allKeys = allFiles + .filter((file) => file.endsWith(".txt")) + .map((file) => file.replace(/\.txt$/, "")); for (const key of allKeys) { if (prefix === undefined || key.startsWith(prefix)) { yield key; @@ -194,6 +221,79 @@ export class LocalFileStore extends BaseStore { } } + // Clean up orphaned temp files left by interrupted atomic writes. + try { + const entries = await fs.readdir(rootPath); + await Promise.all( + entries + .filter((file) => file.endsWith(".tmp")) + .map((tempFile) => + fs.unlink(path.join(rootPath, tempFile)).catch(() => {}) + ) + ); + } catch { + // Ignore cleanup errors. + } + return new this({ rootPath }); } + + /** + * Ensures calls for the same key run sequentially by chaining promises. + * @param key Key to serialize operations for. + * @param fn Async work to execute while the lock is held. + * @returns Promise resolving with the callback result once the lock releases. + */ + private async withKeyLock(key: string, fn: () => Promise): Promise { + const previous = this.keyLocks.get(key) ?? Promise.resolve(); + const waitForPrevious = previous.catch(() => {}); + + let resolveCurrent: (() => void) | undefined; + const current = new Promise((resolve) => { + resolveCurrent = resolve; + }); + + const tail = waitForPrevious.then(() => current); + this.keyLocks.set(key, tail); + + await waitForPrevious; + try { + return await fn(); + } finally { + resolveCurrent?.(); + if (this.keyLocks.get(key) === tail) { + this.keyLocks.delete(key); + } + } + } + + private async writeFileAtomically(content: Uint8Array, fullPath: string) { + const directory = path.dirname(fullPath); + await fs.mkdir(directory, { recursive: true }); + + const tempPath = `${fullPath}.${Date.now()}-${Math.random() + .toString(16) + .slice(2)}.tmp`; + + try { + await fs.writeFile(tempPath, content); + + try { + await fs.rename(tempPath, fullPath); + } catch (renameError) { + const code = (renameError as { code?: string }).code; + if (renameError && (code === "EPERM" || code === "EACCES")) { + // Fallback for Windows where replacing an existing file can fail if the + // destination is locked by another process. + await fs.writeFile(fullPath, content); + await fs.unlink(tempPath).catch(() => {}); + } else { + throw renameError; + } + } + } catch (error) { + await fs.unlink(tempPath).catch(() => {}); + throw error; + } + } } diff --git a/libs/langchain/src/storage/tests/file_system.test.ts b/libs/langchain/src/storage/tests/file_system.test.ts index 4f5214213966..41f9bea641e2 100644 --- a/libs/langchain/src/storage/tests/file_system.test.ts +++ b/libs/langchain/src/storage/tests/file_system.test.ts @@ -2,10 +2,24 @@ import fs from "node:fs"; import path from "node:path"; import os from "node:os"; -import { describe, expect, test } from "vitest"; +import { describe, expect, test, vi } from "vitest"; import { LocalFileStore } from "../file_system.js"; +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve: (value: T | PromiseLike) => resolve(value), + reject: (reason?: unknown) => reject(reason), + }; +} + describe("LocalFileStore", () => { const keys = ["key1", "key2"]; const tempDir = fs.mkdtempSync( @@ -32,6 +46,100 @@ describe("LocalFileStore", () => { ]); }); + test("LocalFileStore uses last value for duplicate keys in mset", async () => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const store = await LocalFileStore.fromPath(tempDir); + const key = "duplicate-key"; + await store.mset([ + [key, encoder.encode("first")], + [key, encoder.encode("second")], + ]); + const [value] = await store.mget([key]); + expect(value).toBeDefined(); + expect(decoder.decode(value!)).toBe("second"); + await store.mdelete([key]); + }); + + test("LocalFileStore queues writes for the same key while a lock is held", async () => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const store = await LocalFileStore.fromPath(tempDir); + const key = "locked-key"; + + const prototype = Object.getPrototypeOf(store) as { + writeFileAtomically: ( + this: LocalFileStore, + content: Uint8Array, + fullPath: string + ) => Promise; + }; + type WriteFileArgs = [Uint8Array, string]; + const originalWriteFileAtomically = prototype.writeFileAtomically; + const firstWriteGate = createDeferred(); + const writeFileSpy = vi + .spyOn(prototype, "writeFileAtomically") + .mockImplementationOnce(async function ( + this: LocalFileStore, + ...args: WriteFileArgs + ) { + await firstWriteGate.promise; + // Preserve original behavior once the first write is allowed to proceed. + return originalWriteFileAtomically.apply(this, args); + }) + .mockImplementation(function ( + this: LocalFileStore, + ...args: WriteFileArgs + ) { + return originalWriteFileAtomically.apply(this, args); + }); + + try { + const firstWrite = store.mset([[key, encoder.encode("first")]]); + + await expect.poll(() => writeFileSpy.mock.calls.length).toBe(1); + + const secondWrite = store.mset([[key, encoder.encode("second")]]); + + await new Promise((resolve) => setTimeout(resolve, 25)); + + expect(writeFileSpy.mock.calls.length).toBe(1); + + firstWriteGate.resolve(); + + await Promise.all([firstWrite, secondWrite]); + + expect(writeFileSpy.mock.calls.length).toBe(2); + + const [value] = await store.mget([key]); + expect(value).toBeDefined(); + expect(decoder.decode(value!)).toBe("second"); + + const { keyLocks } = store as unknown as { + keyLocks: Map>; + }; + expect(keyLocks.size).toBe(0); + } finally { + writeFileSpy.mockRestore(); + await store.mdelete([key]); + } + }); + + test("LocalFileStore removes orphaned temp files during initialization", async () => { + const cleanupDir = fs.mkdtempSync( + path.join(os.tmpdir(), "file_system_store_cleanup") + ); + const orphanFile = path.join(cleanupDir, "orphan.tmp"); + fs.writeFileSync(orphanFile, "stale"); + + await LocalFileStore.fromPath(cleanupDir); + + const remaining = await fs.promises.readdir(cleanupDir); + expect(remaining).not.toContain("orphan.tmp"); + + await fs.promises.rm(cleanupDir, { recursive: true, force: true }); + }); + test("LocalFileStore can delete values", async () => { const encoder = new TextEncoder(); const store = await LocalFileStore.fromPath(tempDir); From b80fcd77405bb7ae6aa898a1c157e61e2dded53e Mon Sep 17 00:00:00 2001 From: Josh Engle Date: Wed, 12 Nov 2025 16:39:43 -0700 Subject: [PATCH 2/3] update descriptions --- libs/langchain-classic/src/storage/file_system.ts | 1 + libs/langchain/src/storage/file_system.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/libs/langchain-classic/src/storage/file_system.ts b/libs/langchain-classic/src/storage/file_system.ts index cdabf541043e..61e3d6b34c95 100644 --- a/libs/langchain-classic/src/storage/file_system.ts +++ b/libs/langchain-classic/src/storage/file_system.ts @@ -142,6 +142,7 @@ export class LocalFileStore extends BaseStore { /** * Sets the values for the given keys in the store. + * The last value for duplicate keys will be used. * @param keyValuePairs Array of key-value pairs to set in the store. * @returns Promise that resolves when all key-value pairs have been set. */ diff --git a/libs/langchain/src/storage/file_system.ts b/libs/langchain/src/storage/file_system.ts index 94077c0d34b1..9651bf41160a 100644 --- a/libs/langchain/src/storage/file_system.ts +++ b/libs/langchain/src/storage/file_system.ts @@ -142,6 +142,7 @@ export class LocalFileStore extends BaseStore { /** * Sets the values for the given keys in the store. + * The last value for duplicate keys will be used. * @param keyValuePairs Array of key-value pairs to set in the store. * @returns Promise that resolves when all key-value pairs have been set. */ From f207ba45253b7837ac034c1b2432045c4fe5a1e7 Mon Sep 17 00:00:00 2001 From: Josh Engle Date: Wed, 12 Nov 2025 17:06:07 -0700 Subject: [PATCH 3/3] docstrings --- libs/langchain-classic/src/storage/file_system.ts | 5 +++++ libs/langchain/src/storage/file_system.ts | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/libs/langchain-classic/src/storage/file_system.ts b/libs/langchain-classic/src/storage/file_system.ts index 61e3d6b34c95..6b989c2e8b66 100644 --- a/libs/langchain-classic/src/storage/file_system.ts +++ b/libs/langchain-classic/src/storage/file_system.ts @@ -267,6 +267,11 @@ export class LocalFileStore extends BaseStore { } } + /** + * Writes data to a temporary file before atomically renaming it into place. + * @param content Serialized value to persist. + * @param fullPath Destination path for the stored key. + */ private async writeFileAtomically(content: Uint8Array, fullPath: string) { const directory = path.dirname(fullPath); await fs.mkdir(directory, { recursive: true }); diff --git a/libs/langchain/src/storage/file_system.ts b/libs/langchain/src/storage/file_system.ts index 9651bf41160a..b8a4cfc2563c 100644 --- a/libs/langchain/src/storage/file_system.ts +++ b/libs/langchain/src/storage/file_system.ts @@ -268,6 +268,11 @@ export class LocalFileStore extends BaseStore { } } + /** + * Writes data to a temporary file before atomically renaming it into place. + * @param content Serialized value to persist. + * @param fullPath Destination path for the stored key. + */ private async writeFileAtomically(content: Uint8Array, fullPath: string) { const directory = path.dirname(fullPath); await fs.mkdir(directory, { recursive: true });