-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.ts
More file actions
112 lines (99 loc) · 2.73 KB
/
executor.ts
File metadata and controls
112 lines (99 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import type { Connection } from "./connection.ts";
import { ConnectionClosedError, EOFError } from "./errors.ts";
import { sendCommand } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
export interface CommandExecutor {
readonly connection: Connection;
exec(
command: string,
...args: RedisValue[]
): Promise<RedisReply>;
/**
* Closes a redis connection.
*/
close(): void;
}
type QueuedCommand = {
command: string;
args: RedisValue[];
promise: Promise<RedisReply>;
resolve: (value: RedisReply) => void;
reject: (reason?: unknown) => void;
};
function isRetriableError(error: unknown, connection: Connection): boolean {
if (connection.isClosed) {
// Connection was explicitly closed, so don't retry.
return false;
}
return (
error instanceof Deno.errors.BadResource ||
error instanceof Deno.errors.BrokenPipe ||
error instanceof Deno.errors.ConnectionAborted ||
error instanceof Deno.errors.ConnectionRefused ||
error instanceof Deno.errors.ConnectionReset ||
error instanceof EOFError
);
}
export class MuxExecutor implements CommandExecutor {
private queue: QueuedCommand[] = [];
private isProcessing = false;
constructor(readonly connection: Connection) {}
exec(
command: string,
...args: RedisValue[]
): Promise<RedisReply> {
if (this.connection.isClosed) {
return Promise.reject(new ConnectionClosedError("Connection is closed"));
}
const { promise, resolve, reject } = Promise.withResolvers<RedisReply>();
this.queue.push({ command, args, promise, resolve, reject });
if (!this.isProcessing) {
this.dequeue();
}
return promise;
}
close(): void {
this.connection.close();
}
private async dequeue(): Promise<void> {
if (this.isProcessing) return;
const item = this.queue[0];
if (!item) {
this.isProcessing = false;
return;
}
this.isProcessing = true;
try {
const reply = await sendCommand(
this.connection.writer,
this.connection.reader,
item.command,
...item.args,
);
item.resolve(reply);
this.queue.shift(); // Command succeeded, remove from queue.
} catch (error) {
if (
this.connection.maxRetryCount > 0 &&
isRetriableError(error, this.connection)
) {
try {
await this.connection.reconnect();
// Don't shift the item from the queue, we will retry it.
} catch (reconnectError) {
item.reject(reconnectError);
this.queue.shift(); // Reconnect failed, reject and remove.
}
} else {
item.reject(error);
this.queue.shift(); // Non-retriable error, reject and remove.
}
} finally {
this.isProcessing = false;
// Process next item in queue, which might be the retried item or a new one.
if (this.queue.length > 0) {
this.dequeue();
}
}
}
}