Skip to content

Commit 6e1674a

Browse files
authored
refactor: move command queue from MuxExecutor to RedisConnection (#383)
1 parent b5dee5e commit 6e1674a

3 files changed

Lines changed: 72 additions & 70 deletions

File tree

connection.ts

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import { exponentialBackoff } from "./backoff.ts";
55
import { ErrorReplyError, isRetriableError } from "./errors.ts";
66
import { BufReader } from "./vendor/https/deno.land/std/io/buf_reader.ts";
77
import { BufWriter } from "./vendor/https/deno.land/std/io/buf_writer.ts";
8+
import {
9+
Deferred,
10+
deferred,
11+
} from "./vendor/https/deno.land/std/async/deferred.ts";
812
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";
913
type Closer = Deno.Closer;
1014

@@ -50,6 +54,12 @@ export class RedisConnection implements Connection {
5054
private _isConnected = false;
5155
private backoff: Backoff;
5256

57+
private commandQueue: {
58+
name: string;
59+
args: RedisValue[];
60+
promise: Deferred<RedisReply>;
61+
}[] = [];
62+
5363
get isClosed(): boolean {
5464
return this._isClosed;
5565
}
@@ -104,48 +114,20 @@ export class RedisConnection implements Connection {
104114
await this.sendCommand("SELECT", [db]);
105115
}
106116

107-
async sendCommand(
117+
sendCommand(
108118
command: string,
109119
args?: Array<RedisValue>,
110120
): Promise<RedisReply> {
111-
try {
112-
const reply = await sendCommand(
113-
this.writer,
114-
this.reader,
115-
command,
116-
args ?? kEmptyRedisArgs,
117-
);
118-
return reply;
119-
} catch (error) {
120-
if (
121-
!isRetriableError(error) ||
122-
this.isManuallyClosedByUser()
123-
) {
124-
throw error;
125-
}
126-
127-
for (let i = 0; i < this.maxRetryCount; i++) {
128-
// Try to reconnect to the server and retry the command
129-
this.close();
130-
try {
131-
await this.connect();
132-
133-
const reply = await sendCommand(
134-
this.writer,
135-
this.reader,
136-
command,
137-
args ?? kEmptyRedisArgs,
138-
);
139-
140-
return reply;
141-
} catch { // TODO: use `AggregateError`?
142-
const backoff = this.backoff(i);
143-
await delay(backoff);
144-
}
145-
}
146-
147-
throw error;
121+
const promise = deferred<RedisReply>();
122+
this.commandQueue.push({
123+
name: command,
124+
args: args ?? kEmptyRedisArgs,
125+
promise,
126+
});
127+
if (this.commandQueue.length === 1) {
128+
this.processCommandQueue();
148129
}
130+
return promise;
149131
}
150132

151133
/**
@@ -221,6 +203,53 @@ export class RedisConnection implements Connection {
221203
}
222204
}
223205

206+
private async processCommandQueue() {
207+
const [command] = this.commandQueue;
208+
if (!command) return;
209+
210+
try {
211+
const reply = await sendCommand(
212+
this.writer,
213+
this.reader,
214+
command.name,
215+
command.args,
216+
);
217+
command.promise.resolve(reply);
218+
} catch (error) {
219+
if (
220+
!isRetriableError(error) ||
221+
this.isManuallyClosedByUser()
222+
) {
223+
return command.promise.reject(error);
224+
}
225+
226+
for (let i = 0; i < this.maxRetryCount; i++) {
227+
// Try to reconnect to the server and retry the command
228+
this.close();
229+
try {
230+
await this.connect();
231+
232+
const reply = await sendCommand(
233+
this.writer,
234+
this.reader,
235+
command.name,
236+
command.args,
237+
);
238+
239+
return command.promise.resolve(reply);
240+
} catch { // TODO: use `AggregateError`?
241+
const backoff = this.backoff(i);
242+
await delay(backoff);
243+
}
244+
}
245+
246+
command.promise.reject(error);
247+
} finally {
248+
this.commandQueue.shift();
249+
this.processCommandQueue();
250+
}
251+
}
252+
224253
private isManuallyClosedByUser(): boolean {
225254
return this._isClosed && !this._isConnected;
226255
}

executor.ts

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
import type { Connection } from "./connection.ts";
2-
import {
3-
Deferred,
4-
deferred,
5-
} from "./vendor/https/deno.land/std/async/deferred.ts";
62
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
73

84
export interface CommandExecutor {
@@ -18,40 +14,17 @@ export interface CommandExecutor {
1814
close(): void;
1915
}
2016

21-
export class MuxExecutor implements CommandExecutor {
17+
export class DefaultExecutor implements CommandExecutor {
2218
constructor(readonly connection: Connection) {}
2319

24-
private queue: {
25-
command: string;
26-
args: RedisValue[];
27-
d: Deferred<RedisReply>;
28-
}[] = [];
29-
3020
exec(
3121
command: string,
3222
...args: RedisValue[]
3323
): Promise<RedisReply> {
34-
const d = deferred<RedisReply>();
35-
this.queue.push({ command, args, d });
36-
if (this.queue.length === 1) {
37-
this.dequeue();
38-
}
39-
return d;
24+
return this.connection.sendCommand(command, args);
4025
}
4126

4227
close(): void {
4328
this.connection.close();
4429
}
45-
46-
private dequeue(): void {
47-
const [e] = this.queue;
48-
if (!e) return;
49-
this.connection.sendCommand(e.command, e.args)
50-
.then(e.d.resolve)
51-
.catch(e.d.reject)
52-
.finally(() => {
53-
this.queue.shift();
54-
this.dequeue();
55-
});
56-
}
5730
}

redis.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import type {
4444
import { RedisConnection } from "./connection.ts";
4545
import type { Connection } from "./connection.ts";
4646
import type { RedisConnectionOptions } from "./connection.ts";
47-
import { CommandExecutor, MuxExecutor } from "./executor.ts";
47+
import { CommandExecutor, DefaultExecutor } from "./executor.ts";
4848
import type {
4949
Binary,
5050
Bulk,
@@ -2361,7 +2361,7 @@ export interface RedisConnectOptions extends RedisConnectionOptions {
23612361
export async function connect(options: RedisConnectOptions): Promise<Redis> {
23622362
const connection = createRedisConnection(options);
23632363
await connection.connect();
2364-
const executor = new MuxExecutor(connection);
2364+
const executor = new DefaultExecutor(connection);
23652365
return create(executor);
23662366
}
23672367

@@ -2439,7 +2439,7 @@ function createLazyExecutor(connection: Connection): CommandExecutor {
24392439
},
24402440
async exec(command, ...args) {
24412441
if (!executor) {
2442-
executor = new MuxExecutor(connection);
2442+
executor = new DefaultExecutor(connection);
24432443
if (!connection.isConnected) {
24442444
await connection.connect();
24452445
}

0 commit comments

Comments
 (0)