Skip to content

Commit 28ead06

Browse files
authored
fix: fix concurrent pipeline invocation results (#429)
Co-authored-by: yahiro <yahiro07@users.noreply.github.com>
1 parent 849c98e commit 28ead06

2 files changed

Lines changed: 60 additions & 29 deletions

File tree

connection.ts

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
2-
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
3-
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
41
import type { Backoff } from "./backoff.ts";
52
import { exponentialBackoff } from "./backoff.ts";
63
import { ErrorReplyError, isRetriableError } from "./errors.ts";
@@ -10,6 +7,9 @@ import {
107
kUnstableReadReply,
118
kUnstableWriteCommand,
129
} from "./internal/symbols.ts";
10+
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
11+
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
12+
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
1313
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";
1414

1515
export interface SendCommandOptions {
@@ -73,11 +73,9 @@ export interface RedisConnectionOptions {
7373
export const kEmptyRedisArgs: Array<RedisValue> = [];
7474

7575
interface PendingCommand {
76-
name: string;
77-
args: RedisValue[];
76+
execute: () => Promise<RedisReply>;
7877
resolve: (reply: RedisReply) => void;
7978
reject: (error: unknown) => void;
80-
returnUint8Arrays?: boolean;
8179
}
8280

8381
export class RedisConnection implements Connection {
@@ -148,22 +146,29 @@ export class RedisConnection implements Connection {
148146
await this.sendCommand("SELECT", [db]);
149147
}
150148

149+
private enqueueCommand(
150+
command: PendingCommand,
151+
) {
152+
this.commandQueue.push(command);
153+
if (this.commandQueue.length === 1) {
154+
this.processCommandQueue();
155+
}
156+
}
157+
151158
sendCommand(
152159
command: string,
153160
args?: Array<RedisValue>,
154161
options?: SendCommandOptions,
155162
): Promise<RedisReply> {
156163
const { promise, resolve, reject } = Promise.withResolvers<RedisReply>();
157-
this.commandQueue.push({
158-
name: command,
159-
args: args ?? kEmptyRedisArgs,
160-
resolve,
161-
reject,
162-
returnUint8Arrays: options?.returnUint8Arrays,
163-
});
164-
if (this.commandQueue.length === 1) {
165-
this.processCommandQueue();
166-
}
164+
const execute = () =>
165+
this.#protocol.sendCommand(
166+
command,
167+
args ?? kEmptyRedisArgs,
168+
options?.returnUint8Arrays,
169+
);
170+
this.enqueueCommand({ execute, resolve, reject });
171+
167172
return promise;
168173
}
169174

@@ -172,7 +177,12 @@ export class RedisConnection implements Connection {
172177
}
173178

174179
[kUnstablePipeline](commands: Array<Command>) {
175-
return this.#protocol.pipeline(commands);
180+
const { promise, resolve, reject } = Promise.withResolvers<
181+
RedisReply[]
182+
>();
183+
const execute = () => this.#protocol.pipeline(commands);
184+
this.enqueueCommand({ execute, resolve, reject } as PendingCommand);
185+
return promise;
176186
}
177187

178188
[kUnstableWriteCommand](command: Command): Promise<void> {
@@ -256,11 +266,7 @@ export class RedisConnection implements Connection {
256266
if (!command) return;
257267

258268
try {
259-
const reply = await this.#protocol.sendCommand(
260-
command.name,
261-
command.args,
262-
command.returnUint8Arrays,
263-
);
269+
const reply = await command.execute();
264270
command.resolve(reply);
265271
} catch (error) {
266272
if (
@@ -275,13 +281,7 @@ export class RedisConnection implements Connection {
275281
this.close();
276282
try {
277283
await this.connect();
278-
279-
const reply = await this.#protocol.sendCommand(
280-
command.name,
281-
command.args,
282-
command.returnUint8Arrays,
283-
);
284-
284+
const reply = await command.execute();
285285
return command.resolve(reply);
286286
} catch { // TODO: use `AggregateError`?
287287
const backoff = this.backoff(i);

tests/commands/pipeline.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,37 @@ export function pipelineTests(
117117
}
118118
});
119119

120+
it("pipeline in concurrent, avoid redundant response mixup", async () => {
121+
{
122+
const opts = getOpts();
123+
const client = await connect(opts);
124+
125+
const randomValues = new Array(10)
126+
.fill(0)
127+
.map(() => new Array(10).fill(0).map(() => Math.random().toString()));
128+
129+
for (let i = 0; i < 10; i++) {
130+
const key = `list_${i}`;
131+
const values = randomValues[i];
132+
await client.del(key);
133+
await client.rpush(key, ...values);
134+
}
135+
136+
// deno-lint-ignore no-inner-declarations
137+
async function task() {
138+
const tx = client.pipeline();
139+
for (let i = 0; i < 10; i++) {
140+
tx.lrange(`list_${i}`, 0, -1);
141+
}
142+
return await tx.flush();
143+
}
144+
145+
const res = await Promise.all([task(), task()]);
146+
assertEquals(res, [randomValues, randomValues]);
147+
client.close();
148+
}
149+
});
150+
120151
it("error while pipeline", async () => {
121152
const opts = getOpts();
122153
const client = await connect(opts);

0 commit comments

Comments
 (0)