Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 43 additions & 77 deletions packages/memcache-client/src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
CommandContext,
SingleServerEntry,
PackedData,
CompressorLibrary,
MultiServerManager,
} from "../types";
import { MemcacheConnection } from "./connection";
Expand All @@ -33,6 +32,10 @@ type StoreCommandOptions = CommonCommandOption & { ignoreNotStored?: boolean } &
compress?: boolean;
}>;

type RetrieveCommands = "get" | "gets" | "mg";
type StoreCommands = "set" | "add" | "replace" | "append" | "prepend" | "cas";
type Command = RetrieveCommands | StoreCommands;

// Exported for testing
export type CasCommandOptions = CommonCommandOption &
StoreCommandOptions &
Expand Down Expand Up @@ -175,7 +178,7 @@ export class MemcacheClient extends EventEmitter {
this.options = options;
this.socketID = 1;
this._packer = new ValuePacker(
options.compressor || (Zstd as CompressorLibrary),
options.compressor || Zstd,
options.assumeBuffer || false
);
this._logger = options.logger !== undefined ? options.logger : nullLogger;
Expand Down Expand Up @@ -215,15 +218,10 @@ export class MemcacheClient extends EventEmitter {
send<ValueType>(
data: StoreParams | SocketCallback,
key: string,
options?: CommonCommandOption | ErrorFirstCallback,
options?: CommonCommandOption,
callback?: ErrorFirstCallback
): Promise<ValueType> {
if (typeof options === "function") {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}
options = options || {};

return this._callbackSend(data, key, options, callback);
}
Expand Down Expand Up @@ -266,57 +264,56 @@ export class MemcacheClient extends EventEmitter {
set(
key: string,
value: StoreParams,
options?: StoreCommandOptions | OperationCallback<Error, string[]>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
options = options || {};
if ((options as StoreCommandOptions).ignoreNotStored === undefined) {
(options as StoreCommandOptions).ignoreNotStored = this.options.ignoreNotStored;
if (options.ignoreNotStored === undefined) {
options.ignoreNotStored = this.options.ignoreNotStored;
}
// it's tricky to threat optional object as callback
return this.store("set", key, value, options as StoreCommandOptions, callback);
return this.store("set", key, value, options, callback);
}

// "add" means "store this data, but only if the server *doesn't* already
// hold data for this key".
add(
key: string,
value: StoreParams,
options?: StoreCommandOptions | OperationCallback<Error, string[]>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.store("add", key, value, options as StoreCommandOptions, callback);
return this.store("add", key, value, options, callback);
}

// "replace" means "store this data, but only if the server *does*
// already hold data for this key".
replace(
key: string,
value: StoreParams,
options?: StoreCommandOptions | OperationCallback<Error, string[]>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.store("replace", key, value, options as StoreCommandOptions, callback);
return this.store("replace", key, value, options, callback);
}

// "append" means "add this data to an existing key after existing data".
append(
key: string,
value: StoreParams,
options?: StoreCommandOptions | OperationCallback<Error, string[]>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.store("append", key, value, options as StoreCommandOptions, callback);
return this.store("append", key, value, options, callback);
}

// "prepend" means "add this data to an existing key before existing data".
prepend(
key: string,
value: StoreParams,
options?: StoreCommandOptions | OperationCallback<Error, string[]>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.store("prepend", key, value, options as StoreCommandOptions, callback);
return this.store("prepend", key, value, options, callback);
}

// "cas" is a check and set operation which means "store this data but
Expand All @@ -337,60 +334,40 @@ export class MemcacheClient extends EventEmitter {
// delete key, fire & forget with options.noreply
delete(
key: string,
options?: CommonCommandOption | OperationCallback<Error, string[]>,
options?: CommonCommandOption,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.cmd(
`delete ${key}`,
key,
options as CommonCommandOption,
callback
);
return this.cmd(`delete ${key}`, key, options, callback);
}

// incr key by value, fire & forget with options.noreply
incr(
key: string,
value: number,
options?: StoreCommandOptions | OperationCallback<Error, string>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string>
): Promise<string> {
return this.cmd(
`incr ${key} ${value}`,
key,
options as StoreCommandOptions,
callback
);
return this.cmd(`incr ${key} ${value}`, key, options, callback);
}

// decrease key by value, fire & forget with options.noreply
decr(
key: string,
value: number,
options?: StoreCommandOptions | OperationCallback<Error, string>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, string>
): Promise<string> {
return this.cmd(
`decr ${key} ${value}`,
key,
options as StoreCommandOptions,
callback
);
return this.cmd(`decr ${key} ${value}`, key, options, callback);
}

// touch key with exp time, fire & forget with options.noreply
touch(
key: string,
exptime: string | number,
options?: CommonCommandOption | OperationCallback<Error, string[]>,
options?: CommonCommandOption,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
return this.cmd(
`touch ${key} ${exptime}`,
key,
options as CommonCommandOption,
callback
);
return this.cmd(`touch ${key} ${exptime}`, key, options, callback);
}

// get version of server
Expand All @@ -401,11 +378,11 @@ export class MemcacheClient extends EventEmitter {
// flush all keys from the server, optionally after a delay in seconds
flush(
exptime?: number,
options?: CommonCommandOption | OperationCallback<Error, string[]>,
options?: CommonCommandOption,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
const cmd = exptime !== undefined ? `flush_all ${exptime}` : "flush_all";
return this.cmd(cmd, "", options as CommonCommandOption, callback) as unknown as Promise<string[]>;
return this.cmd(cmd, "", options, callback) as Promise<string[]>;
}

async versionAll(
Expand Down Expand Up @@ -439,18 +416,13 @@ export class MemcacheClient extends EventEmitter {

// a generic API for issuing one of the store commands
store(
cmd: string,
cmd: StoreCommands,
key: string,
value: StoreParams,
options?: Partial<CasCommandOptions> | OperationCallback<Error, string[]>,
options?: Partial<CasCommandOptions>,
callback?: OperationCallback<Error, string[]>
): Promise<string[]> {
if (typeof options === "function") {
callback = options;
options = {};
} else if (options === undefined) {
options = {};
}
options = options || {};

const lifetime =
options.lifetime !== undefined ? options.lifetime : this.options.lifetime || 60;
Expand All @@ -462,10 +434,7 @@ export class MemcacheClient extends EventEmitter {
// <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
//
const _data: SocketCallback = (socket?: Socket) => {
const packed = this._packer.pack(
value,
(options as Partial<CasCommandOptions>)?.compress === true
);
const packed = this._packer.pack(value, options.compress === true);
const bytes = Buffer.byteLength(packed.data);
socket?.write(
Buffer.concat([
Expand All @@ -476,12 +445,12 @@ export class MemcacheClient extends EventEmitter {
);
};

return this._callbackSend(_data, key, options, callback) as unknown as Promise<string[]>;
return this._callbackSend(_data, key, options, callback) as Promise<string[]>;
}

get<ValueType>(
key: string | string[],
options?: StoreCommandOptions | OperationCallback<Error, MultiRetrieval<ValueType>>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, MultiRetrieval<ValueType>>
): Promise<MultiRetrieval<ValueType>> {
return this.retrieve("get", key, options, callback);
Expand Down Expand Up @@ -524,7 +493,7 @@ export class MemcacheClient extends EventEmitter {

gets<ValueType>(
key: string | string[],
options?: StoreCommandOptions | OperationCallback<Error, MultiCasRetrieval<ValueType>>,
options?: StoreCommandOptions,
callback?: OperationCallback<Error, MultiCasRetrieval<ValueType>>
): Promise<MultiCasRetrieval<ValueType>> {
return this.retrieve("gets", key, options, callback);
Expand Down Expand Up @@ -554,22 +523,19 @@ export class MemcacheClient extends EventEmitter {

// A generic API for issuing get or gets command
retrieve<T>(
cmd: string,
cmd: RetrieveCommands,
key: string[] | string,
options?: StoreCommandOptions | OperationCallback<Error, T>,
options?: StoreCommandOptions,
callback?: ErrorFirstCallback,
metaFlags?: string
): Promise<T> {
if (typeof options === "function") {
callback = options;
options = {};
}
return nodeify(this.xretrieve(cmd, key, options, metaFlags), callback) as unknown as Promise<T>;
options = options || {};
return nodeify(this.xretrieve(cmd, key, options, metaFlags), callback) as Promise<T>;
}

// the promise only version of retrieve
xretrieve(
cmd: string,
cmd: RetrieveCommands,
key: string | string[],
options?: StoreCommandOptions,
metaFlags?: string
Expand All @@ -596,7 +562,7 @@ export class MemcacheClient extends EventEmitter {

// retrieve one or more keys from a single server
_xretrieverByServer(
cmd: string,
cmd: RetrieveCommands,
key: string | string[],
options?: StoreCommandOptions,
metaFlags?: string
Expand Down Expand Up @@ -631,7 +597,7 @@ export class MemcacheClient extends EventEmitter {
// the promise only version of retrieve that catches errors per-server
// instead of failing fast, allowing partial results to be returned
async xretrieveWithErrors<Keys extends string>(
cmd: string,
cmd: RetrieveCommands,
keys: Keys[],
options?: StoreCommandOptions
): Promise<MultiCasRetrievalWithErrorsResponse<unknown, Keys>> {
Expand Down
72 changes: 34 additions & 38 deletions packages/memcache-client/src/lib/cmd-actions.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,50 @@
/* eslint-disable no-shadow */
enum ActionTypes {
ACTION_OK = "OK",
ACTION_ERROR = "ERROR",
ACTION_RESULT = "RESULT",
ACTION_SINGLE_RESULT = "SINGLE_RESULT",
ACTION_SELF = "SELF",
}
export const ActionTypes = ["OK", "ERROR", "RESULT", "SINGLE_RESULT", "SELF"] as const;

const CmdActions: Record<string, string> = {
OK: ActionTypes.ACTION_OK,
END: ActionTypes.ACTION_SELF,
DELETED: ActionTypes.ACTION_OK,
TOUCHED: ActionTypes.ACTION_OK,
STORED: ActionTypes.ACTION_OK,
export type ActionType = (typeof ActionTypes)[number];

const CmdActions = {
OK: "OK",
END: "SELF",
DELETED: "OK",
TOUCHED: "OK",
STORED: "OK",
//
VALUE: ActionTypes.ACTION_SELF,
STAT: ActionTypes.ACTION_RESULT,
VERSION: ActionTypes.ACTION_SINGLE_RESULT,
VALUE: "SELF",
STAT: "RESULT",
VERSION: "SINGLE_RESULT",
//
NOT_STORED: ActionTypes.ACTION_ERROR,
EXISTS: ActionTypes.ACTION_ERROR,
NOT_FOUND: ActionTypes.ACTION_ERROR,
NOT_STORED: "ERROR",
EXISTS: "ERROR",
NOT_FOUND: "ERROR",
//
ERROR: ActionTypes.ACTION_ERROR,
CLIENT_ERROR: ActionTypes.ACTION_ERROR,
SERVER_ERROR: ActionTypes.ACTION_ERROR,
ERROR: "ERROR",
CLIENT_ERROR: "ERROR",
SERVER_ERROR: "ERROR",
// Slabs Reassign error responses
// - "BUSY [message]" to indicate a page is already being processed, try again
// later.
// - "BUSY [message]" to indicate the crawler is already processing a request.
BUSY: ActionTypes.ACTION_ERROR,
BUSY: "ERROR",
// - "BADCLASS [message]" a bad class id was specified
BADCLASS: ActionTypes.ACTION_ERROR,
BADCLASS: "ERROR",
// - "NOSPARE [message]" source class has no spare pages
NOSPARE: ActionTypes.ACTION_ERROR,
NOSPARE: "ERROR",
// - "NOTFULL [message]" dest class must be full to move new pages to it
NOTFULL: ActionTypes.ACTION_ERROR,
NOTFULL: "ERROR",
// - "UNSAFE [message]" source class cannot move a page right now
UNSAFE: ActionTypes.ACTION_ERROR,
UNSAFE: "ERROR",
// - "SAME [message]" must specify different source/dest ids.
SAME: ActionTypes.ACTION_ERROR,
SAME: "ERROR",
// Meta Protocol commands uses two letter codes
HD: ActionTypes.ACTION_OK,
VA: ActionTypes.ACTION_SELF,
EN: ActionTypes.ACTION_SELF,
ME: ActionTypes.ACTION_SELF,
NS: ActionTypes.ACTION_ERROR,
EX: ActionTypes.ACTION_ERROR,
NF: ActionTypes.ACTION_ERROR,
MN: ActionTypes.ACTION_SELF,
};
HD: "OK",
VA: "SELF",
EN: "SELF",
ME: "SELF",
NS: "ERROR",
EX: "ERROR",
NF: "ERROR",
MN: "SELF",
} as const;

export type CommandAction = keyof typeof CmdActions;
export default CmdActions;
Loading