Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 52 additions & 25 deletions packages/queues/src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ export type QueueErrorCode = "ERR_CONSUMER_ALREADY_SET";

export class QueueError extends MiniflareError<QueueErrorCode> {}

export const MAX_ATTEMPTS = 3;
const kShouldAttemptRetry = Symbol("kShouldAttemptRetry");
const kGetPendingRetry = Symbol("kGetPendingRetry");
const kPrepareForRetry = Symbol("kPrepareForRetry");
const kGetFailedAttempts = Symbol("kGetFailedAttempts");

export class Message<Body = unknown> implements MessageInterface<Body> {
readonly body: Body;
Expand Down Expand Up @@ -48,24 +49,17 @@ export class Message<Body = unknown> implements MessageInterface<Body> {
this.#pendingRetry = true;
}

[kShouldAttemptRetry](): boolean {
if (!this.#pendingRetry) {
return false;
}

[kPrepareForRetry]() {
this.#pendingRetry = false;
this.#failedAttempts++;
if (this.#failedAttempts >= MAX_ATTEMPTS) {
this.#log?.warn(
`Dropped message "${this.id}" after ${
this.#failedAttempts
} failed attempts!`
);
return false;
}
}

this.#log?.debug(`Retrying message "${this.id}"...`);
this.#pendingRetry = false;
return true;
[kGetPendingRetry](): boolean {
return this.#pendingRetry;
}

[kGetFailedAttempts](): number {
return this.#failedAttempts;
}
}

