From 2360f14c756dccba8aec97903c26268c56cafa55 Mon Sep 17 00:00:00 2001 From: Aakash Raghav Date: Sun, 29 May 2022 15:23:47 +0530 Subject: [PATCH] feat(sqs): Adds support for sqs message attributes --- README.md | 11 ++++++++++ src/sqs-consumer.ts | 2 +- src/sqs-producer.ts | 15 +++++++++---- tests/sns-sqs.spec.ts | 50 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b71922b..d2bf472 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,17 @@ await sqsProducer.sendJSON({ }); ``` +The sendJSON method of sqs producer also supports an optional sqs message options argument. + +```ts +SqsMessageOptions { + DelaySeconds?: number; + MessageDeduplicationId?: string; + MessageGroupId?: string; + MessageAttributes?: MessageBodyAttributeMap; +} +``` + ### SQS Consumer ```ts diff --git a/src/sqs-consumer.ts b/src/sqs-consumer.ts index d8b2445..fc6d377 100644 --- a/src/sqs-consumer.ts +++ b/src/sqs-consumer.ts @@ -128,7 +128,7 @@ export class SqsConsumer { QueueUrl: this.queueUrl, MaxNumberOfMessages: this.batchSize, WaitTimeSeconds: this.waitTimeSeconds, - MessageAttributeNames: [SQS_LARGE_PAYLOAD_SIZE_ATTRIBUTE], + MessageAttributeNames: ['All'], }); if (!this.started) return; await this.handleSqsResponse(response); diff --git a/src/sqs-producer.ts b/src/sqs-producer.ts index f1790db..b3ff013 100644 --- a/src/sqs-producer.ts +++ b/src/sqs-producer.ts @@ -1,5 +1,6 @@ import * as aws from 'aws-sdk'; import { PromiseResult } from 'aws-sdk/lib/request'; +import { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs'; import { v4 as uuid } from 'uuid'; import { S3PayloadMeta } from './types'; import { @@ -31,6 +32,7 @@ export interface SqsMessageOptions { DelaySeconds?: number; MessageDeduplicationId?: string; MessageGroupId?: string; + MessageAttributes?: MessageBodyAttributeMap } export class SqsProducer { @@ -122,6 +124,7 @@ export class SqsProducer { DelaySeconds: options.DelaySeconds, MessageDeduplicationId: options.MessageDeduplicationId, MessageGroupId: options.MessageGroupId, + MessageAttributes: options.MessageAttributes || {} }) .promise(); @@ -138,9 +141,13 @@ export class SqsProducer { msgSize?: number, options: SqsMessageOptions = {} ): Promise> { - const messageAttributes = this.extendedLibraryCompatibility - ? createExtendedCompatibilityAttributeMap(msgSize) - : {}; + const messageAttributes = { + ...(options.MessageAttributes || {}), + ...(this.extendedLibraryCompatibility + ? createExtendedCompatibilityAttributeMap(msgSize) + : {} + ) + }; return await this.sqs .sendMessage({ QueueUrl: this.queueUrl, @@ -150,7 +157,7 @@ export class SqsProducer { DelaySeconds: options.DelaySeconds, MessageDeduplicationId: options.MessageDeduplicationId, MessageGroupId: options.MessageGroupId, - MessageAttributes: messageAttributes, + MessageAttributes: messageAttributes }) .promise(); } diff --git a/tests/sns-sqs.spec.ts b/tests/sns-sqs.spec.ts index 766cb6e..1cfe7fb 100644 --- a/tests/sns-sqs.spec.ts +++ b/tests/sns-sqs.spec.ts @@ -7,8 +7,10 @@ import { SnsProducerOptions, SnsProducer, SqsMessage, + SqsMessageOptions } from '../src'; +import { MessageAttributeMap } from 'aws-sdk/clients/sns'; import * as aws from 'aws-sdk'; import { v4 as uuid } from 'uuid'; import { S3PayloadMeta } from '../src/types'; @@ -89,12 +91,12 @@ const getSqsProducer = (options: Partial = {}) => { }); }; -async function sendMessage(msg: any, options?: Partial) { +async function sendMessage(msg: any, options?: Partial, sqsMessageOptions?:SqsMessageOptions) { const sqsProducer = getSqsProducer(options); - return await sqsProducer.sendJSON(msg); + return await sqsProducer.sendJSON(msg, sqsMessageOptions); } -async function sendS3Payload(s3PayloadMeta: S3PayloadMeta, options: Partial) { +async function sendS3Payload(s3PayloadMeta: S3PayloadMeta, options: Partial, sqsMessageOptions?:SqsMessageOptions) { const sqsProducer = getSqsProducer(options); return await sqsProducer.sendS3Payload(s3PayloadMeta); } @@ -237,6 +239,27 @@ describe('sns-sqs-big-payload', () => { }); }); + describe('sending messages with MessageAttributes', () => { + it('should send and receive the message', async () => { + const message = { it: 'works' }; + const sqsMessageOptions = { + MessageAttributes: { + testAttribute: { + DataType: 'String', + StringValue: 'test', + StringListValues:[], + BinaryListValues:[] + } + } + }; + + await sendMessage(message, {}, sqsMessageOptions); + const [receivedMessage] = await receiveMessages(1); + expect(receivedMessage.payload).toEqual(message); + expect(receivedMessage.message.MessageAttributes).toEqual(sqsMessageOptions.MessageAttributes); + }); + }); + describe('events', () => { function getEventHandlers() { const handlers = Object.keys(SqsConsumerEvents).reduce((acc, key) => { @@ -446,6 +469,27 @@ describe('sns-sqs-big-payload', () => { expect(receivedMessage.s3PayloadMeta.Key).toBeDefined(); }); + it('should send message though s3 with MessageAttributes', async () => { + const message = { it: 'works' }; + const sqsMessageOptions = { + MessageAttributes: { + testAttribute: { + DataType: 'String', + StringValue: 'test', + StringListValues:[], + BinaryListValues:[] + } + } + }; + + await sendMessage(message, { allPayloadThoughS3: true, s3Bucket: TEST_BUCKET_NAME }, sqsMessageOptions); + const [receivedMessage] = await receiveMessages(1, { getPayloadFromS3: true }); + expect(receivedMessage.payload).toEqual(message); + expect(receivedMessage.message.MessageAttributes).toEqual(sqsMessageOptions.MessageAttributes); + expect(receivedMessage.s3PayloadMeta.Bucket).toEqual(TEST_BUCKET_NAME); + expect(receivedMessage.s3PayloadMeta.Key).toBeDefined(); + }); + it('should send large message through s3', async () => { const message = 'x'.repeat(256 * 1024 + 1); await sendMessage(message, { largePayloadThoughS3: true, s3Bucket: TEST_BUCKET_NAME });