From c753a1801887df8cb2c1e04a687cb2adbab09b46 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 31 Oct 2024 18:39:21 +0200 Subject: [PATCH 1/9] CSC POC ontop of Parser --- packages/client/index.ts | 3 + packages/client/lib/client/commands-queue.ts | 21 ++++++- packages/client/lib/client/index.ts | 60 ++++++++++++++---- packages/client/lib/client/linked-list.ts | 10 ++- packages/client/lib/client/parser.ts | 5 ++ packages/client/lib/client/pool.ts | 61 ++++++++++++++++--- packages/client/lib/client/socket.ts | 7 +++ packages/client/lib/cluster/cluster-slots.ts | 18 +++++- packages/client/lib/cluster/index.ts | 49 +++++++++++---- packages/client/lib/commands/GEOSEARCH.ts | 5 -- .../client/lib/commands/GEOSEARCHSTORE.ts | 7 ++- packages/client/lib/sentinel/index.spec.ts | 33 +++++++++- packages/client/lib/sentinel/index.ts | 23 ++++++- packages/client/lib/sentinel/test-util.ts | 14 +++-- packages/client/lib/sentinel/types.ts | 5 ++ packages/client/lib/sentinel/utils.ts | 2 +- 16 files changed, 270 insertions(+), 53 deletions(-) diff --git a/packages/client/index.ts b/packages/client/index.ts index 56cdf703ca3..d4dceb357f2 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel'; export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types'; export const createSentinel = RedisSentinel.create; +import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache'; +export { BasicClientSideCache, BasicPooledClientSideCache }; + // export { GeoReplyWith } from './lib/commands/generic-transformers'; // export { SetOptions } from './lib/commands/SET'; diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15e8a747b98..0199aa92c11 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: RedisArgument | null) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -109,13 +111,30 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - + switch (push[0].toString()) { + case "invalidate": { + if (this.#invalidateCallback) { + if (push[1] !== null) { + for (const key of push[1]) { + this.#invalidateCallback(key); + } + } else { + this.#invalidateCallback(null); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: ReadonlyArray, options?: CommandOptions diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5dae1271ecb..331a37dc9e1 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers'; +import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache'; import { BasicCommandParser, CommandParser } from './parser'; export interface RedisClientOptions< @@ -80,6 +81,10 @@ export interface RedisClientOptions< * TODO */ commandOptions?: CommandOptions; + /** + * TODO + */ + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } type WithCommands< @@ -300,9 +305,8 @@ export default class RedisClient< private _self = this; private _commandOptions?: CommandOptions; #dirtyWatch?: string; - #epoch: number; #watchEpoch?: number; - + #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; get options(): RedisClientOptions | undefined { @@ -321,6 +325,11 @@ export default class RedisClient< return this._self.#queue.isPubSubActive; } + get socketEpoch() { + return this._self.#socket.socketEpoch; + } + + get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -331,10 +340,20 @@ export default class RedisClient< constructor(options?: RedisClientOptions) { super(); + this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#epoch = 0; + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof ClientSideCacheProvider) { + this.#clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = new BasicClientSideCache(cscConfig); + } + this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + } } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { @@ -495,6 +514,13 @@ export default class RedisClient< ); } + if (this.#clientSideCache) { + const tracking = this.#clientSideCache.trackingOn(); + if (tracking) { + commands.push(tracking); + } + } + return commands; } @@ -548,6 +574,7 @@ export default class RedisClient< }) .on('error', err => { this.emit('error', err); + this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { @@ -556,7 +583,6 @@ export default class RedisClient< }) .on('connect', () => this.emit('connect')) .on('ready', () => { - this.#epoch++; this.emit('ready'); this.#setPingTimer(); this.#maybeScheduleWrite(); @@ -684,13 +710,21 @@ export default class RedisClient< commandOptions: CommandOptions | undefined, transformReply: TransformReply | undefined, ) { - const reply = await this.sendCommand(parser.redisArgs, commandOptions); + const csc = this._self.#clientSideCache; + const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; - if (transformReply) { - return transformReply(reply, parser.preserve, commandOptions?.typeMapping); - } + const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) }; - return reply; + if (csc && command.CACHEABLE && defaultTypeMapping) { + return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping); + } else { + const reply = await fn(); + + if (transformReply) { + return transformReply(reply, parser.preserve, commandOptions?.typeMapping); + } + return reply; + } } /** @@ -855,7 +889,7 @@ export default class RedisClient< const reply = await this._self.sendCommand( pushVariadicArguments(['WATCH'], key) ); - this._self.#watchEpoch ??= this._self.#epoch; + this._self.#watchEpoch ??= this._self.socketEpoch; return reply as unknown as ReplyWithTypeMapping, TYPE_MAPPING>; } @@ -922,7 +956,7 @@ export default class RedisClient< } const chainId = Symbol('Pipeline Chain'), - promise = Promise.all( + promise = Promise.allSettled( commands.map(({ args }) => this._self.#queue.addCommand(args, { chainId, typeMapping: this._commandOptions?.typeMapping @@ -958,7 +992,7 @@ export default class RedisClient< throw new WatchError(dirtyWatch); } - if (watchEpoch && watchEpoch !== this._self.#epoch) { + if (watchEpoch && watchEpoch !== this._self.socketEpoch) { throw new WatchError('Client reconnected after WATCH'); } @@ -1182,6 +1216,7 @@ export default class RedisClient< return new Promise(resolve => { clearTimeout(this._self.#pingTimer); this._self.#socket.close(); + this._self.#clientSideCache?.onClose(); if (this._self.#queue.isEmpty()) { this._self.#socket.destroySocket(); @@ -1208,6 +1243,7 @@ export default class RedisClient< clearTimeout(this._self.#pingTimer); this._self.#queue.flushAll(new DisconnectsClientError()); this._self.#socket.destroy(); + this._self.#clientSideCache?.onClose(); this._self.#credentialsSubscription?.dispose(); this._self.#credentialsSubscription = null; } diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index ac1d021be91..29678f027b5 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -114,6 +114,7 @@ export class DoublyLinkedList { export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; + removed: boolean; } export class SinglyLinkedList { @@ -140,7 +141,8 @@ export class SinglyLinkedList { const node = { value, - next: undefined + next: undefined, + removed: false }; if (this.#head === undefined) { @@ -151,6 +153,9 @@ export class SinglyLinkedList { } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + if (node.removed) { + throw new Error("node already removed"); + } --this.#length; if (this.#head === node) { @@ -165,6 +170,8 @@ export class SinglyLinkedList { } else { parent!.next = node.next; } + + node.removed = true; } shift() { @@ -177,6 +184,7 @@ export class SinglyLinkedList { this.#head = node.next; } + node.removed = true; return node.value; } diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 12eec457739..76251ea67a7 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser { return this.#keys[0]; } + get cacheKey() { + let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_'); + return cacheKey + '_' + this.#redisArgs.join('_'); + } + push(...arg: Array) { this.#redisArgs.push(...arg); }; diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index a08377e3d38..400268f4bb5 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,6 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache'; import { BasicCommandParser } from './parser'; export interface RedisPoolOptions { @@ -26,6 +27,10 @@ export interface RedisPoolOptions { * TODO */ cleanupDelay: number; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; /** * TODO */ @@ -117,7 +122,7 @@ export class RedisClientPool< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping = {} >( - clientOptions?: RedisClientOptions, + clientOptions?: Omit, "clientSideCache">, options?: Partial ) { const Pool = attachConfig({ @@ -135,7 +140,7 @@ export class RedisClientPool< // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( - RedisClient.factory(clientOptions).bind(undefined, clientOptions), + clientOptions, options ) ) as RedisClientPoolType; @@ -209,22 +214,39 @@ export class RedisClientPool< return this._self.#isClosing; } + #clientSideCache?: PooledClientSideCacheProvider; + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... */ constructor( - clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, options?: Partial ) { super(); - this.#clientFactory = clientFactory; this.#options = { ...RedisClientPool.#DEFAULTS, ...options }; + if (options?.clientSideCache) { + if (clientOptions === undefined) { + clientOptions = {}; + } + + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); + this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); + } + } + + this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType; } private _self = this; @@ -288,9 +310,15 @@ export class RedisClientPool< async connect() { if (this._self.#isOpen) return; // TODO: throw error? - this._self.#isOpen = true; + try { + this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory); + } catch (err) { + this.destroy(); + throw err; + } + const promises = []; while (promises.length < this._self.#options.minimum) { promises.push(this._self.#create()); @@ -298,21 +326,27 @@ export class RedisClientPool< try { await Promise.all(promises); - return this as unknown as RedisClientPoolType; } catch (err) { this.destroy(); throw err; } + + return this as unknown as RedisClientPoolType; } - async #create() { + async #create(redirect?: boolean) { const node = this._self.#clientsInUse.push( this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) ); try { - await node.value.connect(); + const client = node.value; + if (this._self.#clientSideCache) { + this._self.#clientSideCache.addClient(node.value); + } + + await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); throw err; @@ -401,7 +435,9 @@ export class RedisClientPool< const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this.#idleClients.shift()!.destroy(); + const client = this.#idleClients.shift()! + this.#clientSideCache?.removeClient(client); + client.destroy(); } } @@ -439,8 +475,10 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { promises.push(client.close()); } - + await Promise.all(promises); + + this.#clientSideCache?.onPoolClose(); this._self.#idleClients.reset(); this._self.#clientsInUse.reset(); @@ -460,6 +498,9 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { client.destroy(); } + + this._self.#clientSideCache?.onPoolClose(); + this._self.#clientsInUse.reset(); this._self.#isOpen = false; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 36afa36c04a..603416cf9ed 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -72,6 +72,12 @@ export default class RedisSocket extends EventEmitter { #isSocketUnrefed = false; + #socketEpoch = 0; + + get socketEpoch() { + return this.#socketEpoch; + } + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -212,6 +218,7 @@ export default class RedisSocket extends EventEmitter { throw err; } this.#isReady = true; + this.#socketEpoch++; this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 3a4adff73c4..a727dc4c1e1 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -6,6 +6,7 @@ import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pu import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface NodeAddress { host: string; @@ -111,6 +112,7 @@ export default class RedisClusterSlots< replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; + clientSideCache?: PooledClientSideCacheProvider; #isOpen = false; @@ -123,7 +125,16 @@ export default class RedisClusterSlots< emit: EventEmitter['emit'] ) { this.#options = options; - this.#clientFactory = RedisClient.factory(options); + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.clientSideCache = options.clientSideCache; + } else { + this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) + } + } + + this.#clientFactory = RedisClient.factory(this.#options); this.#emit = emit; } @@ -164,6 +175,8 @@ export default class RedisClusterSlots< } async #discover(rootNode: RedisClusterClientOptions) { + this.clientSideCache?.clear(); + this.clientSideCache?.disable(); try { const addressesInUse = new Set(), promises: Array> = [], @@ -219,6 +232,7 @@ export default class RedisClusterSlots< } await Promise.all(promises); + this.clientSideCache?.enable(); return true; } catch (err) { @@ -314,6 +328,8 @@ export default class RedisClusterSlots< #createClient(node: ShardNode, readonly = node.readonly) { return this.#clientFactory( this.#clientOptionsDefaults({ + clientSideCache: this.clientSideCache, + RESP: this.#options.RESP, socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 12928e71f12..cfe67193989 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -9,9 +9,9 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi- import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; import { RedisTcpSocketOptions } from '../client/socket'; -import ASKING from '../commands/ASKING'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; import { BasicCommandParser } from '../client/parser'; -import { parseArgs } from '../commands/generic-transformers'; +import { ASKING_CMD } from '../commands/ASKING'; interface ClusterCommander< M extends RedisModules, @@ -66,6 +66,10 @@ export interface RedisClusterOptions< * Useful when the cluster is running on another network */ nodeAddressMap?: NodeAddressMap; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } // remove once request & response policies are ready @@ -148,6 +152,7 @@ export default class RedisCluster< > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: ProxyCluster, ...args: Array) { const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); @@ -382,6 +387,27 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } + #handleAsk( + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise + ) { + return async (client: RedisClientType, options?: ClusterCommandOptions) => { + const chainId = Symbol("asking chain"); + const opts = options ? {...options} : {}; + opts.chainId = chainId; + + + + const ret = await Promise.all( + [ + client.sendCommand([ASKING_CMD], {chainId: chainId}), + fn(client, opts) + ] + ); + + return ret[1]; + }; + } + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, @@ -391,14 +417,15 @@ export default class RedisCluster< const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = await this.#slots.getClient(firstKey, isReadonly); let i = 0; - let myOpts = options; + + let myFn = fn; while (true) { try { - return await fn(client, myOpts); + return await myFn(client, options); } catch (err) { - // reset to passed in options, if changed by an ask request - myOpts = options; + myFn = fn; + // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; @@ -417,13 +444,7 @@ export default class RedisCluster< } client = redirectTo; - - const chainId = Symbol('Asking Chain'); - myOpts = options ? {...options} : {}; - myOpts.chainId = chainId; - - client.sendCommand(parseArgs(ASKING), {chainId: chainId}).catch(err => { console.log(`Asking Failed: ${err}`) } ); - + myFn = this.#handleAsk(fn); continue; } @@ -574,10 +595,12 @@ export default class RedisCluster< } close() { + this.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.close(); } destroy() { + this.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.destroy(); } diff --git a/packages/client/lib/commands/GEOSEARCH.ts b/packages/client/lib/commands/GEOSEARCH.ts index 8c77fd89239..869dc60bec9 100644 --- a/packages/client/lib/commands/GEOSEARCH.ts +++ b/packages/client/lib/commands/GEOSEARCH.ts @@ -29,12 +29,7 @@ export function parseGeoSearchArguments( from: GeoSearchFrom, by: GeoSearchBy, options?: GeoSearchOptions, - store?: RedisArgument ) { - if (store !== undefined) { - parser.pushKey(store); - } - parser.pushKey(key); if (typeof from === 'string' || from instanceof Buffer) { diff --git a/packages/client/lib/commands/GEOSEARCHSTORE.ts b/packages/client/lib/commands/GEOSEARCHSTORE.ts index eb8e12abe6d..34c6e0988e2 100644 --- a/packages/client/lib/commands/GEOSEARCHSTORE.ts +++ b/packages/client/lib/commands/GEOSEARCHSTORE.ts @@ -17,7 +17,12 @@ export default { options?: GeoSearchStoreOptions ) { parser.push('GEOSEARCHSTORE'); - parseGeoSearchArguments(parser, source, from, by, options, destination); + + if (destination !== undefined) { + parser.pushKey(destination); + } + + parseGeoSearchArguments(parser, source, from, by, options); if (options?.STOREDIST) { parser.push('STOREDIST'); diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index be5522bdd8d..0cf8d80abe1 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -10,11 +10,12 @@ import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, import { promisify } from 'node:util'; import { exec } from 'node:child_process'; import { RESP_TYPES } from '../RESP/decoder'; -import { defineScript } from '../lua-script'; import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; import RedisBloomModules from '@redis/bloom'; +import { BasicPooledClientSideCache } from '../client/cache'; import { RedisTcpSocketOptions } from '../client/socket'; import { SQUARE_SCRIPT } from '../client/index.spec'; +import { once } from 'node:events'; const execAsync = promisify(exec); @@ -78,7 +79,7 @@ async function steadyState(frame: SentinelFramework) { } ["redis-sentinel-test-password", undefined].forEach(function (password) { - describe.skip(`Sentinel - password = ${password}`, () => { + describe(`Sentinel - password = ${password}`, () => { const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; const frame = new SentinelFramework(config); let tracer = new Array(); @@ -1136,6 +1137,34 @@ async function steadyState(frame: SentinelFramework) { tracer.push("added node and waiting on added promise"); await nodeAddedPromise; }) + + it('with client side caching', async function() { + this.timeout(30000); + const csc = new BasicPooledClientSideCache(); + + sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5}); + await sentinel.connect(); + + await sentinel.set('x', 1); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(1, csc.cacheMisses()); + assert.equal(3, csc.cacheHits()); + + const invalidatePromise = once(csc, 'invalidate'); + await sentinel.set('x', 2); + await invalidatePromise; + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(csc.cacheMisses(), 2); + assert.equal(csc.cacheHits(), 6); + }) }) describe('Sentinel Factory', function () { diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 92a87fbb145..3e358765583 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,6 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache'; interface ClientInfo { id: number; @@ -590,6 +591,11 @@ class RedisSentinelInternal< #trace: (msg: string) => unknown = () => { }; + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this.#clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -602,11 +608,22 @@ class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = options.nodeClientOptions ? Object.assign({} as RedisClientOptions, options.nodeClientOptions) : {}; + this.#nodeClientOptions = options.nodeClientOptions ? {...options.nodeClientOptions} : {}; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } + if (options.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); + } + } + this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions, options.sentinelClientOptions) : {}; this.#sentinelClientOptions.modules = RedisSentinelModule; @@ -827,6 +844,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; @@ -875,6 +894,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 25dd4c4371a..de6c90a70b0 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -188,18 +188,22 @@ export class SentinelFramework extends DockerBase { } const options: RedisSentinelOptions = { + ...opts, name: this.config.sentinelName, sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }), passthroughClientErrorEvents: errors } if (this.config.password !== undefined) { - options.nodeClientOptions = {password: this.config.password}; - options.sentinelClientOptions = {password: this.config.password}; - } + if (!options.nodeClientOptions) { + options.nodeClientOptions = {}; + } + options.nodeClientOptions.password = this.config.password; - if (opts) { - Object.assign(options, opts); + if (!options.sentinelClientOptions) { + options.sentinelClientOptions = {}; + } + options.sentinelClientOptions = {password: this.config.password}; } return RedisSentinel.create(options); diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index 428e7bccd66..da1e1c81f7e 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -4,6 +4,7 @@ import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisS import COMMANDS from '../commands'; import RedisSentinel, { RedisSentinelClient } from '.'; import { RedisTcpSocketOptions } from '../client/socket'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; export interface RedisNode { host: string; @@ -61,6 +62,10 @@ export interface RedisSentinelOptions< * When `false`, the sentinel object will wait for the first available client from the pool. */ reserveClient?: boolean; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export interface SentinelCommander< diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index 90b789ddca9..7e2404c2f7a 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -1,5 +1,5 @@ -import { BasicCommandParser } from '../client/parser'; import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types'; +import { BasicCommandParser } from '../client/parser'; import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket'; import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types'; From 35fb2b8e454fce72495cf7fdb0ea2c83bff0d68d Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Mon, 4 Nov 2024 17:59:21 +0200 Subject: [PATCH 2/9] add csc file that weren't merged after patch --- packages/client/lib/client/README-cache.md | 64 +++ packages/client/lib/client/cache.spec.ts | 460 ++++++++++++++++++ packages/client/lib/client/cache.ts | 533 +++++++++++++++++++++ 3 files changed, 1057 insertions(+) create mode 100644 packages/client/lib/client/README-cache.md create mode 100644 packages/client/lib/client/cache.spec.ts create mode 100644 packages/client/lib/client/cache.ts diff --git a/packages/client/lib/client/README-cache.md b/packages/client/lib/client/README-cache.md new file mode 100644 index 00000000000..01a43cf4594 --- /dev/null +++ b/packages/client/lib/client/README-cache.md @@ -0,0 +1,64 @@ +# Client Side Caching Support + +Client Side Caching enables Redis Servers and Clients to work together to enable a client to cache results from command sent to a server and be informed by the server when the cached result is no longer valid. + +## Usage + +node-redis supports two ways of instantiating client side caching support + +Note: Client Side Caching is only supported with RESP3. + +### Anonymous Cache + +```javascript +const client = createClient({RESP: 3, clientSideCache: {ttl: 0, maxEntries: 0, lru: false}}) +``` + +In this instance, the cache is opaque to the user, and they have no control over it. + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicClientSideCache(ttl, maxEntries, lru); +const client = createClient({RESP: 3, clientSideCache: cache}); +``` + +In this instance, the user has full control over the cache, as they have access to the cache object. + +They can manually invalidate keys + +```javascript +cache.invalidate(key); +``` + +they can clear the entire cache +g +```javascript +cache.clear(); +``` + +as well as get cache metrics + +```typescript +const hits: number = cache.cacheHits(); +const misses: number = cache.cacheMisses(); +``` + +## Pooled Caching + +Similar to individual clients, node-redis also supports caching for its pooled client object, with the cache being able to be instantiated in an anonymous manner or a controllable manner. + +### Anonymous Cache + +```javascript +const client = createClientPool({RESP: 3}, {clientSideCache: {ttl: 0, maxEntries: 0, lru: false}, minimum: 8}); +``` + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicPooledClientSideCache(ttl, maxEntries, lru); +const client = createClientPool({RESP: 3}, {clientSideCache: cache, minimum: 8}); +``` \ No newline at end of file diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts new file mode 100644 index 00000000000..ee08d3a69f4 --- /dev/null +++ b/packages/client/lib/client/cache.spec.ts @@ -0,0 +1,460 @@ +import assert from "assert"; +import testUtils, { GLOBAL } from "../test-utils" +import { BasicClientSideCache, BasicPooledClientSideCache } from "./cache" +import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL"; +import { once } from 'events'; + +describe("Client Side Cache", () => { + describe('Basic Cache', () => { + const csc = new BasicClientSideCache({ maxEntries: 10 }); + + /* cacheNotEmpty */ + testUtils.testWithClient('Basic Cache Miss', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheUsedTest */ + testUtils.testWithClient('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 1, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Max Cache Entries', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 12, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('LRU works correctly', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 11, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.clear(); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Null Invalidate acts as clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.invalidate(null); + await client.get("x"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('flushdb causes an invalidate null', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.flushDb(REDIS_FLUSH_MODES.SYNC); + assert.equal(await client.get("x"), null); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1', 'first get'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2', 'second get'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3', 'third get'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* immutableCacheEntriesTest */ + testUtils.testWithClient("Cached Replies Don't Mutate", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + ret1[0] = '4'; + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + ret2[0] = '8'; + const ret3 = await client.mGet(['x', 'y']); + assert.deepEqual(ret3, ['1', '2'], 'third mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* clearOnDisconnectTest */ + testUtils.testWithClient("Cached cleared on disconnect", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "first Cache Misses"); + assert.equal(csc.cacheHits(), 0, "first Cache Hits"); + + await client.close(); + + await client.connect(); + + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 1, "second Cache Misses"); + assert.equal(csc.cacheHits(), 0, "second Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + }); + + describe.only("Pooled Cache", () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithClient('Virtual Pool Disconnect', async client1 => { + const client2 = client1.duplicate(); + await client2.connect() + + assert.equal(await client2.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(1, csc.cacheHits(), "Cache Hits"); + + await client2.close(); + + assert.equal(await client1.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(2, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheNotEmpty */ + testUtils.testWithClientPool('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* cacheUsedTest */ + testUtils.testWithClientPool('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 1 */ + testUtils.testWithClientPool('Basic Cache Manually Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + + assert.equal(await client.get("x"), '1', 'first get'); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 2); + let [i] = await p; + + assert.equal(await client.get("x"), '2', 'second get'); + + p = once(csc, 'invalidate'); + await client.set("x", 3); + [i] = await p; + + assert.equal(await client.get("x"), '3'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 2 */ + testUtils.testWithClientPool('Basic Cache Invalidate via message', async client => { + csc.clear(); + + await client.set('x', 1); + await client.set('y', 2); + + assert.deepEqual(await client.mGet(['x', 'y']), ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 3); + let [i] = await p; + + assert.equal(i, 'x'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + p = once(csc, 'invalidate'); + await client.set("y", 4); + [i] = await p; + + assert.equal(i, 'y'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '4'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + }); + + describe('Cluster Caching', () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithCluster('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + await client.set("y", 1); + await client.get("y"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(4, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3'); + + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + await client.set("y", 2); + assert.equal(await client.get("y"), '2'); + await client.set("y", 3); + assert.equal(await client.get("y"), '3'); + + assert.equal(6, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + }); +}); \ No newline at end of file diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts new file mode 100644 index 00000000000..3a8fd951b40 --- /dev/null +++ b/packages/client/lib/client/cache.ts @@ -0,0 +1,533 @@ +import { EventEmitter } from 'stream'; +import RedisClient, { RedisClientType } from '.'; +import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; +import { BasicCommandParser } from './parser'; + +type CachingClient = RedisClient; +type CachingClientType = RedisClientType; +type CmdFunc = () => Promise; + +export interface ClientSideCacheConfig { + ttl?: number; + maxEntries?: number; + lru?: boolean; +} + +type CacheCreator = { + epoch: number; + client: CachingClient; +}; + +interface ClientSideCacheEntry { + invalidate(): void; + validate(): boolean; +} + +abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { + #invalidated = false; + readonly #expireTime: number; + + constructor(ttl: number) { + if (ttl == 0) { + this.#expireTime = 0; + } else { + this.#expireTime = Date.now() + ttl; + } + } + + invalidate(): void { + this.#invalidated = true; + } + + validate(): boolean { + return !this.#invalidated && (this.#expireTime == 0 || (Date.now() < this.#expireTime)) + } +} + +export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { + readonly #value: any; + + get value() { + return this.#value; + } + + constructor(ttl: number, value: any) { + super(ttl); + this.#value = value; + } +} + +export class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { + readonly #sendCommandPromise: Promise; + + get promise() { + return this.#sendCommandPromise; + } + + constructor(ttl: number, sendCommandPromise: Promise) { + super(ttl); + this.#sendCommandPromise = sendCommandPromise; + } +} + +export abstract class ClientSideCacheProvider extends EventEmitter { + abstract handleCache(client: CachingClient, parser: BasicCommandParser, fn: CmdFunc, transformReply: TransformReply | undefined, typeMapping: TypeMapping | undefined): Promise; + abstract trackingOn(): Array; + abstract invalidate(key: RedisArgument | null): void; + abstract clear(): void; + abstract cacheHits(): number; + abstract cacheMisses(): number; + abstract onError(): void; + abstract onClose(): void; +} + +export class BasicClientSideCache extends ClientSideCacheProvider { + #cacheKeyToEntryMap: Map; + #keyToCacheKeySetMap: Map>; + readonly ttl: number; + readonly #maxEntries: number; + readonly #lru: boolean; + #cacheHits = 0; + #cacheMisses = 0; + + constructor(config?: ClientSideCacheConfig) { + super(); + + this.#cacheKeyToEntryMap = new Map(); + this.#keyToCacheKeySetMap = new Map>(); + this.ttl = config?.ttl ?? 0; + this.#maxEntries = config?.maxEntries ?? 0; + this.#lru = config?.lru ?? true; + } + + /* logic of how caching works: + + 1. commands use a CommandParser + it enables us to define/retrieve + cacheKey - a unique key that corresponds to this command and its arguments + redisKeys - an array of redis keys as strings that if the key is modified, will cause redis to invalidate this result when cached + 2. check if cacheKey is in our cache + 2b1. if its a value cacheEntry - return it + 2b2. if it's a promise cache entry - wait on promise and then go to 3c. + 3. if cacheEntry is not in cache + 3a. send the command save the promise into a a cacheEntry and then wait on result + 3b. transform reply (if required) based on transformReply + 3b. check the cacheEntry is still valid - in cache and hasn't been deleted) + 3c. if valid - overwrite with value entry + 4. return previously non cached result + */ + override async handleCache( + client: CachingClient, + parser: BasicCommandParser, + fn: CmdFunc, + transformReply: TransformReply | undefined, + typeMapping: TypeMapping | undefined + ) { + let reply: ReplyUnion; + + const cacheKey = parser.cacheKey; + + // "2" + let cacheEntry = this.get(cacheKey); + if (cacheEntry) { + // If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters. + if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1" + this.#cacheHit(); + + return structuredClone(cacheEntry.value); + } else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2 + // unsure if this should be considered a cache hit, a miss, or neither? + reply = await cacheEntry.promise; + } else { + throw new Error("unknown cache entry type"); + } + } else { // 3/3a + this.#cacheMiss(); + + const promise = fn(); + + cacheEntry = this.createPromiseEntry(client, promise); + this.set(cacheKey, cacheEntry, parser.keys); + + try { + reply = await promise; + } catch (err) { + if (cacheEntry.validate()) { // on error, have to remove promise from cache + this.delete(cacheKey!); + } + throw err; + } + } + + // 3b + let val; + if (transformReply) { + val = transformReply(reply, parser.preserve, typeMapping); + } else { + val = reply; + } + + // 3c + if (cacheEntry.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated) + // 3d + cacheEntry = this.createValueEntry(client, val); + this.set(cacheKey, cacheEntry, parser.keys); + this.emit("cached-key", cacheKey); + } else { +// console.log("cache entry for key got invalidated between execution and saving, so not saving"); + } + + return structuredClone(val); + } + + override trackingOn() { + return ['CLIENT', 'TRACKING', 'ON']; + } + + override invalidate(key: RedisArgument | null) { + if (key === null) { + this.clear(false); + this.emit("invalidate", key); + + return; + } + + const keySet = this.#keyToCacheKeySetMap.get(key.toString()); + if (keySet) { + for (const cacheKey of keySet) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(cacheKey); + } + this.#keyToCacheKeySetMap.delete(key.toString()); + } + + this.emit('invalidate', key); + } + + override clear(reset = true) { + this.#cacheKeyToEntryMap.clear(); + this.#keyToCacheKeySetMap.clear(); + if (reset) { + this.#cacheHits = 0; + this.#cacheMisses = 0; + } + } + + get(cacheKey?: string | undefined) { + if (cacheKey === undefined) { + return undefined + } + + const val = this.#cacheKeyToEntryMap.get(cacheKey); + + if (val && !val.validate()) { + this.delete(cacheKey); + this.emit("invalidate", cacheKey); + + return undefined; + } + + if (val !== undefined && this.#lru) { + this.#cacheKeyToEntryMap.delete(cacheKey); + this.#cacheKeyToEntryMap.set(cacheKey, val); + } + + return val; + } + + delete(cacheKey: string) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + this.#cacheKeyToEntryMap.delete(cacheKey); + } + } + + has(cacheKey: string) { + return this.#cacheKeyToEntryMap.has(cacheKey); + } + + set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { + let count = this.#cacheKeyToEntryMap.size; + const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + if (oldEntry) { + count--; // overwriting, so not incrementig + oldEntry.invalidate(); + } + + if (this.#maxEntries > 0 && count >= this.#maxEntries) { + this.deleteOldest(); + } + + this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry); + + for (const key of keys) { + if (!this.#keyToCacheKeySetMap.has(key.toString())) { + this.#keyToCacheKeySetMap.set(key.toString(), new Set()); + } + + const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString()); + cacheKeySet!.add(cacheKey); + } + } + + size() { + return this.#cacheKeyToEntryMap.size; + } + + createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + return new ClientSideCacheEntryValue(this.ttl, value); + } + + createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise); + } + + #cacheHit(): void { + this.#cacheHits++; + } + + #cacheMiss(): void { + this.#cacheMisses++; + } + + override cacheHits(): number { + return this.#cacheHits; + } + + override cacheMisses(): number { + return this.#cacheMisses; + } + + override onError(): void { + this.clear(); + } + + override onClose() { + this.clear(); + } + + /** + * @internal + */ + deleteOldest() { + const it = this.#cacheKeyToEntryMap[Symbol.iterator](); + const n = it.next(); + if (!n.done) { + this.#cacheKeyToEntryMap.delete(n.value[0]); + } + } + + /** + * @internal + */ + entryEntries() { + return this.#cacheKeyToEntryMap.entries(); + } + + /** + * @internal + */ + keySetEntries() { + return this.#keyToCacheKeySetMap.entries(); + } +} + +export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { + #disabled = false; + + abstract updateRedirect(id: number): void; + abstract addClient(client: CachingClientType): void; + abstract removeClient(client: CachingClientType): void; + + disable() { + this.#disabled = true; + } + + enable() { + this.#disabled = false; + } + + override get(cacheKey: string) { + if (this.#disabled) { + return undefined; + } + + return super.get(cacheKey); + } + + override has(cacheKey: string) { + if (this.#disabled) { + return false; + } + + return super.has(cacheKey); + } + + onPoolConnect(factory: () => CachingClientType) {}; + + onPoolClose() { + this.clear(); + }; +} + +// doesn't do anything special in pooling, clears cache on every client disconnect +export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { + + override updateRedirect(id: number): void { + return; + } + + override addClient(client: CachingClientType): void { + return; + } + override removeClient(client: CachingClientType): void { + return; + } + + override onError() { + this.clear(false); + } + + override onClose() { + this.clear(false); + } +} + +class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, value: any) { + super(ttl, value); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise) { + super(ttl, sendCommandPromise); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +// Doesn't clear cache on client disconnect, validates entries on retrieval +export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache { + override createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryValue(this.ttl, creator, value); + } + + override createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise); + } + + // don't clear cache on error here + override onError() {} + + override onClose() {} +} + +// Only clears cache on "management"/"redirect" client disconnect +export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider { + #id?: number; + #clients: Set = new Set(); + #redirectClient?: CachingClientType; + + constructor(config: ClientSideCacheConfig) { + super(config); + this.disable(); + } + + override trackingOn(): string[] { + if (this.#id) { + return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()]; + } else { + return []; + } + } + + override updateRedirect(id: number) { + this.#id = id; + for (const client of this.#clients) { + client.sendCommand(this.trackingOn()).catch(() => {}); + } + } + + override addClient(client: CachingClientType) { + this.#clients.add(client); + } + + override removeClient(client: CachingClientType) { + this.#clients.delete(client); + } + + override onError(): void {}; + + override async onPoolConnect(factory: () => CachingClientType) { + const client = factory(); + this.#redirectClient = client; + + client.on("error", () => { + this.disable(); + this.clear(); + }).on("ready", async () => { + const clientId = await client.withTypeMapping({}).clientId(); + this.updateRedirect(clientId); + this.enable(); + }) + + try { + await client.connect(); + } catch (err) { + throw err; + } + } + + override onClose() {}; + + override onPoolClose() { + super.onPoolClose(); + + if (this.#redirectClient) { + this.#id = undefined; + const client = this.#redirectClient; + this.#redirectClient = undefined; + + return client.close(); + } + } +} From 0483e7ef07c660068fad7408c525af4a259f1faf Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 5 Nov 2024 11:48:50 +0200 Subject: [PATCH 3/9] address review comments --- packages/client/lib/client/cache.ts | 107 +++---------------- packages/client/lib/client/commands-queue.ts | 1 + packages/client/lib/client/index.ts | 14 +-- packages/client/lib/client/parser.ts | 10 +- packages/client/lib/client/pool.ts | 22 ++-- packages/client/lib/cluster/index.ts | 4 + packages/client/lib/sentinel/index.ts | 11 +- 7 files changed, 47 insertions(+), 122 deletions(-) diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts index 3a8fd951b40..03eee829b6c 100644 --- a/packages/client/lib/client/cache.ts +++ b/packages/client/lib/client/cache.ts @@ -1,10 +1,9 @@ import { EventEmitter } from 'stream'; -import RedisClient, { RedisClientType } from '.'; +import RedisClient from '.'; import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; import { BasicCommandParser } from './parser'; type CachingClient = RedisClient; -type CachingClientType = RedisClientType; type CmdFunc = () => Promise; export interface ClientSideCacheConfig { @@ -23,6 +22,17 @@ interface ClientSideCacheEntry { validate(): boolean; } +function generateCacheKey(redisArgs: ReadonlyArray): string { + const tmp = new Array(redisArgs.length*2); + + for (let i = 0; i < redisArgs.length; i++) { + tmp[i] = redisArgs[i].length; + tmp[i+redisArgs.length] = redisArgs[i]; + } + + return tmp.join('_'); +} + abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { #invalidated = false; readonly #expireTime: number; @@ -125,7 +135,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { ) { let reply: ReplyUnion; - const cacheKey = parser.cacheKey; + const cacheKey = generateCacheKey(parser.redisArgs); // "2" let cacheEntry = this.get(cacheKey); @@ -339,10 +349,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider { export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { #disabled = false; - abstract updateRedirect(id: number): void; - abstract addClient(client: CachingClientType): void; - abstract removeClient(client: CachingClientType): void; - disable() { this.#disabled = true; } @@ -367,8 +373,6 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache return super.has(cacheKey); } - onPoolConnect(factory: () => CachingClientType) {}; - onPoolClose() { this.clear(); }; @@ -376,18 +380,6 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache // doesn't do anything special in pooling, clears cache on every client disconnect export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { - - override updateRedirect(id: number): void { - return; - } - - override addClient(client: CachingClientType): void { - return; - } - override removeClient(client: CachingClientType): void { - return; - } - override onError() { this.clear(false); } @@ -459,75 +451,4 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache override onError() {} override onClose() {} -} - -// Only clears cache on "management"/"redirect" client disconnect -export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider { - #id?: number; - #clients: Set = new Set(); - #redirectClient?: CachingClientType; - - constructor(config: ClientSideCacheConfig) { - super(config); - this.disable(); - } - - override trackingOn(): string[] { - if (this.#id) { - return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()]; - } else { - return []; - } - } - - override updateRedirect(id: number) { - this.#id = id; - for (const client of this.#clients) { - client.sendCommand(this.trackingOn()).catch(() => {}); - } - } - - override addClient(client: CachingClientType) { - this.#clients.add(client); - } - - override removeClient(client: CachingClientType) { - this.#clients.delete(client); - } - - override onError(): void {}; - - override async onPoolConnect(factory: () => CachingClientType) { - const client = factory(); - this.#redirectClient = client; - - client.on("error", () => { - this.disable(); - this.clear(); - }).on("ready", async () => { - const clientId = await client.withTypeMapping({}).clientId(); - this.updateRedirect(clientId); - this.enable(); - }) - - try { - await client.connect(); - } catch (err) { - throw err; - } - } - - override onClose() {}; - - override onPoolClose() { - super.onPoolClose(); - - if (this.#redirectClient) { - this.#id = undefined; - const client = this.#redirectClient; - this.#redirectClient = undefined; - - return client.close(); - } - } -} +} \ No newline at end of file diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 0199aa92c11..0af81135065 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -111,6 +111,7 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { + // currently only supporting "invalidate" over RESP3 push messages switch (push[0].toString()) { case "invalidate": { if (this.#invalidateCallback) { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 331a37dc9e1..f34c12b2ab7 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -305,9 +305,13 @@ export default class RedisClient< private _self = this; private _commandOptions?: CommandOptions; #dirtyWatch?: string; - #watchEpoch?: number; + #watchEpoch?: number; #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; + get clientSideCache() { + return this._self.#clientSideCache; + } + get options(): RedisClientOptions | undefined { return this._self.#options; @@ -329,7 +333,6 @@ export default class RedisClient< return this._self.#socket.socketEpoch; } - get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -515,10 +518,7 @@ export default class RedisClient< } if (this.#clientSideCache) { - const tracking = this.#clientSideCache.trackingOn(); - if (tracking) { - commands.push(tracking); - } + commands.push(this.#clientSideCache.trackingOn()); } return commands; @@ -956,7 +956,7 @@ export default class RedisClient< } const chainId = Symbol('Pipeline Chain'), - promise = Promise.allSettled( + promise = Promise.all( commands.map(({ args }) => this._self.#queue.addCommand(args, { chainId, typeMapping: this._commandOptions?.typeMapping diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 76251ea67a7..3e820230429 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -34,8 +34,14 @@ export class BasicCommandParser implements CommandParser { } get cacheKey() { - let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_'); - return cacheKey + '_' + this.#redisArgs.join('_'); + const tmp = new Array(this.#redisArgs.length*2); + + for (let i = 0; i < this.#redisArgs.length; i++) { + tmp[i] = this.#redisArgs[i].length; + tmp[i+this.#redisArgs.length] = this.#redisArgs[i]; + } + + return tmp.join('_'); } push(...arg: Array) { diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 400268f4bb5..07553775ec5 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,7 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; -import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache'; import { BasicCommandParser } from './parser'; export interface RedisPoolOptions { @@ -215,6 +215,9 @@ export class RedisClientPool< } #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, @@ -241,8 +244,7 @@ export class RedisClientPool< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); - this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); - this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); +// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } } @@ -312,13 +314,6 @@ export class RedisClientPool< if (this._self.#isOpen) return; // TODO: throw error? this._self.#isOpen = true; - try { - this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory); - } catch (err) { - this.destroy(); - throw err; - } - const promises = []; while (promises.length < this._self.#options.minimum) { promises.push(this._self.#create()); @@ -334,7 +329,7 @@ export class RedisClientPool< return this as unknown as RedisClientPoolType; } - async #create(redirect?: boolean) { + async #create() { const node = this._self.#clientsInUse.push( this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) @@ -342,10 +337,6 @@ export class RedisClientPool< try { const client = node.value; - if (this._self.#clientSideCache) { - this._self.#clientSideCache.addClient(node.value); - } - await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); @@ -436,7 +427,6 @@ export class RedisClientPool< for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop const client = this.#idleClients.shift()! - this.#clientSideCache?.removeClient(client); client.destroy(); } } diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index cfe67193989..fdb13df35fa 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -270,6 +270,10 @@ export default class RedisCluster< return this._self.#slots.slots; } + get clientSideCache() { + return this._self.#slots.clientSideCache; + } + /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 3e358765583..3e322dd22e6 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,7 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; -import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface ClientInfo { id: number; @@ -266,6 +266,10 @@ export default class RedisSentinel< #masterClientCount = 0; #masterClientInfo?: ClientInfo; + get clientSideCache() { + return this._self.#internal.clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -558,7 +562,7 @@ class RedisSentinelInternal< readonly #name: string; readonly #nodeClientOptions: RedisClientOptions; - readonly #sentinelClientOptions: RedisClientOptions; + readonly #sentinelClientOptions: RedisClientOptions; readonly #scanInterval: number; readonly #passthroughClientErrorEvents: boolean; @@ -619,8 +623,7 @@ class RedisSentinelInternal< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); - this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); - this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); +// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } } From faa403c9e985692bca68510d84c14a3571375974 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 5 Nov 2024 12:05:58 +0200 Subject: [PATCH 4/9] nits to try and fix github --- packages/client/lib/client/cache.spec.ts | 2 +- packages/client/lib/client/cache.ts | 16 ++++++++-------- packages/client/lib/cluster/index.ts | 4 ++-- packages/client/lib/sentinel/index.ts | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts index ee08d3a69f4..258104ed4c4 100644 --- a/packages/client/lib/client/cache.spec.ts +++ b/packages/client/lib/client/cache.spec.ts @@ -227,7 +227,7 @@ describe("Client Side Cache", () => { }); }); - describe.only("Pooled Cache", () => { + describe("Pooled Cache", () => { const csc = new BasicPooledClientSideCache(); testUtils.testWithClient('Virtual Pool Disconnect', async client1 => { diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts index 03eee829b6c..73e08587b1b 100644 --- a/packages/client/lib/client/cache.ts +++ b/packages/client/lib/client/cache.ts @@ -54,7 +54,7 @@ abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { } } -export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { +class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { readonly #value: any; get value() { @@ -67,7 +67,7 @@ export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { } } -export class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { +class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { readonly #sendCommandPromise: Promise; get promise() { @@ -95,8 +95,8 @@ export class BasicClientSideCache extends ClientSideCacheProvider { #cacheKeyToEntryMap: Map; #keyToCacheKeySetMap: Map>; readonly ttl: number; - readonly #maxEntries: number; - readonly #lru: boolean; + readonly maxEntries: number; + readonly lru: boolean; #cacheHits = 0; #cacheMisses = 0; @@ -106,8 +106,8 @@ export class BasicClientSideCache extends ClientSideCacheProvider { this.#cacheKeyToEntryMap = new Map(); this.#keyToCacheKeySetMap = new Map>(); this.ttl = config?.ttl ?? 0; - this.#maxEntries = config?.maxEntries ?? 0; - this.#lru = config?.lru ?? true; + this.maxEntries = config?.maxEntries ?? 0; + this.lru = config?.lru ?? true; } /* logic of how caching works: @@ -240,7 +240,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { return undefined; } - if (val !== undefined && this.#lru) { + if (val !== undefined && this.lru) { this.#cacheKeyToEntryMap.delete(cacheKey); this.#cacheKeyToEntryMap.set(cacheKey, val); } @@ -268,7 +268,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { oldEntry.invalidate(); } - if (this.#maxEntries > 0 && count >= this.#maxEntries) { + if (this.maxEntries > 0 && count >= this.maxEntries) { this.deleteOldest(); } diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index fdb13df35fa..ae1cec0f08a 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -599,12 +599,12 @@ export default class RedisCluster< } close() { - this.#slots.clientSideCache?.onPoolClose(); + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.close(); } destroy() { - this.#slots.clientSideCache?.onPoolClose(); + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.destroy(); } diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 3e322dd22e6..50b8a2b06b3 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -612,7 +612,7 @@ class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = options.nodeClientOptions ? {...options.nodeClientOptions} : {}; + this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } From 286b6efecaad524f4565313df77c75ca4d605ddc Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 7 Nov 2024 10:55:14 +0200 Subject: [PATCH 5/9] last change from review --- packages/client/lib/client/cache.ts | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts index 73e08587b1b..f17743ff071 100644 --- a/packages/client/lib/client/cache.ts +++ b/packages/client/lib/client/cache.ts @@ -6,10 +6,12 @@ import { BasicCommandParser } from './parser'; type CachingClient = RedisClient; type CmdFunc = () => Promise; +type EvictionPolicy = "LRU" | "FIFO" + export interface ClientSideCacheConfig { ttl?: number; maxEntries?: number; - lru?: boolean; + evictPolocy?: EvictionPolicy; } type CacheCreator = { @@ -107,7 +109,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { this.#keyToCacheKeySetMap = new Map>(); this.ttl = config?.ttl ?? 0; this.maxEntries = config?.maxEntries ?? 0; - this.lru = config?.lru ?? true; + this.lru = config?.evictPolocy !== "FIFO" } /* logic of how caching works: @@ -165,6 +167,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { if (cacheEntry.validate()) { // on error, have to remove promise from cache this.delete(cacheKey!); } + throw err; } } @@ -217,25 +220,21 @@ export class BasicClientSideCache extends ClientSideCacheProvider { this.emit('invalidate', key); } - override clear(reset = true) { + override clear(resetStats = true) { this.#cacheKeyToEntryMap.clear(); this.#keyToCacheKeySetMap.clear(); - if (reset) { + if (resetStats) { this.#cacheHits = 0; this.#cacheMisses = 0; } } - get(cacheKey?: string | undefined) { - if (cacheKey === undefined) { - return undefined - } - + get(cacheKey: string) { const val = this.#cacheKeyToEntryMap.get(cacheKey); if (val && !val.validate()) { this.delete(cacheKey); - this.emit("invalidate", cacheKey); + this.emit("cache-evict", cacheKey); return undefined; } @@ -263,6 +262,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { let count = this.#cacheKeyToEntryMap.size; const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + if (oldEntry) { count--; // overwriting, so not incrementig oldEntry.invalidate(); @@ -419,11 +419,8 @@ class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { override validate(): boolean { let ret = super.validate(); - if (this.#creator) { - ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch - } - - return ret; + + return ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch } } From 3dd810b988e89fd2bbf2791b00bae1057427fba0 Mon Sep 17 00:00:00 2001 From: borislav ivanov Date: Mon, 17 Mar 2025 11:41:18 +0200 Subject: [PATCH 6/9] docs: RESP3 and client-side caching documentation --- docs/v4-to-v5.md | 2 + docs/v5.md | 165 ++++++++++++++++++++++++-- packages/client/lib/client/index.ts | 15 ++- packages/client/lib/sentinel/types.ts | 18 ++- 4 files changed, 187 insertions(+), 13 deletions(-) diff --git a/docs/v4-to-v5.md b/docs/v4-to-v5.md index 95c2230ce23..f1e87840220 100644 --- a/docs/v4-to-v5.md +++ b/docs/v4-to-v5.md @@ -225,6 +225,8 @@ In v5, any unwritten commands (in the same pipeline) will be discarded. - `FT.SUGDEL`: [^boolean-to-number] - `FT.CURSOR READ`: `cursor` type changed from `number` to `string` (in and out) to avoid issues when the number is bigger than `Number.MAX_SAFE_INTEGER`. See [here](https://github.com/redis/node-redis/issues/2561). +- `FT.PROFILE`: `profile` type is now `ReplyUnion`, which preserves the server's original response format. This change helps prevent issues with future updates to the debug response structure. +- `FT.SUGGET`: Since Redis 8, the server returns `[]` instead of `null` when there are no suggestions ### Time Series diff --git a/docs/v5.md b/docs/v5.md index 4a1bd817b9b..01260351c9b 100644 --- a/docs/v5.md +++ b/docs/v5.md @@ -1,6 +1,10 @@ # RESP3 Support -TODO +Redis Serialization Protocol version 3 (RESP3) is the newer protocol used for communication between Redis servers and clients. It offers more data types and richer semantics compared to RESP2. + +## Enabling RESP3 + +To use RESP3, simply set the `RESP` option to `3` when creating a client: ```javascript const client = createClient({ @@ -8,19 +12,160 @@ const client = createClient({ }); ``` -```javascript -// by default -await client.hGetAll('key'); // Record +## Type Mapping -await client.withTypeMapping({ - [TYPES.MAP]: Map -}).hGetAll('key'); // Map +Some [RESP types](./RESP.md) can be mapped to more than one JavaScript type. For example, "Blob String" can be mapped to `string` or `Buffer`. You can override the default type mapping using the `withTypeMapping` function: + +```javascript +await client.get('key'); // `string | null` -await client.withTypeMapping({ - [TYPES.MAP]: Map, +const proxyClient = client.withTypeMapping({ [TYPES.BLOB_STRING]: Buffer -}).hGetAll('key'); // Map +}); + +await proxyClient.get('key'); // `Buffer | null` +``` + +## Unstable RESP3 Support + +Some Redis modules (particularly the Search module) have responses that might change in future RESP3 implementations. These commands are marked with `unstableResp3: true` in the codebase. + +To use these commands with RESP3, you must explicitly enable unstable RESP3 support: + +```javascript +const client = createClient({ + RESP: 3, + unstableResp3: true +}); +``` + +If you attempt to use these commands with RESP3 without enabling the `unstableResp3` flag, the client will throw an error with a message like: + ``` +Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance +``` + +### Commands Using Unstable RESP3 + +The following Redis commands and modules use the `unstableResp3` flag: +- Many Search module commands (FT.*) +- Stream commands like XREAD, XREADGROUP +- Other modules with complex response structures + +If you're working with these commands and want to use RESP3, make sure to enable unstable RESP3 support in your client configuration. + +# Client-Side Caching + +Redis 6.0 introduced client-side caching, which allows clients to locally cache command results and receive invalidation notifications from the server. This significantly reduces network roundtrips and latency for frequently accessed data. + +### How It Works + +1. When a cacheable command is executed, the client checks if the result is already in the cache +2. If found and valid, it returns the cached result (no Redis server roundtrip) +3. If not found, it executes the command and caches the result +4. When Redis modifies keys, it sends invalidation messages to clients +5. The client removes the invalidated entries from its cache + +This mechanism ensures data consistency while providing significant performance benefits for read-heavy workloads. + +## Requirements + +Client-side caching in node-redis: +- Requires RESP3 protocol (`RESP: 3` in client configuration) +- Uses Redis server's invalidation mechanism to keep the cache in sync +- Is completely disabled when using RESP2 + +## Limitations of Client-Side Caching + +Currently, node-redis implements client-side caching only in "default" tracking mode. The implementation does not yet support the following Redis client-side caching modes: + +- **Opt-In Mode**: Where clients explicitly indicate which specific keys they want to cache using the `CLIENT CACHING YES` command before each cacheable command. + +- **Opt-Out Mode**: Where all keys are cached by default, and clients specify exceptions for keys they don't want to cache with `CLIENT UNTRACKING`. + +- **Broadcasting Mode**: Where clients subscribe to invalidation messages for specific key prefixes without the server tracking individual client-key relationships. + +These advanced caching modes offer more fine-grained control over caching behavior and may be supported in future node-redis releases. While node-redis doesn't implement these modes natively yet, the underlying Redis commands (`CLIENT TRACKING`, `CLIENT CACHING`, `CLIENT UNTRACKING`) are available if you need to implement these advanced tracking modes yourself. + + +## Basic Configuration + +To enable client-side caching with default settings: + +```javascript +const client = createClient({ + RESP: 3, + clientSideCache: { + // Cache configuration options + maxEntries: 1000, // Maximum number of entries in the cache (0 = unlimited) + ttl: 60000, // Time-to-live in milliseconds (0 = never expire) + evictPolicy: "LRU" // Eviction policy (LRU or FIFO) + } +}); +``` + +### Cache Control + +You can also create and control the cache instance directly: + +```javascript +// Import the cache provider +const { BasicClientSideCache } = require('redis'); + +// Create a configurable cache instance +const cache = new BasicClientSideCache({ + maxEntries: 5000, + ttl: 30000 +}); + +// Create client with this cache +const client = createClient({ + RESP: 3, + clientSideCache: cache +}); + +// Later you can: +// Get cache statistics +const hits = cache.cacheHits(); +const misses = cache.cacheMisses(); + +// Manually invalidate specific keys +cache.invalidate('my-key'); + +// Clear the entire cache +cache.clear(); + +// Listen for cache events +cache.on('invalidate', (key) => { + console.log(`Cache key invalidated: ${key}`); +}); +``` + +### Working with Connection Pools + +Client-side caching also works with connection pools: + +```javascript +const pool = createClientPool({ + RESP: 3 +}, { + clientSideCache: { + maxEntries: 10000, + ttl: 60000 + }, + minimum: 5 +}); +``` + +For pools, you can use specialized cache providers like `BasicPooledClientSideCache` or `PooledNoRedirectClientSideCache` that handle connection events appropriately. + +### Performance Considerations + +- Configure appropriate `maxEntries` and `ttl` values based on your application needs +- Monitor cache hit/miss rates to optimize settings +- Consider memory usage on the client side when using large caches +- Client-side caching works best for frequently accessed, relatively static data + # Sentinel Support diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index f34c12b2ab7..17a77466f48 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -82,7 +82,20 @@ export interface RedisClientOptions< */ commandOptions?: CommandOptions; /** - * TODO + * Client-side Caching configuration + * + * Enables client-side caching functionality for the client to reduce network + * round-trips and improve performance for frequently accessed data. + * + * You can either: + * 1. Provide an instance that implements the `ClientSideCacheProvider` abstract class + * for complete control over cache behavior, or + * 2. Provide a configuration object (`ClientSideCacheConfig`) to customize the + * built-in cache implementation with your preferred settings + * + * + * @see {@link ClientSideCacheProvider} - Abstract class for implementing custom cache providers + * @see {@link ClientSideCacheConfig} - Configuration options for the built-in cache implementation */ clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index da1e1c81f7e..8da604c78df 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -62,10 +62,24 @@ export interface RedisSentinelOptions< * When `false`, the sentinel object will wait for the first available client from the pool. */ reserveClient?: boolean; + /** - * TODO + * Client-side Caching configuration + * + * Enables client-side caching functionality for the client to reduce network + * round-trips and improve performance for frequently accessed data. + * + * You can either: + * 1. Provide an instance that implements the `PooledClientSideCacheProvider` abstract class + * for complete control over cache behavior, or + * 2. Provide a configuration object (`ClientSideCacheConfig`) to customize the + * built-in cache implementation with your preferred settings + * + * + * @see {@link PooledClientSideCacheProvider} - Abstract class for implementing custom pooled cache providers + * @see {@link ClientSideCacheConfig} - Configuration options for the built-in cache implementation */ - clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export interface SentinelCommander< From c53c68212c460b509af4ad640b14c0d81a6616a1 Mon Sep 17 00:00:00 2001 From: borislav ivanov Date: Mon, 17 Mar 2025 13:12:54 +0200 Subject: [PATCH 7/9] fix(test): error handling during pool destruction Add try-catch to handle "client is closed" error during pool destruction, with a warning message and TODO to investigate race condition --- packages/test-utils/lib/index.ts | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index b48f11b02c7..7f5df83a8e3 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -84,7 +84,7 @@ interface ClusterTestOptions< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping - // POLICIES extends CommandPolicies +// POLICIES extends CommandPolicies > extends CommonTestOptions { clusterConfiguration?: Partial>; numberOfMasters?: number; @@ -97,7 +97,7 @@ interface AllTestOptions< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping - // POLICIES extends CommandPolicies +// POLICIES extends CommandPolicies > { client: ClientTestOptions; cluster: ClusterTestOptions; @@ -335,7 +335,17 @@ export default class TestUtils { await fn(pool); } finally { await pool.flushAll(); - pool.destroy(); + try { + pool.destroy(); + } catch (destroyError) { + if (destroyError instanceof Error && + destroyError.message === 'The client is closed') { + //TODO figure out where this race condition between destroy and client close is happening + console.warn('Ignoring "client is closed" error during pool destruction'); + } else { + throw destroyError; + } + } } }); } @@ -346,7 +356,7 @@ export default class TestUtils { S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping - // POLICIES extends CommandPolicies + // POLICIES extends CommandPolicies >(cluster: RedisClusterType): Promise { return Promise.all( cluster.masters.map(async master => { @@ -363,7 +373,7 @@ export default class TestUtils { S extends RedisScripts = {}, RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} - // POLICIES extends CommandPolicies = {} + // POLICIES extends CommandPolicies = {} >( title: string, fn: (cluster: RedisClusterType) => unknown, @@ -387,7 +397,7 @@ export default class TestUtils { it(title, async function () { if (!dockersPromise) return this.skip(); - + const dockers = await dockersPromise, cluster = createCluster({ rootNodes: dockers.map(({ port }) => ({ @@ -417,7 +427,7 @@ export default class TestUtils { S extends RedisScripts = {}, RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} - // POLICIES extends CommandPolicies = {} + // POLICIES extends CommandPolicies = {} >( title: string, fn: (client: RedisClientType | RedisClusterType) => unknown, From bd8d1eea2af5f3c2b5cb17642a339c8110f62c05 Mon Sep 17 00:00:00 2001 From: borislav ivanov Date: Mon, 17 Mar 2025 14:09:01 +0200 Subject: [PATCH 8/9] tests: property-based tests for client side cache key generation --- package-lock.json | 212 +++++++++++++++++++++++ package.json | 3 +- packages/client/lib/client/cache.spec.ts | 153 +++++++++++++++- packages/client/lib/client/cache.ts | 26 +-- packages/test-utils/package.json | 3 +- 5 files changed, 381 insertions(+), 16 deletions(-) diff --git a/package-lock.json b/package-lock.json index 25a1dc9d51c..df8a2afd5ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "mocha": "^10.2.0", "nyc": "^15.1.0", "release-it": "^17.0.3", + "ts-node": "^10.9.2", "tsx": "^4.7.0", "typedoc": "^0.25.7", "typescript": "^5.3.3" @@ -664,6 +665,30 @@ "node": ">=6.9.0" } }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", + "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@jridgewell/trace-mapping": "0.3.9" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@cspotcode/source-map-support/node_modules/@jridgewell/trace-mapping": { + "version": "0.3.9", + "resolved": "https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.9.tgz", + "integrity": "sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@jridgewell/resolve-uri": "^3.0.3", + "@jridgewell/sourcemap-codec": "^1.4.10" + } + }, "node_modules/@esbuild/linux-x64": { "version": "0.19.12", "cpu": [ @@ -1164,6 +1189,34 @@ "dev": true, "license": "MIT" }, + "node_modules/@tsconfig/node10": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.11.tgz", + "integrity": "sha512-DcRjDCujK/kCk/cUe8Xz8ZSpm8mS3mNNpta+jGCA6USEDfktlNvm1+IuZ9eTcDbNk41BHwpHHeW+N1lKCz4zOw==", + "dev": true, + "license": "MIT" + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.11.tgz", + "integrity": "sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==", + "dev": true, + "license": "MIT" + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.3.tgz", + "integrity": "sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==", + "dev": true, + "license": "MIT" + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.4.tgz", + "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/body-parser": { "version": "1.19.5", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.5.tgz", @@ -1330,6 +1383,32 @@ "node": ">= 0.6" } }, + "node_modules/acorn": { + "version": "8.14.1", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.1.tgz", + "integrity": "sha512-OvQ/2pUDKmgfCg++xsTX1wGxfTaszcHVcTctW4UJB4hibJx2HXxxO5UmVgyjMa+ZDsiaf5wWLXYpRWMmBI0QHg==", + "dev": true, + "license": "MIT", + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.3.4", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.4.tgz", + "integrity": "sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==", + "dev": true, + "license": "MIT", + "dependencies": { + "acorn": "^8.11.0" + }, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/agent-base": { "version": "7.1.0", "license": "MIT", @@ -1448,6 +1527,13 @@ "dev": true, "license": "MIT" }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true, + "license": "MIT" + }, "node_modules/argparse": { "version": "2.0.1", "dev": true, @@ -2315,6 +2401,13 @@ } } }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true, + "license": "MIT" + }, "node_modules/cross-spawn": { "version": "7.0.3", "dev": true, @@ -3124,6 +3217,29 @@ "node": ">=4" } }, + "node_modules/fast-check": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-4.0.0.tgz", + "integrity": "sha512-aXLyLemZ7qhLNn2oq+YpjT2Xed21+i29WGAYuyrGbU4r8oinB3i4XR4e62O3NY6qmm5qHEDoc/7d+gMsri3AfA==", + "dev": true, + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/dubzzz" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fast-check" + } + ], + "license": "MIT", + "dependencies": { + "pure-rand": "^7.0.0" + }, + "engines": { + "node": ">=12.17.0" + } + }, "node_modules/fast-glob": { "version": "3.3.2", "dev": true, @@ -5088,6 +5204,13 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true, + "license": "ISC" + }, "node_modules/marked": { "version": "4.3.0", "dev": true, @@ -6399,6 +6522,23 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/pure-rand": { + "version": "7.0.1", + "resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-7.0.1.tgz", + "integrity": "sha512-oTUZM/NAZS8p7ANR3SHh30kXB+zK2r2BPcEn/awJIbOvq82WoMN4p62AWWp3Hhw50G0xMsw1mhIBLqHw64EcNQ==", + "dev": true, + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/dubzzz" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fast-check" + } + ], + "license": "MIT" + }, "node_modules/qs": { "version": "6.13.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", @@ -7576,6 +7716,60 @@ "node": ">=0.8.0" } }, + "node_modules/ts-node": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@cspotcode/source-map-support": "^0.8.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.1", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-esm": "dist/bin-esm.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, + "node_modules/ts-node/node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true, + "license": "BSD-3-Clause", + "engines": { + "node": ">=0.3.1" + } + }, "node_modules/tslib": { "version": "2.6.2", "license": "0BSD" @@ -7956,6 +8150,13 @@ "uuid": "dist/bin/uuid" } }, + "node_modules/v8-compile-cache-lib": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", + "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", + "dev": true, + "license": "MIT" + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", @@ -8324,6 +8525,16 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "dev": true, @@ -8483,6 +8694,7 @@ "name": "@redis/test-utils", "devDependencies": { "@types/yargs": "^17.0.32", + "fast-check": "^4.0.0", "yargs": "^17.7.2" }, "peerDependencies": { diff --git a/package.json b/package.json index 0a29c71f831..2ff2d4825ae 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "release-it": "^17.0.3", "tsx": "^4.7.0", "typedoc": "^0.25.7", - "typescript": "^5.3.3" + "typescript": "^5.3.3", + "ts-node": "^10.9.2" } } diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts index 258104ed4c4..6a77cb4a697 100644 --- a/packages/client/lib/client/cache.spec.ts +++ b/packages/client/lib/client/cache.spec.ts @@ -1,8 +1,11 @@ import assert from "assert"; +import { generateCacheKey } from "./cache"; +import { RedisArgument } from '../RESP/types'; import testUtils, { GLOBAL } from "../test-utils" import { BasicClientSideCache, BasicPooledClientSideCache } from "./cache" import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL"; import { once } from 'events'; +import fc from 'fast-check'; describe("Client Side Cache", () => { describe('Basic Cache', () => { @@ -457,4 +460,152 @@ describe("Client Side Cache", () => { } }) }); -}); \ No newline at end of file +}); +describe("generateCacheKey property based tests", () => { + // We'll use a mix of ASCII and Unicode strings for thorough testing + const argumentArb = fc.oneof( + fc.string({ minLength: 0, maxLength: 20, unit: 'grapheme-ascii' }), // ASCII strings + fc.string({ minLength: 0, maxLength: 20, unit: 'grapheme' }) // Unicode strings with proper graphemes + ); + const argumentsArb = fc.array(argumentArb, { minLength: 1, maxLength: 10 }); + + it("should generate deterministic keys", () => { + fc.assert( + fc.property(argumentsArb, (args) => { + const key1 = generateCacheKey(args); + const key2 = generateCacheKey([...args]); // Create a copy + return key1 === key2; + }) + ); + }); + + it("should generate different keys for different commands", () => { + fc.assert( + fc.property( + argumentsArb, + argumentArb, + (args, differentArg) => { + fc.pre(args.length > 0 && !args.includes(differentArg)); + + // Create a modified copy with one different argument + const modifiedArgs = [...args]; + const randomIndex = Math.floor(Math.random() * args.length); + modifiedArgs[randomIndex] = differentArg; + + const key1 = generateCacheKey(args); + const key2 = generateCacheKey(modifiedArgs); + + return key1 !== key2; + } + ) + ); + }); + + it("should be sensitive to argument order", () => { + fc.assert( + fc.property( + fc.tuple( + fc.string({ minLength: 1, unit: 'grapheme' }), + fc.string({ minLength: 1, unit: 'grapheme' }) + ).filter(([a, b]) => a !== b), + ([str1, str2]) => { + const args1 = [str1, str2]; + const args2 = [str2, str1]; + + const key1 = generateCacheKey(args1); + const key2 = generateCacheKey(args2); + + return key1 !== key2; + } + ) + ); + });; + + it("should produce no collisions across different argument patterns", () => { + // This test tries to find a collision by generating many random argument sets + const cacheKeyMap = new Map(); + + fc.assert( + fc.property(argumentsArb, (args) => { + const key = generateCacheKey(args); + + if (cacheKeyMap.has(key)) { + const existingArgs = cacheKeyMap.get(key)!; + // Check if this is actually the same arguments + const argsEqual = args.length === existingArgs.length && + args.every((arg, i) => arg === existingArgs[i]); + + // If args are different but key is the same, we found a collision + if (!argsEqual) { + return false; // This will fail the test + } + } else { + cacheKeyMap.set(key, [...args]); + } + + return true; + }), + { + numRuns: 1000, + examples: [ + [["1", "22"]], // "1" has length 1, "22" has length 2 + [["12", "2"]], // "12" has length 2, "2" has length 1 + [["12_3", "1"]], // Edge case with underscore in string + [["1_2", "123"]], // Another underscore test case + [["_", "__"]], // Just underscores of different lengths + [["1", "1_"]], // String and string with underscore suffix + [["1__", "111"]], // Multiple underscores vs repeated digit + + ] + } + ); + }); + + describe("generateCacheKey 'hash' function ", () => { + const argumentArb = fc.oneof( + fc.string({ unit: 'grapheme-ascii' }), // string ascii values + fc.string({ unit: 'grapheme' }),// string unicode values + fc.integer().map(String), // numeric values as strings + fc.constant(""), + fc.constant("_") + ); + const argumentsArb = fc.array(argumentArb, { minLength: 1, maxLength: 10 }); + + it("should generate keys according to its specification", () => { + fc.assert( + fc.property(argumentsArb, (args) => { + const generatedKey = generateCacheKey(args); + + const tmp = new Array(args.length * 2); + for (let i = 0; i < args.length; i++) { + tmp[i] = String(args[i]).length; // Lengths first + tmp[i + args.length] = String(args[i]); // Then values + } + const expectedKey = tmp.join('_'); + + // Verify the generated key matches our expected key + return generatedKey === expectedKey; + }), + { + numRuns: 1000, // Run more tests for better coverage + examples: [ + [[""]], // Single empty string + [["0"]], // String that looks like a number + [["_"]], // String with just an underscore + [["_", ""]], // Underscore and empty string + [["a_b"]], // String containing an underscore + [["a", "b"]], // Multiple normal strings + [["", "", ""]], // Multiple empty strings + [["1", "22"]], // "1" has length 1, "22" has length 2 + [["12", "2"]], // "12" has length 2, "2" has length 1 + [["12_3", "1"]], // Edge case with underscore in string + [["1_2", "123"]], // Another underscore test case + [["_", "__"]], // Just underscores of different lengths + [["1", "1_"]], // String and string with underscore suffix + [["1__", "111"]], // Multiple underscores vs repeated digit + ] + } + ); + }); + }); +}); diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts index f17743ff071..812bc115b4e 100644 --- a/packages/client/lib/client/cache.ts +++ b/packages/client/lib/client/cache.ts @@ -24,12 +24,12 @@ interface ClientSideCacheEntry { validate(): boolean; } -function generateCacheKey(redisArgs: ReadonlyArray): string { - const tmp = new Array(redisArgs.length*2); +export function generateCacheKey(redisArgs: ReadonlyArray): string { + const tmp = new Array(redisArgs.length * 2); for (let i = 0; i < redisArgs.length; i++) { tmp[i] = redisArgs[i].length; - tmp[i+redisArgs.length] = redisArgs[i]; + tmp[i + redisArgs.length] = redisArgs[i]; } return tmp.join('_'); @@ -155,7 +155,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { } } else { // 3/3a this.#cacheMiss(); - + const promise = fn(); cacheEntry = this.createPromiseEntry(client, promise); @@ -167,7 +167,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { if (cacheEntry.validate()) { // on error, have to remove promise from cache this.delete(cacheKey!); } - + throw err; } } @@ -187,7 +187,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { this.set(cacheKey, cacheEntry, parser.keys); this.emit("cached-key", cacheKey); } else { -// console.log("cache entry for key got invalidated between execution and saving, so not saving"); + // console.log("cache entry for key got invalidated between execution and saving, so not saving"); } return structuredClone(val); @@ -206,7 +206,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { } const keySet = this.#keyToCacheKeySetMap.get(key.toString()); - if (keySet) { + if (keySet) { for (const cacheKey of keySet) { const entry = this.#cacheKeyToEntryMap.get(cacheKey); if (entry) { @@ -371,8 +371,8 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache } return super.has(cacheKey); - } - + } + onPoolClose() { this.clear(); }; @@ -419,7 +419,7 @@ class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { override validate(): boolean { let ret = super.validate(); - + return ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch } } @@ -445,7 +445,7 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache } // don't clear cache on error here - override onError() {} + override onError() { } - override onClose() {} -} \ No newline at end of file + override onClose() { } +} diff --git a/packages/test-utils/package.json b/packages/test-utils/package.json index f7373f6add1..bdfd785037b 100644 --- a/packages/test-utils/package.json +++ b/packages/test-utils/package.json @@ -11,6 +11,7 @@ }, "devDependencies": { "@types/yargs": "^17.0.32", - "yargs": "^17.7.2" + "yargs": "^17.7.2", + "fast-check": "^4.0.0" } } From d8345ce5337d3d604d64ed39546bef39b944d161 Mon Sep 17 00:00:00 2001 From: borislav ivanov Date: Mon, 17 Mar 2025 14:25:46 +0200 Subject: [PATCH 9/9] tests: bump test container to 8.0-M05-pre --- .github/workflows/tests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7bcc72e5408..6970eaf4b9b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,22 +7,22 @@ on: - v4.0 - v5 paths-ignore: - - '**/*.md' + - "**/*.md" pull_request: branches: - master - v4.0 - v5 paths-ignore: - - '**/*.md' + - "**/*.md" jobs: tests: runs-on: ubuntu-latest strategy: fail-fast: false matrix: - node-version: [ '18', '20', '22' ] - redis-version: [ 'rs-7.2.0-v13', 'rs-7.4.0-v1', '8.0-M04-pre' ] + node-version: ["18", "20", "22"] + redis-version: ["rs-7.2.0-v13", "rs-7.4.0-v1", "8.0-M05-pre"] steps: - uses: actions/checkout@v4 with: