Skip to content

Commit 9a7b140

Browse files
authored
cacheable - adding in stampede protection via coalesce async (#891)
* cacheable - adding in stampede protection via coalesce async * Update README.md * updating development packages * version bump to v1.8.4
1 parent 66c57aa commit 9a7b140

File tree

5 files changed

+174
-15
lines changed

5 files changed

+174
-15
lines changed

packages/cacheable/README.md

+25-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* Scalable and trusted storage engine by Keyv
1818
* Memory Caching with LRU and Expiration `CacheableMemory`
1919
* Resilient to failures with try/catch and offline
20-
* Wrap / Memoization for Sync and Async Functions
20+
* Wrap / Memoization for Sync and Async Functions with Stampede Protection
2121
* Hooks and Events to extend functionality
2222
* Shorthand for ttl in milliseconds `(1m = 60000) (1h = 3600000) (1d = 86400000)`
2323
* Non-blocking operations for layer 2 caching
@@ -310,6 +310,30 @@ const wrappedFunction = cache.wrap(asyncFunction, options);
310310
console.log(await wrappedFunction(2)); // 4
311311
console.log(await wrappedFunction(2)); // 4 from cache
312312
```
313+
With `Cacheable` we have also included stampede protection so that a `Promise` based call will only be called once if multiple requests of the same are executed at the same time. Here is an example of how to test for stampede protection:
314+
315+
```javascript
316+
import { Cacheable } from 'cacheable';
317+
const asyncFunction = async (value: number) => {
318+
return value;
319+
};
320+
321+
const cache = new Cacheable();
322+
const options = {
323+
ttl: '1h', // 1 hour
324+
keyPrefix: 'p1', // key prefix. This is used if you have multiple functions and need to set a unique prefix.
325+
}
326+
327+
const wrappedFunction = cache.wrap(asyncFunction, options);
328+
const promises = [];
329+
for (let i = 0; i < 10; i++) {
330+
promises.push(wrappedFunction(i));
331+
}
332+
333+
const results = await Promise.all(promises); // all results should be the same
334+
335+
console.log(results); // [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
336+
```
313337

314338
In this example we are wrapping an `async` function in a cache with a `ttl` of `1 hour`. This will cache the result of the function for `1 hour` and then expire the value. You can also wrap a `sync` function in a cache:
315339

packages/cacheable/package.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "cacheable",
3-
"version": "1.8.3",
3+
"version": "1.8.4",
44
"description": "Simple Caching Engine using Keyv",
55
"type": "module",
66
"main": "./dist/index.cjs",
@@ -29,9 +29,9 @@
2929
},
3030
"devDependencies": {
3131
"@keyv/redis": "^3.0.1",
32-
"@types/node": "^22.8.4",
32+
"@types/node": "^22.9.0",
3333
"@vitest/coverage-v8": "^2.1.4",
34-
"lru-cache": "^11.0.1",
34+
"lru-cache": "^11.0.2",
3535
"rimraf": "^6.0.1",
3636
"tsup": "^8.3.5",
3737
"typescript": "^5.6.3",
+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
type PromiseCallback<T = any, E = Error> = {
2+
resolve: (value: T | PromiseLike<T>) => void;
3+
reject: (reason: E) => void;
4+
};
5+
6+
const callbacks = new Map<string, PromiseCallback[]>();
7+
8+
function hasKey(key: string): boolean {
9+
return callbacks.has(key);
10+
}
11+
12+
function addKey(key: string): void {
13+
callbacks.set(key, []);
14+
}
15+
16+
function removeKey(key: string): void {
17+
callbacks.delete(key);
18+
}
19+
20+
function addCallbackToKey<T>(key: string, callback: PromiseCallback<T>): void {
21+
const stash = getCallbacksByKey<T>(key);
22+
stash.push(callback);
23+
callbacks.set(key, stash);
24+
}
25+
26+
function getCallbacksByKey<T>(key: string): Array<PromiseCallback<T>> {
27+
return callbacks.get(key) ?? [];
28+
}
29+
30+
async function enqueue<T>(key: string): Promise<T> {
31+
return new Promise<T>((resolve, reject) => {
32+
const callback: PromiseCallback<T> = {resolve, reject};
33+
addCallbackToKey(key, callback);
34+
});
35+
}
36+
37+
function dequeue<T>(key: string): Array<PromiseCallback<T>> {
38+
const stash = getCallbacksByKey<T>(key);
39+
removeKey(key);
40+
return stash;
41+
}
42+
43+
function coalesce<T>(options: {key: string; error?: Error; result?: T}): void {
44+
const {key, error, result} = options;
45+
46+
for (const callback of dequeue(key)) {
47+
if (error) {
48+
/* c8 ignore next 3 */
49+
callback.reject(error);
50+
} else {
51+
callback.resolve(result);
52+
}
53+
}
54+
}
55+
56+
/**
57+
* Enqueue a promise for the group identified by `key`.
58+
*
59+
* All requests received for the same key while a request for that key
60+
* is already being executed will wait. Once the running request settles
61+
* then all the waiting requests in the group will settle, too.
62+
* This minimizes how many times the function itself runs at the same time.
63+
* This function resolves or rejects according to the given function argument.
64+
*
65+
* @url https://github.com/douglascayers/promise-coalesce
66+
*/
67+
export async function coalesceAsync<T>(
68+
/**
69+
* Any identifier to group requests together.
70+
*/
71+
key: string,
72+
/**
73+
* The function to run.
74+
*/
75+
fnc: () => T | PromiseLike<T>,
76+
): Promise<T> {
77+
if (!hasKey(key)) {
78+
addKey(key);
79+
try {
80+
const result = await Promise.resolve(fnc());
81+
coalesce({key, result});
82+
return result;
83+
} catch (error: any) {
84+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
85+
coalesce({key, error});
86+
// eslint-disable-next-line @typescript-eslint/only-throw-error
87+
throw error;
88+
}
89+
}
90+
91+
return enqueue(key);
92+
}

packages/cacheable/src/wrap.ts

+17-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {hash} from './hash.js';
2+
import {coalesceAsync} from './coalesce-async.js';
23
import {type Cacheable, type CacheableMemory} from './index.js';
34

45
export type WrapFunctionOptions = {
@@ -39,15 +40,22 @@ export function wrap<T>(function_: AnyFunction, options: WrapOptions): AnyFuncti
3940
const {ttl, keyPrefix, cache} = options;
4041

4142
return async function (...arguments_: any[]) {
42-
const cacheKey = createWrapKey(function_, arguments_, keyPrefix);
43-
44-
let value = await cache.get(cacheKey) as T | undefined;
45-
46-
if (value === undefined) {
47-
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
48-
value = await function_(...arguments_) as T;
49-
50-
await cache.set(cacheKey, value, ttl);
43+
let value;
44+
try {
45+
const cacheKey = createWrapKey(function_, arguments_, keyPrefix);
46+
47+
value = await cache.get(cacheKey) as T | undefined;
48+
49+
if (value === undefined) {
50+
value = await coalesceAsync(cacheKey, async () => {
51+
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
52+
const result = await function_(...arguments_) as T;
53+
await cache.set(cacheKey, result, ttl);
54+
return result;
55+
});
56+
}
57+
} catch {
58+
// ignore
5159
}
5260

5361
return value;

packages/cacheable/test/wrap.test.ts

+37-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
22
import {
3-
describe, it, expect,
3+
describe, it, expect, vi,
44
} from 'vitest';
5-
import {Cacheable, CacheableMemory} from '../src/index.js';
5+
import {Cacheable, CacheableMemory, KeyvCacheableMemory} from '../src/index.js';
66
import {
77
wrap, createWrapKey, wrapSync, type WrapOptions, type WrapSyncOptions,
88
} from '../src/wrap.js';
@@ -162,3 +162,38 @@ describe('wrap function', () => {
162162
expect(cacheResult).toBe(undefined);
163163
});
164164
});
165+
166+
describe('wrap function with stampede protection', () => {
167+
it('should only execute the wrapped function once when called concurrently with the same key', async () => {
168+
const cache = new Cacheable();
169+
const mockFunction = vi.fn().mockResolvedValue('result');
170+
const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
171+
const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});
172+
173+
// Call the wrapped function concurrently
174+
const [result1, result2, result3, result4] = await Promise.all([wrappedFunction('arg1'), wrappedFunction('arg1'), wrappedFunction('arg2'), wrappedFunction('arg2')]);
175+
176+
// Verify that the wrapped function was only called two times do to arg1 and arg2
177+
expect(mockFunction).toHaveBeenCalledTimes(2);
178+
179+
// Verify that both calls returned the same result
180+
expect(result1).toBe('result');
181+
expect(result2).toBe('result');
182+
expect(result3).toBe('result');
183+
184+
// Verify that the result was cached
185+
expect(await cache.has(mockedKey)).toBe(true);
186+
});
187+
188+
it('should handle error if the function fails', async () => {
189+
const cache = new Cacheable();
190+
const mockFunction = vi.fn().mockRejectedValue(new Error('Function failed'));
191+
const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
192+
const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});
193+
194+
await wrappedFunction('arg1');
195+
196+
// Verify that the wrapped function was only called once
197+
expect(mockFunction).toHaveBeenCalledTimes(1);
198+
});
199+
});

0 commit comments

Comments
 (0)