Skip to content

Commit fcf5c9f

Browse files
authored
feat(api, application-generic): enhance workflow preferences retrieval with batch processing and caching improvements (#11646)
1 parent 364e659 commit fcf5c9f

5 files changed

Lines changed: 531 additions & 51 deletions

File tree

apps/api/src/app/subscribers/usecases/get-subscriber-preference/get-subscriber-preference.usecase.ts

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ export class GetSubscriberPreference {
4040
private notificationTemplateRepository: NotificationTemplateRepository,
4141
private preferencesRepository: PreferencesRepository,
4242
private featureFlagsService: FeatureFlagsService,
43-
private inMemoryLRUCacheService: InMemoryLRUCacheService
43+
private inMemoryLRUCacheService: InMemoryLRUCacheService,
44+
private getPreferences: GetPreferences
4445
) {}
4546

4647
@InstrumentUsecase()
@@ -296,27 +297,16 @@ export class GetSubscriberPreference {
296297
);
297298

298299
const [
299-
workflowResourcePreferences,
300-
workflowUserPreferences,
300+
{ workflowResourcePreferences, workflowUserPreferences },
301301
subscriberWorkflowPreferences,
302302
subscriberGlobalPreferences,
303303
] = await Promise.all([
304-
this.preferencesRepository.findForComputation(
305-
{
306-
...baseQuery,
307-
_templateId: { $in: workflowIds },
308-
type: PreferencesTypeEnum.WORKFLOW_RESOURCE,
309-
},
310-
readOptions
311-
),
312-
this.preferencesRepository.findForComputation(
313-
{
314-
...baseQuery,
315-
_templateId: { $in: workflowIds },
316-
type: PreferencesTypeEnum.USER_WORKFLOW,
317-
},
318-
readOptions
319-
),
304+
this.getWorkflowLevelPreferences({
305+
environmentId,
306+
organizationId,
307+
workflowIds,
308+
readOptions,
309+
}),
320310
this.preferencesRepository.findForComputation(
321311
{
322312
...baseQuery,
@@ -338,6 +328,52 @@ export class GetSubscriberPreference {
338328
};
339329
}
340330

331+
/**
332+
* Resolves the subscriber-independent workflow-level preferences (WORKFLOW_RESOURCE and
333+
* USER_WORKFLOW) for a set of workflows by delegating to the canonical, in-flight-coalesced
334+
* `GetPreferences.getWorkflowPreferencesByIds` reader (shared with the single-workflow path),
335+
* then flattens the per-workflow tuples into the two arrays the merge step consumes.
336+
*
337+
* The returned entries are shared cache references and must be treated as immutable; the merge
338+
* pipeline (`MergePreferences`) only reads them and produces fresh objects.
339+
*/
340+
@Instrument()
341+
private async getWorkflowLevelPreferences({
342+
environmentId,
343+
organizationId,
344+
workflowIds,
345+
readOptions,
346+
}: {
347+
environmentId: string;
348+
organizationId: string;
349+
workflowIds: string[];
350+
readOptions: { readPreference: 'secondaryPreferred' | 'primary' };
351+
}): Promise<{
352+
workflowResourcePreferences: PreferencesEntity[];
353+
workflowUserPreferences: PreferencesEntity[];
354+
}> {
355+
const tuplesByWorkflowId = await this.getPreferences.getWorkflowPreferencesByIds({
356+
environmentId,
357+
organizationId,
358+
workflowIds,
359+
readOptions,
360+
});
361+
362+
const workflowResourcePreferences: PreferencesEntity[] = [];
363+
const workflowUserPreferences: PreferencesEntity[] = [];
364+
365+
for (const [workflowResourcePreference, workflowUserPreference] of tuplesByWorkflowId.values()) {
366+
if (workflowResourcePreference) {
367+
workflowResourcePreferences.push(workflowResourcePreference);
368+
}
369+
if (workflowUserPreference) {
370+
workflowUserPreferences.push(workflowUserPreference);
371+
}
372+
}
373+
374+
return { workflowResourcePreferences, workflowUserPreferences };
375+
}
376+
341377
@Instrument()
342378
private async getActiveWorkflows({
343379
organizationId,

libs/application-generic/src/services/in-memory-lru-cache/in-memory-lru-cache.service.spec.ts

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,144 @@ describe('InMemoryLRUCacheService', () => {
249249
});
250250
});
251251

252+
describe('getMany', () => {
253+
afterEach(() => {
254+
service.invalidateAll(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES);
255+
});
256+
257+
const buildFetchMissing = () =>
258+
jest.fn(async (missingKeys: string[]) => {
259+
const fetched = new Map<string, any>();
260+
for (const key of missingKeys) {
261+
fetched.set(key, [{ id: `resource-${key}` }, { id: `user-${key}` }]);
262+
}
263+
264+
return fetched;
265+
});
266+
267+
it('returns an empty map without fetching when given no keys', async () => {
268+
const fetchMissing = buildFetchMissing();
269+
270+
const result = await service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, [], fetchMissing, {
271+
environmentId: 'env1',
272+
});
273+
274+
expect(result.size).toBe(0);
275+
expect(fetchMissing).not.toHaveBeenCalled();
276+
});
277+
278+
it('fetches all missing keys in a single batch call and caches them', async () => {
279+
featureFlagsService.getFlag.mockResolvedValue(true);
280+
const fetchMissing = buildFetchMissing();
281+
282+
const result = await service.getMany(
283+
InMemoryLRUCacheStore.WORKFLOW_PREFERENCES,
284+
['env1:wf_1', 'env1:wf_2'],
285+
fetchMissing,
286+
{ environmentId: 'env1', organizationId: 'org1' }
287+
);
288+
289+
expect(fetchMissing).toHaveBeenCalledTimes(1);
290+
expect(fetchMissing).toHaveBeenCalledWith(['env1:wf_1', 'env1:wf_2']);
291+
expect(featureFlagsService.getFlag).toHaveBeenCalledWith(
292+
expect.objectContaining({ key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED })
293+
);
294+
expect(result.get('env1:wf_1')).toEqual([{ id: 'resource-env1:wf_1' }, { id: 'user-env1:wf_1' }]);
295+
expect(result.get('env1:wf_2')).toEqual([{ id: 'resource-env1:wf_2' }, { id: 'user-env1:wf_2' }]);
296+
});
297+
298+
it('serves cache hits without calling fetchMissing again', async () => {
299+
featureFlagsService.getFlag.mockResolvedValue(true);
300+
const fetchMissing = buildFetchMissing();
301+
302+
await service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
303+
environmentId: 'env1',
304+
});
305+
const result = await service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
306+
environmentId: 'env1',
307+
});
308+
309+
expect(fetchMissing).toHaveBeenCalledTimes(1);
310+
expect(result.get('env1:wf_1')).toEqual([{ id: 'resource-env1:wf_1' }, { id: 'user-env1:wf_1' }]);
311+
});
312+
313+
it('only fetches the missing keys when the cache is partially warm', async () => {
314+
featureFlagsService.getFlag.mockResolvedValue(true);
315+
const fetchMissing = buildFetchMissing();
316+
317+
await service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
318+
environmentId: 'env1',
319+
});
320+
const result = await service.getMany(
321+
InMemoryLRUCacheStore.WORKFLOW_PREFERENCES,
322+
['env1:wf_1', 'env1:wf_2'],
323+
fetchMissing,
324+
{ environmentId: 'env1' }
325+
);
326+
327+
expect(fetchMissing).toHaveBeenCalledTimes(2);
328+
expect(fetchMissing).toHaveBeenNthCalledWith(2, ['env1:wf_2']);
329+
expect(result.size).toBe(2);
330+
});
331+
332+
it('coalesces concurrent callers for the same key into a single fetch', async () => {
333+
featureFlagsService.getFlag.mockResolvedValue(true);
334+
let fetchCount = 0;
335+
const fetchMissing = jest.fn(
336+
(missingKeys: string[]) =>
337+
new Promise<Map<string, any>>((resolve) => {
338+
setTimeout(() => {
339+
fetchCount++;
340+
const fetched = new Map<string, any>();
341+
for (const key of missingKeys) {
342+
fetched.set(key, [{ id: `resource-${fetchCount}` }, null]);
343+
}
344+
resolve(fetched);
345+
}, 10);
346+
})
347+
);
348+
349+
const [result1, result2, result3] = await Promise.all([
350+
service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
351+
environmentId: 'env1',
352+
}),
353+
service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
354+
environmentId: 'env1',
355+
}),
356+
service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1'], fetchMissing, {
357+
environmentId: 'env1',
358+
}),
359+
]);
360+
361+
expect(fetchMissing).toHaveBeenCalledTimes(1);
362+
expect(result1.get('env1:wf_1')).toEqual([{ id: 'resource-1' }, null]);
363+
expect(result2.get('env1:wf_1')).toEqual([{ id: 'resource-1' }, null]);
364+
expect(result3.get('env1:wf_1')).toEqual([{ id: 'resource-1' }, null]);
365+
});
366+
367+
it('bypasses the cache and fetches every key when the feature flag is disabled', async () => {
368+
featureFlagsService.getFlag.mockResolvedValue(false);
369+
const fetchMissing = buildFetchMissing();
370+
371+
const result = await service.getMany(
372+
InMemoryLRUCacheStore.WORKFLOW_PREFERENCES,
373+
['env1:wf_1', 'env1:wf_2'],
374+
fetchMissing,
375+
{ environmentId: 'env1' }
376+
);
377+
378+
expect(fetchMissing).toHaveBeenCalledTimes(1);
379+
expect(fetchMissing).toHaveBeenCalledWith(['env1:wf_1', 'env1:wf_2']);
380+
expect(result.size).toBe(2);
381+
382+
await service.getMany(InMemoryLRUCacheStore.WORKFLOW_PREFERENCES, ['env1:wf_1', 'env1:wf_2'], fetchMissing, {
383+
environmentId: 'env1',
384+
});
385+
386+
expect(fetchMissing).toHaveBeenCalledTimes(2);
387+
});
388+
});
389+
252390
describe('getIfCached', () => {
253391
it('should return undefined for non-existent key', () => {
254392
const result = service.getIfCached(InMemoryLRUCacheStore.WORKFLOW, 'nonexistent');

libs/application-generic/src/services/in-memory-lru-cache/in-memory-lru-cache.service.ts

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@ import { CacheStoreDataTypeMap, InMemoryLRUCacheStore, STORE_CONFIGS, StoreConfi
66

77
type EntityStore<T = unknown> = {
88
cache: LRUCache<string, T>;
9-
inflightRequests: Map<string, Promise<T>>;
9+
/**
10+
* In-flight fetches keyed by effective cache key, shared by `get()` and `getMany()` so
11+
* concurrent callers coalesce onto one fetch. The resolved value is `T | undefined` because
12+
* `getMany()` registers a promise per requested key and a batch fetch may legitimately omit a
13+
* key (resolving `undefined`); `get()` treats an `undefined` resolution as a miss and falls
14+
* back to its own fetch, so the two methods can safely share a store.
15+
*/
16+
inflightRequests: Map<string, Promise<T | undefined>>;
1017
config: StoreConfig;
1118
};
1219

@@ -45,7 +52,11 @@ export class InMemoryLRUCacheService {
4552

4653
const inflightRequest = store.inflightRequests.get(effectiveKey);
4754
if (inflightRequest) {
48-
return inflightRequest;
55+
const inflightResult = await inflightRequest;
56+
if (inflightResult !== undefined) {
57+
return inflightResult;
58+
}
59+
// A shared `getMany()` batch had no value for this key — fall through to our own fetch.
4960
}
5061

5162
const fetchPromise = fetchFn()
@@ -65,6 +76,93 @@ export class InMemoryLRUCacheService {
6576
return fetchPromise;
6677
}
6778

79+
/**
80+
* Batch variant of `get()` that resolves many keys at once while preserving the same
81+
* caching and in-flight coalescing semantics per key:
82+
* - cache hits are served from memory,
83+
* - keys with an in-flight fetch (from this or a concurrent call) reuse that promise,
84+
* - only the remaining keys are passed to `fetchMissing` in a single call.
85+
*
86+
* This keeps the cold-cache/TTL-expiry path stampede-safe (K concurrent callers for the same
87+
* key set trigger one `fetchMissing` per key, not K), which a manual `getIfCached()` + `set()`
88+
* loop would lose. `fetchMissing` is expected to return an entry for every requested key.
89+
*/
90+
async getMany<TStore extends InMemoryLRUCacheStore>(
91+
storeName: TStore,
92+
keys: string[],
93+
fetchMissing: (missingKeys: string[]) => Promise<Map<string, CacheStoreDataTypeMap[TStore]>>,
94+
opts?: GetOptions
95+
): Promise<Map<string, CacheStoreDataTypeMap[TStore]>> {
96+
const result = new Map<string, CacheStoreDataTypeMap[TStore]>();
97+
98+
if (keys.length === 0) {
99+
return result;
100+
}
101+
102+
const store = this.getOrCreateStore<CacheStoreDataTypeMap[TStore]>(storeName);
103+
const isCacheEnabled = await this.isCacheEnabled(store.config, opts);
104+
105+
if (!isCacheEnabled || opts?.skipCache) {
106+
return fetchMissing(keys);
107+
}
108+
109+
const pending: Array<{ key: string; promise: Promise<CacheStoreDataTypeMap[TStore] | undefined> }> = [];
110+
const missingKeys: string[] = [];
111+
112+
for (const key of keys) {
113+
const effectiveKey = this.resolveKey(key, opts?.cacheVariant);
114+
115+
const cached = store.cache.get(effectiveKey);
116+
if (cached !== undefined) {
117+
result.set(key, cached);
118+
continue;
119+
}
120+
121+
const inflightRequest = store.inflightRequests.get(effectiveKey);
122+
if (inflightRequest) {
123+
pending.push({ key, promise: inflightRequest });
124+
continue;
125+
}
126+
127+
missingKeys.push(key);
128+
}
129+
130+
if (missingKeys.length > 0) {
131+
const batchPromise = fetchMissing(missingKeys);
132+
133+
for (const key of missingKeys) {
134+
const effectiveKey = this.resolveKey(key, opts?.cacheVariant);
135+
136+
const perKeyPromise = batchPromise
137+
.then((fetched) => {
138+
const value = fetched.get(key);
139+
if (value !== null && value !== undefined) {
140+
store.cache.set(effectiveKey, value);
141+
}
142+
143+
return value;
144+
})
145+
.finally(() => {
146+
store.inflightRequests.delete(effectiveKey);
147+
});
148+
149+
store.inflightRequests.set(effectiveKey, perKeyPromise);
150+
pending.push({ key, promise: perKeyPromise });
151+
}
152+
}
153+
154+
await Promise.all(
155+
pending.map(async ({ key, promise }) => {
156+
const value = await promise;
157+
if (value !== undefined) {
158+
result.set(key, value);
159+
}
160+
})
161+
);
162+
163+
return result;
164+
}
165+
68166
getIfCached<TStore extends InMemoryLRUCacheStore>(
69167
storeName: TStore,
70168
key: string
@@ -124,7 +222,7 @@ export class InMemoryLRUCacheService {
124222
max: config.max,
125223
ttl: config.ttl,
126224
}),
127-
inflightRequests: new Map<string, Promise<T>>(),
225+
inflightRequests: new Map<string, Promise<T | undefined>>(),
128226
config,
129227
};
130228
STORES.set(storeName, store as EntityStore);

0 commit comments

Comments
 (0)