Skip to content

Commit 9bb2782

Browse files
committed
feat: receive message
1 parent 9b99788 commit 9bb2782

File tree

5 files changed

+215
-40
lines changed

5 files changed

+215
-40
lines changed

mod.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
export { SQSQueue } from "./src/queue.ts";
22
export type { SQSQueueConfig } from "./src/queue.ts";
3-
export type { SendMessageOptions } from "./src/types.ts";
3+
export type {
4+
SendMessageOptions,
5+
SendMessageResponse,
6+
ReceiveMessageOptions,
7+
ReceiveMessageResponse,
8+
Message,
9+
} from "./src/types.ts";
410
export { SQSError } from "./src/error.ts";

src/messages.ts

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { parseXML } from "../deps.ts";
2-
import type { SendMessageResponse } from "./types.ts";
2+
import type {
3+
SendMessageResponse,
4+
ReceiveMessageResponse,
5+
Message,
6+
} from "./types.ts";
37
import { SQSError } from "./error.ts";
48

59
interface Document {
@@ -17,40 +21,69 @@ interface Xml {
1721
}
1822

1923
export function parseSendMessageResponse(xml: string): SendMessageResponse {
20-
const data: Document = parseXML(xml);
21-
const { root } = data;
22-
if (!root || root.name !== "SendMessageResponse") {
23-
throw new SQSError(
24-
"Malformed sendMessage response. Missing SendMessageResponse field.",
25-
xml,
26-
);
27-
}
28-
const sendMessageResult = root.children.find((d) =>
29-
d.name === "SendMessageResult"
30-
);
31-
if (!sendMessageResult) {
24+
const doc: Document = parseXML(xml);
25+
const root = extractRoot(doc, "SendMessageResponse");
26+
const sendMessageResult = extractField(root, "SendMessageResult");
27+
28+
const messageID = extractContent(sendMessageResult, "MessageId");
29+
const md5OfBody = extractContent(sendMessageResult, "MD5OfMessageBody");
30+
31+
return { messageID, md5OfBody };
32+
}
33+
34+
export function parseReceiveMessageBody(xml: string): ReceiveMessageResponse {
35+
const doc: Document = parseXML(xml);
36+
const root = extractRoot(doc, "ReceiveMessageResponse");
37+
const receiveMessageResult = extractField(root, "ReceiveMessageResult");
38+
39+
const messages = receiveMessageResult.children.map<Message>((message) => {
40+
if (message.name !== "Message") {
41+
throw new SQSError(
42+
"Malformed field. Field type is not Message.",
43+
JSON.stringify(message, undefined, 2),
44+
);
45+
}
46+
47+
const messageID = extractContent(message, "MessageId");
48+
const receiptHandle = extractContent(message, "ReceiptHandle");
49+
const md5OfBody = extractContent(message, "MD5OfBody");
50+
const body = extractContent(message, "Body");
51+
52+
return { messageID, md5OfBody, receiptHandle, body };
53+
});
54+
55+
return { messages };
56+
}
57+
58+
function extractRoot(doc: Document, name: string): Xml {
59+
if (!doc.root || doc.root.name !== name) {
3260
throw new SQSError(
33-
"Malformed sendMessage response. Missing SendMessageResult field.",
34-
xml,
61+
`Malformed XML document. Missing ${name} field.`,
62+
JSON.stringify(doc, undefined, 2),
3563
);
3664
}
37-
const messageIDField = sendMessageResult.children.find((d) =>
38-
d.name === "MessageId"
39-
);
40-
if (!messageIDField) {
65+
return doc.root;
66+
}
67+
68+
function extractField(node: Xml, name: string): Xml {
69+
const bodyField = node.children.find((node) => node.name === name);
70+
if (!bodyField) {
4171
throw new SQSError(
42-
"Malformed sendMessage response. Missing MessageId field.",
43-
xml,
72+
`Missing ${name} field in ${node.name} node.`,
73+
JSON.stringify(node, undefined, 2),
4474
);
4575
}
46-
const messageID = messageIDField.content;
47-
if (!messageID) {
76+
return bodyField;
77+
}
78+
79+
function extractContent(node: Xml, name: string): string {
80+
const field = extractField(node, name);
81+
const content = field.content;
82+
if (!content) {
4883
throw new SQSError(
49-
"Malformed sendMessage response. Missing content in MessageId field.",
50-
xml,
84+
`Missing content in ${node.name} node.`,
85+
JSON.stringify(node, undefined, 2),
5186
);
5287
}
53-
return {
54-
messageID,
55-
};
88+
return content;
5689
}

src/queue.ts

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
import { AWSSignerV4, sha256, parseXML } from "../deps.ts";
2-
import { parseSendMessageResponse } from "./messages.ts";
2+
import {
3+
parseSendMessageResponse,
4+
parseReceiveMessageBody,
5+
} from "./messages.ts";
36
import { SQSError } from "./error.ts";
4-
import type { SendMessageOptions, SendMessageResponse } from "./types.ts";
7+
import type {
8+
SendMessageOptions,
9+
SendMessageResponse,
10+
ReceiveMessageOptions,
11+
ReceiveMessageResponse,
12+
} from "./types.ts";
513

614
export interface SQSQueueConfig {
715
queueURL: string;
@@ -11,27 +19,41 @@ export interface SQSQueueConfig {
1119
sessionToken?: string;
1220
}
1321

22+
interface Params {
23+
[key: string]: string;
24+
}
25+
1426
export class SQSQueue {
1527
#signer: AWSSignerV4;
16-
#host: string;
28+
#queueURL: string;
1729

1830
constructor(config: SQSQueueConfig) {
1931
this.#signer = new AWSSignerV4(config.region, {
2032
awsAccessKeyId: config.accessKeyID,
2133
awsSecretKey: config.secretKey,
2234
sessionToken: config.sessionToken,
2335
});
24-
this.#host = config.queueURL;
36+
this.#queueURL = config.queueURL;
2537
}
2638

2739
private _doRequest(
2840
path: string,
41+
params: Params,
2942
method: string,
3043
headers: { [key: string]: string },
3144
body?: Uint8Array | undefined,
3245
): Promise<Response> {
33-
const url = `${this.#host}${path}`;
34-
const signedHeaders = this.#signer.sign("sqs", url, method, headers, body);
46+
const url = new URL(this.#queueURL + path);
47+
for (const key in params) {
48+
url.searchParams.set(key, params[key]);
49+
}
50+
const signedHeaders = this.#signer.sign(
51+
"sqs",
52+
url.toString(),
53+
method,
54+
headers,
55+
body,
56+
);
3557
signedHeaders["x-amz-content-sha256"] = sha256(
3658
body ?? "",
3759
"utf8",
@@ -47,10 +69,12 @@ export class SQSQueue {
4769
async sendMessage(
4870
options: SendMessageOptions,
4971
): Promise<SendMessageResponse> {
50-
const url = `/?Action=SendMessage&MessageBody=${
51-
encodeURIComponent(options.body)
52-
}`;
53-
const res = await this._doRequest(url, "GET", {});
72+
const res = await this._doRequest(
73+
"/",
74+
{ Action: "SendMessage", MessageBody: encodeURIComponent(options.body) },
75+
"GET",
76+
{},
77+
);
5478
if (!res.ok) {
5579
throw new SQSError(
5680
`Failed to send message: ${res.status} ${res.statusText}`,
@@ -60,4 +84,35 @@ export class SQSQueue {
6084
const xml = await res.text();
6185
return parseSendMessageResponse(xml);
6286
}
87+
88+
async receiveMessage(
89+
options?: ReceiveMessageOptions,
90+
): Promise<ReceiveMessageResponse> {
91+
const params: Params = { Action: "ReceiveMessage" };
92+
if (options) {
93+
if (options.maxNumberOfMessages) {
94+
params["MaxNumberOfMessages"] = options.maxNumberOfMessages.toString();
95+
}
96+
if (options.visibilityTimeout) {
97+
params["VisibilityTimeout"] = options.visibilityTimeout.toString();
98+
}
99+
if (options.waitTimeSeconds) {
100+
params["WaitTimeSeconds"] = options.waitTimeSeconds.toString();
101+
}
102+
}
103+
const res = await this._doRequest(
104+
"/",
105+
params,
106+
"GET",
107+
{},
108+
);
109+
if (!res.ok) {
110+
throw new SQSError(
111+
`Failed to send message: ${res.status} ${res.statusText}`,
112+
await res.text(),
113+
);
114+
}
115+
const xml = await res.text();
116+
return parseReceiveMessageBody(xml);
117+
}
63118
}

src/queue_test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,17 @@ Deno.test({
1818
assertEquals(typeof res.messageID, "string");
1919
},
2020
});
21+
22+
Deno.test({
23+
name: "receive message",
24+
async fn() {
25+
const res = await queue.receiveMessage();
26+
assert(res);
27+
assertEquals(res.messages.length, 1);
28+
const message = res.messages[0];
29+
assertEquals(typeof message.messageID, "string");
30+
assertEquals(typeof message.receiptHandle, "string");
31+
assertEquals(typeof message.md5OfBody, "string");
32+
assertEquals(message.body, "test");
33+
},
34+
});

src/types.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,75 @@
11
export interface SendMessageOptions {
2+
/**
3+
* The message to send. The minimum size is one character. The maximum size
4+
* is 256 KB.
5+
*/
26
body: string;
37
}
48

59
export interface SendMessageResponse {
6-
/** An attribute containing the MessageId of the message sent to the queue. */
10+
/**
11+
* An attribute containing the MessageId of the message sent to the queue.
12+
*/
713
messageID: string;
14+
15+
/**
16+
* An MD5 digest of the non-URL-encoded message body string.
17+
*/
18+
md5OfBody: string;
19+
}
20+
21+
export interface ReceiveMessageOptions {
22+
/**
23+
* The maximum number of messages to return. Amazon SQS never returns more
24+
* messages than this value (however, fewer messages might be returned).
25+
* Valid values: 1 to 10. Default: 1.
26+
*/
27+
maxNumberOfMessages?: number;
28+
29+
/**
30+
* The duration (in seconds) that the received messages are hidden from
31+
* subsequent retrieve requests after being retrieved by a ReceiveMessage
32+
* request.
33+
*/
34+
visibilityTimeout?: number;
35+
36+
/**
37+
* The duration (in seconds) for which the call waits for a message to arrive
38+
* in the queue before returning. If a message is available, the call returns
39+
* sooner than WaitTimeSeconds. If no messages are available and the wait time
40+
* expires, the call returns successfully with an empty list of messages.
41+
*/
42+
waitTimeSeconds?: number;
43+
}
44+
45+
export interface ReceiveMessageResponse {
46+
/**
47+
* An attribute containing the MessageId of the message sent to the queue.
48+
*/
49+
messages: Message[];
50+
}
51+
52+
export interface Message {
53+
/**
54+
* A unique identifier for the message. A MessageIdis considered unique across
55+
* all AWS accounts for an extended period of time.
56+
*/
57+
messageID: string;
58+
59+
/**
60+
* An MD5 digest of the non-URL-encoded message body string.
61+
*/
62+
md5OfBody: string;
63+
64+
/**
65+
* An identifier associated with the act of receiving the message. A new receipt
66+
* handle is returned every time you receive a message. When deleting a message,
67+
* you provide the last received receipt handle to delete the message.
68+
*/
69+
receiptHandle: string;
70+
71+
/**
72+
* The message's contents (not URL-encoded).
73+
*/
74+
body: string;
875
}

0 commit comments

Comments
 (0)