Skip to content

Commit adf3a9f

Browse files
Add support to SQS
1 parent 7c8cdf4 commit adf3a9f

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

src/idempotency-sqs-wrapper.ts

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import SQS from "aws-sdk/clients/sqs";
2+
import { Providers } from "./providers";
3+
import { DynamoDB } from "./providers/dynamoDB";
4+
5+
const DEFAULT_TTL = 5;
6+
7+
interface IdempotencySQSOptionsProviderDynamoDB {
8+
name: Providers.DynamoDB;
9+
endpoint?: string;
10+
region: string;
11+
tableName: string;
12+
}
13+
14+
interface IdempotencySQSOptionsQueue {
15+
region: string;
16+
url: string;
17+
}
18+
19+
export interface IdempotencySQSOptions {
20+
handler: CallableFunction;
21+
provider: IdempotencySQSOptionsProviderDynamoDB;
22+
queue: IdempotencySQSOptionsQueue;
23+
ttl?: number;
24+
}
25+
26+
export const idempotencySQSWrapper = (
27+
options: IdempotencySQSOptions
28+
): CallableFunction => {
29+
return async (event: any, context: any): Promise<any> => {
30+
const provider = new DynamoDB({
31+
tableName: options.provider.tableName,
32+
region: options.provider.region,
33+
endpoint: options.provider.endpoint,
34+
ttl: options.ttl ?? DEFAULT_TTL,
35+
});
36+
37+
const newRecords = [];
38+
39+
for (let record of event.Records) {
40+
const messageId = record.messageId;
41+
42+
const hasProcessed = await provider.isProcessing(messageId);
43+
44+
if (!hasProcessed) {
45+
newRecords.push(record);
46+
continue;
47+
}
48+
49+
const sqs = new SQS({
50+
region: options.queue.region,
51+
});
52+
await sqs
53+
.deleteMessage({
54+
QueueUrl: options.queue.url,
55+
ReceiptHandle: record.receiptHandle,
56+
})
57+
.promise();
58+
}
59+
60+
event.records = newRecords;
61+
62+
return options.handler(event, context);
63+
};
64+
};

src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export { idempotencyHttpWrapper } from "./idempotency-http-wrapper";
2+
export { idempotencySQSWrapper } from "./idempotency-sqs-wrapper";
23
export { Providers } from "./providers";

0 commit comments

Comments
 (0)