diff --git a/package-lock.json b/package-lock.json index 7b2bb8307d..4282677333 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12058,6 +12058,20 @@ "node": ">=14" } }, + "node_modules/@redis/client": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.0.1.tgz", + "integrity": "sha512-k0EJvlMGEyBqUD3orKe0UMZ66fPtfwqPIr+ZSd853sXj2EyhNtPXSx+J6sENXJNgAlEBhvD+57Dwt0qTisKB0A==", + "license": "MIT", + "optional": true, + "peer": true, + "dependencies": { + "cluster-key-slot": "1.1.2" + }, + "engines": { + "node": ">= 18" + } + }, "node_modules/@rollup/rollup-android-arm-eabi": { "version": "4.40.2", "resolved": "https://registry.npmjs.org/@rollup/rollup-android-arm-eabi/-/rollup-android-arm-eabi-4.40.2.tgz", @@ -15601,6 +15615,17 @@ "semver": "bin/semver" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "optional": true, + "peer": true, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/cmd-shim": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/cmd-shim/-/cmd-shim-6.0.1.tgz", @@ -25047,7 +25072,8 @@ "peerDependencies": { "@aws-sdk/client-dynamodb": ">=3.x", "@aws-sdk/lib-dynamodb": ">=3.x", - "@middy/core": "4.x || 5.x || 6.x" + "@middy/core": "4.x || 5.x || 6.x", + "@redis/client": "^5.0.1" }, "peerDependenciesMeta": { "@aws-sdk/client-dynamodb": { @@ -25058,6 +25084,9 @@ }, "@middy/core": { "optional": true + }, + "@redis/client": { + "optional": true } } }, diff --git a/packages/idempotency/package.json b/packages/idempotency/package.json index db6698845a..f2fbea652f 100644 --- a/packages/idempotency/package.json +++ b/packages/idempotency/package.json @@ -52,6 +52,14 @@ "import": "./lib/esm/types/DynamoDBPersistence.js", "require": "./lib/cjs/types/DynamoDBPersistence.js" }, + "./cache": { + "import": "./lib/esm/persistence/CachePersistenceLayer.js", + "require": "./lib/cjs/persistence/CachePersistenceLayer.js" + }, + "./cache/types": { + "import": "./lib/esm/types/CachePersistence.js", + "require": "./lib/cjs/types/CachePersistence.js" + }, "./middleware": { "import": "./lib/esm/middleware/makeHandlerIdempotent.js", "require": "./lib/cjs/middleware/makeHandlerIdempotent.js" @@ -75,6 +83,14 @@ "lib/cjs/types/DynamoDBPersistence.d.ts", "lib/esm/types/DynamoDBPersistence.d.ts" ], + "cache": [ + "lib/cjs/persistence/CachePersistenceLayer.d.ts", + "lib/esm/persistence/CachePersistenceLayer.d.ts" + ], + "cache/types": [ + "lib/cjs/types/CachePersistence.d.ts", + "lib/esm/types/CachePersistence.d.ts" + ], "middleware": [ "lib/cjs/middleware/makeHandlerIdempotent.d.ts", "lib/esm/middleware/makeHandlerIdempotent.d.ts" @@ -104,7 +120,9 @@ "peerDependencies": { "@aws-sdk/client-dynamodb": ">=3.x", "@aws-sdk/lib-dynamodb": ">=3.x", - "@middy/core": "4.x || 5.x || 6.x" + "@middy/core": "4.x || 5.x || 6.x", + "@redis/client": "^5.0.1", + "@valkey/valkey-glide": "^1.3.4" }, "peerDependenciesMeta": { "@aws-sdk/client-dynamodb": { @@ -115,6 +133,12 @@ }, "@middy/core": { "optional": true + }, + "@redis/client": { + "optional": true + }, + "@valkey/valkey-glide": { + "optional": true } }, "keywords": [ diff --git a/packages/idempotency/src/constants.ts b/packages/idempotency/src/constants.ts index da6efbf7a3..404a0bf95f 100644 --- a/packages/idempotency/src/constants.ts +++ b/packages/idempotency/src/constants.ts @@ -1,3 +1,4 @@ +import type { BasePersistenceAttributes } from './types/BasePersistenceLayer.js'; /** * Number of times to retry a request in case of `IdempotencyInconsistentStateError` * @@ -20,4 +21,22 @@ const IdempotencyRecordStatus = { EXPIRED: 'EXPIRED', } as const; -export { IdempotencyRecordStatus, MAX_RETRIES }; +/** + * Base persistence attribute key names for persistence layers + */ +const PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS: Record< + keyof Required, + string +> = { + statusAttr: 'status', + expiryAttr: 'expiration', + inProgressExpiryAttr: 'in_progress_expiration', + dataAttr: 'data', + validationKeyAttr: 'validation', +} as const; + +export { + IdempotencyRecordStatus, + MAX_RETRIES, + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS, +}; diff --git a/packages/idempotency/src/errors.ts b/packages/idempotency/src/errors.ts index ba2a0b77ce..0acd58d576 100644 --- a/packages/idempotency/src/errors.ts +++ b/packages/idempotency/src/errors.ts @@ -113,6 +113,16 @@ class IdempotencyKeyError extends IdempotencyUnknownError { } } +/** + * Error with the persistence layer's consistency, needs to be removed + */ +class IdempotencyPersistenceConsistencyError extends IdempotencyUnknownError { + public constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = 'IdempotencyPersistenceConsistencyError'; + } +} + export { IdempotencyUnknownError, IdempotencyItemAlreadyExistsError, @@ -122,5 +132,6 @@ export { IdempotencyValidationError, IdempotencyInconsistentStateError, IdempotencyPersistenceLayerError, + IdempotencyPersistenceConsistencyError, IdempotencyKeyError, }; diff --git a/packages/idempotency/src/index.ts b/packages/idempotency/src/index.ts index 9d96f93163..6a76aac37a 100644 --- a/packages/idempotency/src/index.ts +++ b/packages/idempotency/src/index.ts @@ -12,4 +12,7 @@ export { export { IdempotencyConfig } from './IdempotencyConfig.js'; export { makeIdempotent } from './makeIdempotent.js'; export { idempotent } from './idempotencyDecorator.js'; -export { IdempotencyRecordStatus } from './constants.js'; +export { + IdempotencyRecordStatus, + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS, +} from './constants.js'; diff --git a/packages/idempotency/src/persistence/BasePersistenceLayer.ts b/packages/idempotency/src/persistence/BasePersistenceLayer.ts index 0c13a6d753..cc29ac6e4b 100644 --- a/packages/idempotency/src/persistence/BasePersistenceLayer.ts +++ b/packages/idempotency/src/persistence/BasePersistenceLayer.ts @@ -30,7 +30,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { // envVarsService is always initialized in the constructor private envVarsService!: EnvironmentVariablesService; private eventKeyJmesPath?: string; - private expiresAfterSeconds: number = 60 * 60; // 1 hour default + protected expiresAfterSeconds: number = 60 * 60; // 1 hour default private hashFunction = 'md5'; private payloadValidationEnabled = false; private throwOnNoIdempotencyKey = false; diff --git a/packages/idempotency/src/persistence/CachePersistenceLayer.ts b/packages/idempotency/src/persistence/CachePersistenceLayer.ts new file mode 100644 index 0000000000..a8b2e8037a --- /dev/null +++ b/packages/idempotency/src/persistence/CachePersistenceLayer.ts @@ -0,0 +1,340 @@ +import { + IdempotencyRecordStatus, + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS, +} from '../constants.js'; +import { + IdempotencyItemAlreadyExistsError, + IdempotencyItemNotFoundError, + IdempotencyPersistenceConsistencyError, + IdempotencyUnknownError, +} from '../errors.js'; +import type { + CacheClient, + CachePersistenceOptions, +} from '../types/CachePersistence.js'; +import type { IdempotencyRecordStatusValue } from '../types/IdempotencyRecord.js'; +import { BasePersistenceLayer } from './BasePersistenceLayer.js'; +import { IdempotencyRecord } from './IdempotencyRecord.js'; + +/** + * Valkey and Redis OOS-compatible persistence layer for idempotency records. + * + * This class uses a cache client to write and read idempotency records. It supports any client that + * implements the {@link CacheClient | `CacheClient`} interface. + * + * There are various options to configure the persistence layer, such as attribute names for storing + * status, expiry, data, and validation keys in the cache. + * + * You must provide your own connected client instance by passing it through the `client` option. + * + * See the {@link https://docs.powertools.aws.dev/lambda/typescript/latest/utilities/idempotency/ Idempotency documentation} + * for more details on the configuration and usage patterns. + * + * **Using Valkey Glide Client** + * + * @example + * ```ts + * import { GlideClient } from '@valkey/valkey-glide'; + * import { CachePersistenceLayer } from '@aws-lambda-powertools/idempotency/cache'; + * + * const client = await GlideClient.createClient({ + * addresses: [{ + * host: process.env.CACHE_ENDPOINT, + * port: Number(process.env.CACHE_PORT), + * }], + * useTLS: true, + * }); + * + * const persistence = new CachePersistenceLayer({ + * client, + * }); + * + * // ... your function handler here + * ``` + * + * **Using Redis Client** + * + * @example + * ```ts + * import { createClient } from '@redis/client'; + * import { CachePersistenceLayer } from '@aws-lambda-powertools/idempotency/cache'; + * + * const client = await createClient({ + * url: `rediss://${process.env.CACHE_ENDPOINT}:${process.env.CACHE_PORT}`, + * username: 'default', + * }).connect(); + * + * const persistence = new CachePersistenceLayer({ + * client, + * }); + * + * // ... your function handler here + * ``` + * + * @category Persistence Layer + */ +class CachePersistenceLayer extends BasePersistenceLayer { + readonly #client: CacheClient; + readonly #dataAttr: string; + readonly #expiryAttr: string; + readonly #inProgressExpiryAttr: string; + readonly #statusAttr: string; + readonly #validationKeyAttr: string; + readonly #orphanLockTimeout: number; + + public constructor(options: CachePersistenceOptions) { + super(); + + this.#statusAttr = + options.statusAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.statusAttr; + this.#expiryAttr = + options.expiryAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.expiryAttr; + this.#inProgressExpiryAttr = + options.inProgressExpiryAttr ?? + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.inProgressExpiryAttr; + this.#dataAttr = + options.dataAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.dataAttr; + this.#validationKeyAttr = + options.validationKeyAttr ?? + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.validationKeyAttr; + this.#orphanLockTimeout = Math.min(10, this.expiresAfterSeconds); + this.#client = options.client; + } + + /** + * Deletes the idempotency record associated with a given record from the persistence store. + * + * This function is designed to be called after a Lambda handler invocation has completed processing. + * It ensures that the idempotency key associated with the record is removed from the cache to + * prevent future conflicts and to maintain the idempotency integrity. + * + * Note: it is essential that the idempotency key is not empty, as that would indicate the Lambda + * handler has not been invoked or the key was not properly set. + * + * @param record + */ + protected async _deleteRecord(record: IdempotencyRecord): Promise { + await this.#client.del([record.idempotencyKey]); + } + + protected async _putRecord(record: IdempotencyRecord): Promise { + if (record.getStatus() === IdempotencyRecordStatus.INPROGRESS) { + await this.#putInProgressRecord(record); + } else { + throw new IdempotencyUnknownError( + 'Only INPROGRESS records can be inserted with _putRecord' + ); + } + } + + protected async _getRecord( + idempotencyKey: string + ): Promise { + const response = await this.#client.get(idempotencyKey); + + if (response === null) { + throw new IdempotencyItemNotFoundError( + 'Item does not exist in persistence store' + ); + } + try { + const item = JSON.parse(response as string); + return new IdempotencyRecord({ + idempotencyKey: idempotencyKey, + status: item[this.#statusAttr] as IdempotencyRecordStatusValue, + expiryTimestamp: item[this.#expiryAttr] as number | undefined, + inProgressExpiryTimestamp: item[this.#inProgressExpiryAttr] as + | number + | undefined, + responseData: item[this.#dataAttr], + payloadHash: item[this.#validationKeyAttr] as string | undefined, + }); + } catch (error) { + throw new IdempotencyPersistenceConsistencyError( + 'Idempotency persistency consistency error, needs to be removed', + error as Error + ); + } + } + + protected async _updateRecord(record: IdempotencyRecord): Promise { + const item: Record = { + [this.#statusAttr]: record.getStatus(), + [this.#expiryAttr]: record.expiryTimestamp, + [this.#dataAttr]: record.responseData, + }; + + const encodedItem = JSON.stringify(item); + const ttl = this.#getExpirySeconds(record.expiryTimestamp); + // Need to set ttl again, if we don't set `EX` here the record will not have a ttl + await this.#client.set(record.idempotencyKey, encodedItem, { + EX: ttl, + }); + } + + /** + * Put a record in the persistence store with a status of "INPROGRESS". + * + * The method guards against concurrent execution by using conditional write operations. + */ + async #putInProgressRecord(record: IdempotencyRecord): Promise { + const item: Record = { + [this.#statusAttr]: record.getStatus(), + [this.#expiryAttr]: record.expiryTimestamp, + }; + + if (record.inProgressExpiryTimestamp !== undefined) { + item[this.#inProgressExpiryAttr] = record.inProgressExpiryTimestamp; + } + + if (this.isPayloadValidationEnabled() && record.payloadHash !== undefined) { + item[this.#validationKeyAttr] = record.payloadHash; + } + + const encodedItem = JSON.stringify(item); + const ttl = this.#getExpirySeconds(record.expiryTimestamp); + + try { + /** + * | LOCKED | RETRY if status = "INPROGRESS" | RETRY + * |----------------|-------------------------------------------------------|-------------> .... (time) + * | Lambda Idempotency Record + * | Timeout Timeout + * | (in_progress_expiry) (expiry) + * + * Conditions to successfully save a record: + * + * The idempotency key does not exist: + * - first time that this invocation key is used + * - previous invocation with the same key was deleted due to TTL + * - SET see {@link https://valkey.io/commands/set/ | Valkey SET command} + */ + const response = await this.#client.set( + record.idempotencyKey, + encodedItem, + { + EX: ttl, + NX: true, + } + ); + + /** + * If response is not `null`, the SET operation was successful and the idempotency key was not + * previously set. This indicates that we can safely proceed to the handler execution phase. + * Most invocations should successfully proceed past this point. + */ + if (response !== null) { + return; + } + + /** + * If response is `null`, it indicates an existing record in the cache for the given idempotency key. + * + * This could be due to: + * - An active idempotency record from a previous invocation that has not yet expired. + * - An orphan record where a previous invocation has timed out. + * - An expired idempotency record that has not been deleted yet. + * + * In any case, we proceed to retrieve the record for further inspection. + */ + const existingRecord = await this._getRecord(record.idempotencyKey); + + /** + * If the status of the idempotency record is `COMPLETED` and the record has not expired + * then a valid completed record exists. We raise an error to prevent duplicate processing + * of a request that has already been completed successfully. + */ + if ( + existingRecord.getStatus() === IdempotencyRecordStatus.COMPLETED && + !existingRecord.isExpired() + ) { + throw new IdempotencyItemAlreadyExistsError( + `Failed to put record for already existing idempotency key: ${record.idempotencyKey}`, + existingRecord + ); + } + + /** + * If the idempotency record has a status of 'INPROGRESS' and has a valid `inProgressExpiryTimestamp` + * (meaning the timestamp is greater than the current timestamp in milliseconds), then we have encountered + * a valid in-progress record. This indicates that another process is currently handling the request, and + * to maintain idempotency, we raise an error to prevent concurrent processing of the same request. + */ + if ( + existingRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS && + existingRecord.inProgressExpiryTimestamp && + existingRecord.inProgressExpiryTimestamp > Date.now() + ) { + throw new IdempotencyItemAlreadyExistsError( + `Failed to put record for in-progress idempotency key: ${record.idempotencyKey}`, + existingRecord + ); + } + + /** + * Reaching this point indicates that the idempotency record found is an orphan record. An orphan record is + * one that is neither completed nor in-progress within its expected time frame. It may result from a + * previous invocation that has timed out or an expired record that has yet to be cleaned up from the cache. + * We raise an error to handle this exceptional scenario appropriately. + */ + throw new IdempotencyPersistenceConsistencyError( + 'Orphaned record detected' + ); + } catch (error) { + if (error instanceof IdempotencyPersistenceConsistencyError) { + /** + * Handle an orphan record by attempting to acquire a lock, which by default lasts for 10 seconds. + * The purpose of acquiring the lock is to prevent race conditions with other processes that might + * also be trying to handle the same orphan record. Once the lock is acquired, we set a new value + * for the idempotency record in the cache with the appropriate time-to-live (TTL). + */ + await this.#acquireLock(record.idempotencyKey); + + await this.#client.set(record.idempotencyKey, encodedItem, { + EX: ttl, + }); + } else { + throw error; + } + } + } + + /** + * Calculates the number of seconds remaining until a specified expiry timestamp + */ + #getExpirySeconds(expiryTimestamp?: number): number { + if (expiryTimestamp) { + return expiryTimestamp - Math.floor(Date.now() / 1000); + } + return this.expiresAfterSeconds; + } + + /** + * Attempt to acquire a lock for a specified resource name, with a default timeout. + * This method attempts to set a lock to prevent concurrent access to a resource + * identified by 'idempotencyKey'. It uses the 'NX' flag to ensure that the lock is only + * set if it does not already exist, thereby enforcing mutual exclusion. + * + * @param idempotencyKey - The key to create a lock for + */ + async #acquireLock(idempotencyKey: string): Promise { + const lockKey = `${idempotencyKey}:lock`; + const lockValue = 'true'; + + const acquired = await this.#client.set(lockKey, lockValue, { + EX: this.#orphanLockTimeout, + NX: true, + }); + + if (acquired) return; + /** If the lock acquisition fails, it suggests a race condition has occurred. In this case, instead of + * proceeding, we log the event and raise an error to indicate that the current operation should be + * retried after the lock is released by the process that currently holds it. + */ + throw new IdempotencyItemAlreadyExistsError( + 'Lock acquisition failed, raise to retry' + ); + } +} + +export { CachePersistenceLayer }; diff --git a/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts b/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts index 8354f6032e..e72cb0a5e5 100644 --- a/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts +++ b/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts @@ -7,13 +7,15 @@ import { ConditionalCheckFailedException, DeleteItemCommand, DynamoDBClient, - type DynamoDBClientConfig, GetItemCommand, PutItemCommand, UpdateItemCommand, } from '@aws-sdk/client-dynamodb'; import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'; -import { IdempotencyRecordStatus } from '../constants.js'; +import { + IdempotencyRecordStatus, + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS, +} from '../constants.js'; import { IdempotencyItemAlreadyExistsError, IdempotencyItemNotFoundError, @@ -49,7 +51,6 @@ import { IdempotencyRecord } from './IdempotencyRecord.js'; */ class DynamoDBPersistenceLayer extends BasePersistenceLayer { private client: DynamoDBClient; - private clientConfig: DynamoDBClientConfig = {}; private dataAttr: string; private expiryAttr: string; private inProgressExpiryAttr: string; @@ -65,12 +66,18 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer { this.tableName = config.tableName; this.keyAttr = config.keyAttr ?? 'id'; - this.statusAttr = config.statusAttr ?? 'status'; - this.expiryAttr = config.expiryAttr ?? 'expiration'; + this.statusAttr = + config.statusAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.statusAttr; + this.expiryAttr = + config.expiryAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.expiryAttr; this.inProgressExpiryAttr = - config.inProgressExpiryAttr ?? 'in_progress_expiration'; - this.dataAttr = config.dataAttr ?? 'data'; - this.validationKeyAttr = config.validationKeyAttr ?? 'validation'; + config.inProgressExpiryAttr ?? + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.inProgressExpiryAttr; + this.dataAttr = + config.dataAttr ?? PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.dataAttr; + this.validationKeyAttr = + config.validationKeyAttr ?? + PERSISTENCE_ATTRIBUTE_KEY_MAPPINGS.validationKeyAttr; if (config.sortKeyAttr === this.keyAttr) { throw new Error( `keyAttr [${this.keyAttr}] and sortKeyAttr [${config.sortKeyAttr}] cannot be the same!` diff --git a/packages/idempotency/src/persistence/IdempotencyRecord.ts b/packages/idempotency/src/persistence/IdempotencyRecord.ts index b0aa037a63..9d83a17bb8 100644 --- a/packages/idempotency/src/persistence/IdempotencyRecord.ts +++ b/packages/idempotency/src/persistence/IdempotencyRecord.ts @@ -13,7 +13,7 @@ import type { DynamoDBPersistenceLayer } from './DynamoDBPersistenceLayer.js'; */ class IdempotencyRecord { /** - * The expiry timestamp of the record in milliseconds UTC. + * The expiry timestamp of the record in seconds UTC. */ public expiryTimestamp?: number; /** diff --git a/packages/idempotency/src/types/BasePersistenceLayer.ts b/packages/idempotency/src/types/BasePersistenceLayer.ts index c3c7f19e55..cef0821407 100644 --- a/packages/idempotency/src/types/BasePersistenceLayer.ts +++ b/packages/idempotency/src/types/BasePersistenceLayer.ts @@ -16,4 +16,26 @@ interface BasePersistenceLayerInterface { getRecord(data: unknown): Promise; } -export type { BasePersistenceLayerOptions, BasePersistenceLayerInterface }; +/** + * Base attributes used by the persistence layer i.e. DynamoDB, Redis, etc. + * + * @interface + * @property {string} [expiryAttr] - The attribute name for expiry timestamp. Defaults to 'expiration'. + * @property {string} [inProgressExpiryAttr] - The attribute name for in-progress expiry timestamp. Defaults to 'in_progress_expiration'. + * @property {string} [statusAttr] - The attribute name for status. Defaults to 'status'. + * @property {string} [dataAttr] - The attribute name for response data. Defaults to 'data'. + * @property {string} [validationKeyAttr] - The attribute name for hashed representation of the parts of the event used for validation. Defaults to 'validation'. + */ +interface BasePersistenceAttributes { + expiryAttr?: string; + inProgressExpiryAttr?: string; + statusAttr?: string; + dataAttr?: string; + validationKeyAttr?: string; +} + +export type { + BasePersistenceLayerOptions, + BasePersistenceLayerInterface, + BasePersistenceAttributes, +}; diff --git a/packages/idempotency/src/types/CachePersistence.ts b/packages/idempotency/src/types/CachePersistence.ts new file mode 100644 index 0000000000..0741f984d1 --- /dev/null +++ b/packages/idempotency/src/types/CachePersistence.ts @@ -0,0 +1,55 @@ +import type { CachePersistenceLayer } from './../persistence/CachePersistenceLayer.js'; +import type { BasePersistenceAttributes } from './BasePersistenceLayer.js'; + +type CacheValue = string | Uint8Array; + +/** + * Interface for clients compatible with Valkey and Redis-OSS operations. + * + * This interface defines the minimum set of operations that must be implemented + * by a client to be used with the cache persistence layer. + * + * It supports basic key-value operations like get, set, and delete. + */ +interface CacheClient { + /** + * Retrieves the value associated with the given key. + * + * @param name - The key to get the value for + */ + get(name: string): Promise; + + /** + * Sets the value for the specified key with optional parameters. + * + * @param name - The key to set + * @param value - The value to set + * @param options - Optional parameters for setting the value + */ + set( + name: CacheValue, + value: unknown, + options?: unknown + ): Promise; + + /** + * Deletes the specified keys from the cache. + * + * @param keys - The keys to delete + */ + del(keys: string[]): Promise; +} + +/** + * Options for the {@link CachePersistenceLayer | `CachePersistenceLayer`} class constructor. + * + * @see {@link BasePersistenceAttributes} for full list of properties. + * + * @interface + * @property client - The client must be properly initialized and connected + */ +interface CachePersistenceOptions extends BasePersistenceAttributes { + client: CacheClient; +} + +export type { CacheClient, CachePersistenceOptions }; diff --git a/packages/idempotency/src/types/DynamoDBPersistence.ts b/packages/idempotency/src/types/DynamoDBPersistence.ts index 15a3f7ab4f..ab8f71394f 100644 --- a/packages/idempotency/src/types/DynamoDBPersistence.ts +++ b/packages/idempotency/src/types/DynamoDBPersistence.ts @@ -2,29 +2,22 @@ import type { DynamoDBClient, DynamoDBClientConfig, } from '@aws-sdk/client-dynamodb'; +import type { BasePersistenceAttributes } from './BasePersistenceLayer.js'; /** * Base interface for DynamoPersistenceOptions. * + * @see {@link BasePersistenceAttributes} for full list of properties. + * * @interface * @property {string} tableName - The DynamoDB table name. * @property {string} [keyAttr] - The DynamoDB table key attribute name. Defaults to 'id'. - * @property {string} [expiryAttr] - The DynamoDB table expiry attribute name. Defaults to 'expiration'. - * @property {string} [inProgressExpiryAttr] - The DynamoDB table in progress expiry attribute name. Defaults to 'in_progress_expiry_attr'. - * @property {string} [statusAttr] - The DynamoDB table status attribute name. Defaults to 'status'. - * @property {string} [dataAttr] - The DynamoDB table data attribute name. Defaults to 'data'. - * @property {string} [validationKeyAttr] - The DynamoDB table validation key attribute name. Defaults to 'validation'. * @property {string} [sortKeyAttr] - The DynamoDB table sort key attribute name, use only when table has one. Defaults to undefined. * @property {string} [staticPkValue] - The DynamoDB table static partition key value, use only with sortKeyAttr. Defaults to `idempotency#{LAMBDA_FUNCTION_NAME}`. */ -interface DynamoDBPersistenceOptionsBase { +interface DynamoDBPersistenceOptionsBase extends BasePersistenceAttributes { tableName: string; keyAttr?: string; - expiryAttr?: string; - inProgressExpiryAttr?: string; - statusAttr?: string; - dataAttr?: string; - validationKeyAttr?: string; sortKeyAttr?: string; staticPkValue?: string; } @@ -64,16 +57,12 @@ interface DynamoDBPersistenceOptionsWithClientInstance /** * Options for the {@link persistence/DynamoDBPersistenceLayer.DynamoDBPersistenceLayer | DynamoDBPersistenceLayer} class constructor. * - * @see {@link DynamoDBPersistenceOptionsBase}, {@link DynamoDBPersistenceOptionsWithClientConfig}, and {@link DynamoDBPersistenceOptionsWithClientInstance} for full list of properties. + * @see {@link BasePersistenceAttributes}, {@link DynamoDBPersistenceOptionsBase}, {@link DynamoDBPersistenceOptionsWithClientConfig}, + * {@link DynamoDBPersistenceOptionsWithClientInstance} for full list of properties. * * @type DynamoDBPersistenceOptions * @property {string} tableName - The DynamoDB table name. * @property {string} [keyAttr] - The DynamoDB table key attribute name. Defaults to 'id'. - * @property {string} [expiryAttr] - The DynamoDB table expiry attribute name. Defaults to 'expiration'. - * @property {string} [inProgressExpiryAttr] - The DynamoDB table in progress expiry attribute name. Defaults to 'in_progress_expiry_attr'. - * @property {string} [statusAttr] - The DynamoDB table status attribute name. Defaults to 'status'. - * @property {string} [dataAttr] - The DynamoDB table data attribute name. Defaults to 'data'. - * @property {string} [validationKeyAttr] - The DynamoDB table validation key attribute name. Defaults to 'validation'. * @property {string} [sortKeyAttr] - The DynamoDB table sort key attribute name, use only when table has one. Defaults to undefined. * @property {string} [staticPkValue] - The DynamoDB table static partition key value, use only with sortKeyAttr. Defaults to `idempotency#{LAMBDA_FUNCTION_NAME}`. * @property {DynamoDBClientConfig} [clientConfig] - Optional configuration to pass during client initialization, e.g. AWS region. Mutually exclusive with awsSdkV3Client. diff --git a/packages/idempotency/src/types/IdempotencyRecord.ts b/packages/idempotency/src/types/IdempotencyRecord.ts index 2ba7a5a490..ee1fed1223 100644 --- a/packages/idempotency/src/types/IdempotencyRecord.ts +++ b/packages/idempotency/src/types/IdempotencyRecord.ts @@ -26,7 +26,7 @@ type IdempotencyRecordOptions = { */ status: IdempotencyRecordStatusValue; /** - * The expiry timestamp of the record in milliseconds UTC. + * The expiry timestamp of the record in seconds UTC. */ expiryTimestamp?: number; /** diff --git a/packages/idempotency/src/types/index.ts b/packages/idempotency/src/types/index.ts index 0158d7855b..193571d092 100644 --- a/packages/idempotency/src/types/index.ts +++ b/packages/idempotency/src/types/index.ts @@ -5,6 +5,7 @@ export type { export type { BasePersistenceLayerInterface, BasePersistenceLayerOptions, + BasePersistenceAttributes, } from './BasePersistenceLayer.js'; export type { IdempotencyConfigOptions, @@ -20,3 +21,7 @@ export type { DynamoDBPersistenceOptionsWithClientConfig, DynamoDBPersistenceOptionsWithClientInstance, } from './DynamoDBPersistence.js'; +export type { + CacheClient, + CachePersistenceOptions, +} from './CachePersistence.js'; diff --git a/packages/idempotency/tests/helpers/idempotencyUtils.ts b/packages/idempotency/tests/helpers/idempotencyUtils.ts index 3c57281814..09fd41699a 100644 --- a/packages/idempotency/tests/helpers/idempotencyUtils.ts +++ b/packages/idempotency/tests/helpers/idempotencyUtils.ts @@ -1,5 +1,6 @@ import { vi } from 'vitest'; import { BasePersistenceLayer } from '../../src/persistence/BasePersistenceLayer.js'; +import { CachePersistenceLayer } from '../../src/persistence/CachePersistenceLayer.js'; import { DynamoDBPersistenceLayer } from '../../src/persistence/DynamoDBPersistenceLayer.js'; import type { IdempotencyRecord } from '../../src/persistence/IdempotencyRecord.js'; @@ -38,4 +39,31 @@ class DynamoDBPersistenceLayerTestClass extends DynamoDBPersistenceLayer { } } -export { PersistenceLayerTestClass, DynamoDBPersistenceLayerTestClass }; +/** + * Dummy class to test the abstract class `CachePersistenceLayer`. + * + * This class is used in the unit tests. + */ +class CachePersistenceLayerTestClass extends CachePersistenceLayer { + public _deleteRecord(record: IdempotencyRecord): Promise { + return super._deleteRecord(record); + } + + public _getRecord(idempotencyKey: string): Promise { + return super._getRecord(idempotencyKey); + } + + public _putRecord(_record: IdempotencyRecord): Promise { + return super._putRecord(_record); + } + + public _updateRecord(record: IdempotencyRecord): Promise { + return super._updateRecord(record); + } +} + +export { + PersistenceLayerTestClass, + DynamoDBPersistenceLayerTestClass, + CachePersistenceLayerTestClass, +}; diff --git a/packages/idempotency/tests/unit/persistence/CachePersistenceLayer.test.ts b/packages/idempotency/tests/unit/persistence/CachePersistenceLayer.test.ts new file mode 100644 index 0000000000..a4e7e99b54 --- /dev/null +++ b/packages/idempotency/tests/unit/persistence/CachePersistenceLayer.test.ts @@ -0,0 +1,405 @@ +import { + afterAll, + afterEach, + beforeAll, + describe, + expect, + it, + vi, +} from 'vitest'; +import { IdempotencyRecordStatus } from '../../../src/constants.js'; +import { + IdempotencyItemNotFoundError, + IdempotencyPersistenceConsistencyError, +} from '../../../src/errors.js'; +import { IdempotencyRecord } from '../../../src/persistence/IdempotencyRecord.js'; +import { CachePersistenceLayerTestClass } from '../../helpers/idempotencyUtils.js'; + +const getFutureTimestamp = (seconds: number): number => + Math.floor(Date.now() / 1000) + seconds; +const getFutureTimestampInMillis = (seconds: number): number => + getFutureTimestamp(seconds) * 1000; + +const dummyKey = 'someKey'; +const client = { + get: vi.fn(), + set: vi.fn(), + del: vi.fn(), +}; +const persistenceLayer = new CachePersistenceLayerTestClass({ + client, +}); + +describe('Class: CachePersistenceLayerTestClass', () => { + beforeAll(() => { + vi.useFakeTimers().setSystemTime(new Date()); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + afterAll(() => { + vi.useRealTimers(); + }); + + describe('Method: _putRecord', () => { + it('puts a record with INPROGRESS status into the cache', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: getFutureTimestamp(10), + }); + client.set.mockResolvedValue('OK'); + + // Act + await persistenceLayer._putRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + expiration: record.expiryTimestamp, + }), + { EX: 10, NX: true } + ); + }); + + it('puts the record in the cache when using an in progress expiry timestamp', async () => { + // Prepare + const status = IdempotencyRecordStatus.INPROGRESS; + const expiryTimestamp = getFutureTimestamp(10); + const inProgressExpiryTimestamp = getFutureTimestampInMillis(5); + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status, + expiryTimestamp, + inProgressExpiryTimestamp, + }); + + // Act + await persistenceLayer._putRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status, + expiration: expiryTimestamp, + in_progress_expiration: inProgressExpiryTimestamp, + }), + { EX: 10, NX: true } + ); + }); + + it('puts record in the cache when using payload validation', async () => { + // Prepare + const persistenceLayerSpy = vi + .spyOn(persistenceLayer, 'isPayloadValidationEnabled') + .mockReturnValue(true); + const expiryTimestamp = getFutureTimestamp(10); + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp, + payloadHash: 'someHash', + }); + + // Act + await persistenceLayer._putRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + expiration: expiryTimestamp, + validation: 'someHash', + }), + { EX: 10, NX: true } + ); + persistenceLayerSpy.mockRestore(); + }); + + it('puts record in the cache with default expiry timestamp', async () => { + // Prepare + const status = IdempotencyRecordStatus.INPROGRESS; + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status, + }); + + // Act + await persistenceLayer._putRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status, + }), + { EX: 60 * 60, NX: true } + ); + }); + + it('handles orphaned records by acquiring a lock and updating', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: getFutureTimestamp(10), + }); + + client.set.mockResolvedValueOnce(null).mockResolvedValueOnce('OK'); + client.get.mockResolvedValueOnce( + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + expiration: getFutureTimestamp(-10), + }) + ); + + // Act + await persistenceLayer._putRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + `${dummyKey}:lock`, + 'true', + expect.objectContaining({ EX: 10, NX: true }) + ); + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + expiration: record.expiryTimestamp, + }), + { EX: 10 } + ); + }); + + it('handles orphaned records by acquiring a lock but it fails and throw error', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: getFutureTimestamp(10), + }); + + client.set + .mockResolvedValueOnce(null) // First attempt to set fails + .mockResolvedValueOnce(null); // Lock acquisition fails + client.get.mockResolvedValueOnce( + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + expiration: getFutureTimestamp(-10), + }) + ); + + // Act & Assess + await expect(persistenceLayer._putRecord(record)).rejects.toThrow( + 'Lock acquisition failed, raise to retry' + ); + }); + + it('throws error when item already exists and not expired', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: getFutureTimestamp(10), + }); + client.set.mockResolvedValue(null); + client.get.mockResolvedValueOnce( + JSON.stringify({ + status: IdempotencyRecordStatus.COMPLETED, + expiration: getFutureTimestamp(10), + }) + ); + + // Act & Assess + await expect(persistenceLayer._putRecord(record)).rejects.toThrow( + `Failed to put record for already existing idempotency key: ${dummyKey}` + ); + }); + + it('throws error when item is in progress', async () => { + // Prepare + const inProgressExpiryTimestamp = getFutureTimestampInMillis(10); + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.INPROGRESS, + expiryTimestamp: getFutureTimestamp(10), + }); + client.set.mockResolvedValue(null); + client.get.mockResolvedValueOnce( + JSON.stringify({ + status: IdempotencyRecordStatus.INPROGRESS, + in_progress_expiration: inProgressExpiryTimestamp, + }) + ); + + // Act & Assess + await expect(persistenceLayer._putRecord(record)).rejects.toThrow( + `Failed to put record for in-progress idempotency key: ${dummyKey}` + ); + }); + + it('throws error when trying to put a non-INPROGRESS record', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.COMPLETED, + expiryTimestamp: getFutureTimestamp(10), + }); + + // Act & Assess + await expect(persistenceLayer._putRecord(record)).rejects.toThrow( + 'Only INPROGRESS records can be inserted with _putRecord' + ); + }); + }); + + describe('Method: _deleteRecord', () => { + it('deletes a record from the cache', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.COMPLETED, + expiryTimestamp: getFutureTimestamp(15), + }); + + // Act + await persistenceLayer._deleteRecord(record); + + // Assess + expect(client.del).toHaveBeenCalledWith([dummyKey]); + }); + }); + + describe('Method: _getRecord', () => { + it('gets a record from the cache', async () => { + // Prepare + const status = IdempotencyRecordStatus.INPROGRESS; + const expiryTimestamp = getFutureTimestamp(15); + const inProgressExpiryTimestamp = getFutureTimestampInMillis(15); + client.get.mockResolvedValue( + JSON.stringify({ + status, + expiration: expiryTimestamp, + in_progress_expiration: inProgressExpiryTimestamp, + validation: 'someHash', + data: { some: 'data' }, + }) + ); + + // Act + const record = await persistenceLayer._getRecord(dummyKey); + + // Assess + expect(client.get).toHaveBeenCalledWith(dummyKey); + expect(record.getStatus()).toEqual(status); + expect(record.expiryTimestamp).toEqual(expiryTimestamp); + expect(record.inProgressExpiryTimestamp).toEqual( + inProgressExpiryTimestamp + ); + expect(record.payloadHash).toEqual('someHash'); + expect(record.getResponse()).toEqual({ some: 'data' }); + }); + + it('throws IdempotencyItemNotFoundError when record does not exist', async () => { + // Prepare + client.get.mockResolvedValue(null); + + // Act & Assess + await expect(persistenceLayer._getRecord(dummyKey)).rejects.toThrow( + IdempotencyItemNotFoundError + ); + }); + + it('throws IdempotencyPersistenceConsistencyError when record is invalid JSON', async () => { + // Prepare + client.get.mockResolvedValue('invalid-json'); + + // Act & Assess + await expect(persistenceLayer._getRecord(dummyKey)).rejects.toThrow( + IdempotencyPersistenceConsistencyError + ); + }); + }); + + describe('Method: _updateRecord', () => { + it('updates a record in the cache', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.COMPLETED, + expiryTimestamp: getFutureTimestamp(15), + }); + client.set.mockResolvedValue('OK'); + + // Act + await persistenceLayer._updateRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: 'COMPLETED', + expiration: record.expiryTimestamp, + }), + expect.objectContaining({ EX: expect.any(Number) }) + ); + }); + + it('updates a record with null responseData', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.COMPLETED, + expiryTimestamp: getFutureTimestamp(15), + responseData: undefined, + }); + client.set.mockResolvedValue('OK'); + + // Act + await persistenceLayer._updateRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: 'COMPLETED', + expiration: record.expiryTimestamp, + }), + expect.objectContaining({ EX: expect.any(Number) }) + ); + }); + + it('updates a record with valid responseData', async () => { + // Prepare + const record = new IdempotencyRecord({ + idempotencyKey: dummyKey, + status: IdempotencyRecordStatus.COMPLETED, + expiryTimestamp: getFutureTimestamp(15), + responseData: { key: 'value' }, + }); + client.set.mockResolvedValue('OK'); + + // Act + await persistenceLayer._updateRecord(record); + + // Assess + expect(client.set).toHaveBeenCalledWith( + dummyKey, + JSON.stringify({ + status: 'COMPLETED', + expiration: record.expiryTimestamp, + data: record.responseData, + }), + expect.objectContaining({ EX: expect.any(Number) }) + ); + }); + }); +}); diff --git a/packages/idempotency/typedoc.json b/packages/idempotency/typedoc.json index 938073260b..73da1534e3 100644 --- a/packages/idempotency/typedoc.json +++ b/packages/idempotency/typedoc.json @@ -5,7 +5,9 @@ "./src/types/index.ts", "./src/middleware/makeHandlerIdempotent.ts", "./src/persistence/index.ts", - "./src/persistence/DynamoDBPersistenceLayer.ts" + "./src/persistence/DynamoDBPersistenceLayer.ts", + "./src/persistence/CachePersistenceLayer.ts", + "./src/types/CachePersistence.ts", ], "readme": "README.md" }