Expand Down Expand Up @@ -96,6 +90,7 @@ enum FlushType {
export const kSetFlushCallback = Symbol("kSetFlushCallback");

export class Queue<Body = unknown> implements QueueInterface<Body> {
readonly #broker: QueueBroker;
readonly #queueName: string;
readonly #log?: Log;

Expand All @@ -109,7 +104,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
// A callback to run after a flush() has been executed: useful for testing.
#flushCallback?: () => void;

constructor(queueName: string, log?: Log) {
constructor(broker: QueueBroker, queueName: string, log?: Log) {
this.#broker = broker;
this.#queueName = queueName;
this.#log = log;

Expand Down Expand Up @@ -202,6 +198,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
if (!this.#consumer) {
return;
}
const maxAttempts = this.#consumer.maxRetries + 1;
const deadLetterQueueName = this.#consumer.deadLetterQueue;

// Create a batch and execute the queue event handler
const batch = new MessageBatch<Body>(this.#queueName, [...this.#messages]);
Expand All @@ -216,13 +214,42 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
// Reset state and check for any messages to retry
this.#pendingFlush = FlushType.NONE;
this.#timeout = undefined;
const messagesToRetry = batch.messages.filter((msg) =>
msg[kShouldAttemptRetry]()
);
this.#messages.push(...messagesToRetry);
if (this.#messages.length > 0) {

const toRetry: Message<Body>[] = [];
const toDLQ: Message<Body>[] = [];
batch.messages.forEach((msg) => {
if (!msg[kGetPendingRetry]()) {
return;
}

msg[kPrepareForRetry]();
if (msg[kGetFailedAttempts]() < maxAttempts) {
this.#log?.debug(`Retrying message "${msg.id}"...`);
toRetry.push(msg);
} else if (deadLetterQueueName) {
this.#log?.warn(
`Moving message "${msg.id}" to dead letter queue "${deadLetterQueueName}"...`
);
toDLQ.push(msg);
} else {
this.#log?.warn(
`Dropped message "${msg.id}" after ${maxAttempts} failed attempts!`
);
}
});

if (toRetry.length) {
this.#messages.push(...toRetry);
this.#ensurePendingFlush();
}

if (deadLetterQueueName) {
const deadLetterQueue =
this.#broker.getOrCreateQueue(deadLetterQueueName);
toDLQ.forEach((msg) => {
deadLetterQueue.send(msg.body);
});
}
}

[kSetFlushCallback](callback: () => void) {
Expand All @@ -242,7 +269,7 @@ export class QueueBroker implements QueueBrokerInterface {
getOrCreateQueue(name: string): Queue {
let queue = this.#queues.get(name);
if (queue === undefined) {
this.#queues.set(name, (queue = new Queue(name, this.#log)));
this.#queues.set(name, (queue = new Queue(this, name, this.#log)));
}
return queue;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/queues/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {

export const DEFAULT_BATCH_SIZE = 5;
export const DEFAULT_WAIT_MS = 1000;
export const DEFAULT_RETRIES = 2;

export interface BindingOptions {
name: string;
Expand Down Expand Up @@ -106,6 +107,8 @@ export class QueuesPlugin
queueName: opts.queueName,
maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE,
maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS,
maxRetries: opts.maxRetries ?? DEFAULT_RETRIES,
deadLetterQueue: opts.deadLetterQueue,
dispatcher: this.ctx.queueEventDispatcher,
};

Expand Down
78 changes: 71 additions & 7 deletions packages/queues/test/broker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import {
MAX_ATTEMPTS,
QueueBroker,
kSetFlushCallback,
} from "@miniflare/queues";
import { QueueBroker, kSetFlushCallback } from "@miniflare/queues";
import {
Consumer,
LogLevel,
Expand All @@ -19,6 +15,7 @@ test("QueueBroker: flushes partial batches", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -109,6 +106,7 @@ test("QueueBroker: flushes full batches", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -193,6 +191,7 @@ test("QueueBroker: supports message retry()", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -241,6 +240,7 @@ test("QueueBroker: automatic retryAll() on consumer error", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 2,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand Down Expand Up @@ -299,6 +299,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 4,
dispatcher: async (_batch) => {},
};
q[kSetConsumer](sub);
Expand All @@ -312,7 +313,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
// Expect the queue to flush() the maximum number of times
q.send("message1");

for (let i = 0; i < MAX_ATTEMPTS; i++) {
for (let i = 0; i < 5; i++) {
const prom = new Promise<void>((resolve) => {
q[kSetFlushCallback](() => resolve());
});
Expand All @@ -323,7 +324,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
// Check last log message is warning that message dropped
t.deepEqual(log.logs[log.logs.length - 1], [
LogLevel.WARN,
'Dropped message "myQueue-0" after 3 failed attempts!',
'Dropped message "myQueue-0" after 5 failed attempts!',
]);

// To check that "message1" is dropped:
Expand All @@ -338,3 +339,66 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
});
await prom;
});

test("QueueBroker: dead letter queue support", async (t) => {
const log = new TestLog();
log.error = (message) =>
log.logWithLevel(LogLevel.ERROR, message?.stack ?? "");

const broker = new QueueBroker(log);

// Setup the original queue
const q = broker.getOrCreateQueue("myQueue");
const originalConsumer: Consumer = {
queueName: "myQueue",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 1,
deadLetterQueue: "myDLQ",
dispatcher: async (_batch) => {},
};
q[kSetConsumer](originalConsumer);

const dlq = broker.getOrCreateQueue("myDLQ");
const dlqConsumer: Consumer = {
queueName: "myDLQ",
maxBatchSize: 5,
maxWaitMs: 1,
maxRetries: 0,
dispatcher: async (_batch) => {},
};
dlq[kSetConsumer](dlqConsumer);

// Set up the consumer for the original queue
let originalInvocations = 0;
originalConsumer.dispatcher = async (batch: MessageBatch) => {
batch.messages[0].retry();
originalInvocations++;
};

// Set up the consumer for the dead letter queue
let dlqInvocations = 0;
dlqConsumer.dispatcher = async (_batch: MessageBatch) => {
dlqInvocations++;
};

const originalQProm = new Promise<void>((resolve) => {
q[kSetFlushCallback](() => resolve());
});
q.send("message1");
await originalQProm;

const dlqProm = new Promise<void>((resolve) => {
dlq[kSetFlushCallback](() => resolve());
});
await dlqProm;

t.deepEqual(originalInvocations, 2);
t.deepEqual(dlqInvocations, 1);

// Check last log message is warning that message dropped
t.deepEqual(log.logs[log.logs.length - 1], [
LogLevel.WARN,
'Moving message "myQueue-0" to dead letter queue "myDLQ"...',
]);
});
2 changes: 2 additions & 0 deletions packages/shared/src/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export interface Consumer {
queueName: string;
maxBatchSize: number;
maxWaitMs: number;
maxRetries: number;
deadLetterQueue?: string;
dispatcher: QueueEventDispatcher;
}

Expand Down