Skip to content

Commit 662b078

Browse files
authored
Merge pull request #417 from jiw-mh/feat/worker-ref
Feature: Option worker definition optionally as string reference.
2 parents bd148e8 + 706d2ab commit 662b078

File tree

6 files changed

+135
-15
lines changed

6 files changed

+135
-15
lines changed

docs/queue.md

+20-3
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,26 @@ constructs:
214214
handler: src/worker.handler
215215
```
216216

217-
_Note: the Lambda "worker" function is configured in the `queue` construct, instead of being defined in the `functions` section._
217+
or
218218

219-
The only required setting is the `handler`: this should point to the code that handles SQS messages. The handler [should be written to handle SQS events](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), for example in JavaScript:
219+
```yaml
220+
function:
221+
my-worker:
222+
handler: src/worker.handler
223+
224+
constructs:
225+
my-queue:
226+
type: queue
227+
# References the my-worker function defined in the function section
228+
worker: my-worker
229+
```
230+
231+
It is both possible to reference an existing function as `worker` or specify the entire worker as an object.
232+
233+
By defining the Lambda "worker" as an object, a new function is configured in the `queue` construct.
234+
For the function object, the only required setting is the `handler`: this should point to the code that handles SQS messages.
235+
236+
Whether the handler is referenced, or created, it [should be written to handle SQS events](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), for example in JavaScript:
220237

221238
```js
222239
exports.handler = async function (event, context) {
@@ -226,7 +243,7 @@ exports.handler = async function (event, context) {
226243
}
227244
```
228245

229-
[All settings allowed for functions](https://www.serverless.com/framework/docs/providers/aws/guide/functions/) can be used under the `worker` key. For example:
246+
When defined as an object, [all settings allowed for functions](https://www.serverless.com/framework/docs/providers/aws/guide/functions/) can be used under the `worker` key. For example:
230247

231248
```yaml
232249
constructs:

src/constructs/aws/Queue.ts

+38-12
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@ const QUEUE_DEFINITION = {
3232
properties: {
3333
type: { const: "queue" },
3434
worker: {
35-
type: "object",
36-
properties: {
37-
timeout: { type: "number" },
38-
},
39-
additionalProperties: true,
35+
oneOf: [
36+
{ type: "string" },
37+
{
38+
type: "object",
39+
properties: {
40+
timeout: { type: "number" },
41+
},
42+
additionalProperties: true,
43+
},
44+
] as const,
4045
},
4146
maxRetries: { type: "number" },
4247
alarm: { type: "string" },
@@ -127,6 +132,7 @@ export class Queue extends AwsConstruct {
127132
private readonly queueArnOutput: CfnOutput;
128133
private readonly queueUrlOutput: CfnOutput;
129134
private readonly dlqUrlOutput: CfnOutput;
135+
private readonly workerName: string;
130136

131137
constructor(
132138
scope: CdkConstruct,
@@ -146,8 +152,24 @@ export class Queue extends AwsConstruct {
146152
);
147153
}
148154

155+
let functionConfig: number | undefined;
156+
if (typeof configuration.worker === "string") {
157+
this.workerName = configuration.worker;
158+
const slsFunction = provider.getFunction(this.workerName);
159+
if (!slsFunction) {
160+
throw new ServerlessError(
161+
`Invalid configuration in 'constructs.${this.id}': 'workerRef' needs to point to an existing function.`,
162+
"LIFT_INVALID_CONSTRUCT_CONFIGURATION"
163+
);
164+
}
165+
functionConfig = slsFunction.timeout;
166+
} else {
167+
this.workerName = `${this.id}Worker`;
168+
functionConfig = configuration.worker.timeout;
169+
}
170+
149171
// The default function timeout is 6 seconds in the Serverless Framework
150-
const functionTimeout = configuration.worker.timeout ?? 6;
172+
const functionTimeout = functionConfig ?? 6;
151173

152174
// This should be 6 times the lambda function's timeout + MaximumBatchingWindowInSeconds
153175
// See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
@@ -326,10 +348,15 @@ export class Queue extends AwsConstruct {
326348
const maximumBatchingWindow = this.getMaximumBatchingWindow();
327349
const maximumConcurrency = this.configuration.maxConcurrency;
328350

329-
// Override events for the worker
330-
this.configuration.worker.events = [
331-
// Subscribe the worker to the SQS queue
332-
{
351+
if (typeof this.configuration.worker !== "string") {
352+
// Add the worker, if it is not a reference.
353+
this.provider.addFunction(this.workerName, this.configuration.worker);
354+
}
355+
356+
// Subscribe the worker to the SQS queue
357+
this.provider.addFunctionEvent({
358+
functionName: this.workerName,
359+
event: {
333360
sqs: {
334361
arn: this.queue.queueArn,
335362
batchSize: batchSize,
@@ -338,8 +365,7 @@ export class Queue extends AwsConstruct {
338365
functionResponseType: "ReportBatchItemFailures",
339366
},
340367
},
341-
];
342-
this.provider.addFunction(`${this.id}Worker`, this.configuration.worker);
368+
});
343369
}
344370

345371
private async getQueueUrl(): Promise<string | undefined> {

src/providers/AwsProvider.ts

+33
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ const AWS_DEFINITION = {
2525
additionalProperties: false,
2626
} as const;
2727

28+
type ValueType<T extends { [key: string | number | symbol]: unknown }> = T extends {
29+
[key: string | number | symbol]: infer V;
30+
}
31+
? V
32+
: never;
33+
34+
type ArrayType<T extends Array<unknown>> = T extends Array<infer V> ? V : never;
35+
2836
export class AwsProvider implements ProviderInterface {
2937
public static type = "aws";
3038
public static schema = AWS_DEFINITION;
@@ -112,6 +120,31 @@ export class AwsProvider implements ProviderInterface {
112120
this.serverless.service.setFunctionNames(this.serverless.processedInput.options);
113121
}
114122

123+
getFunction(functionName: string) {
124+
if (!this.serverless.service.functions) {
125+
return null;
126+
}
127+
128+
return this.serverless.service.functions[functionName];
129+
}
130+
131+
addFunctionEvent({
132+
functionName,
133+
event,
134+
}: {
135+
functionName: string;
136+
event: ArrayType<Required<ValueType<Required<Serverless["service"]>["functions"]>>["events"]>;
137+
}) {
138+
const slsFunction = this.getFunction(functionName);
139+
if (!slsFunction) {
140+
throw new Error(`Serverless function ${functionName} doesn't exit, can not add an event.`);
141+
}
142+
if (!slsFunction.events) {
143+
slsFunction.events = [];
144+
}
145+
slsFunction.events.push(event);
146+
}
147+
115148
/**
116149
* @internal
117150
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
service: app
2+
configValidationMode: error
3+
4+
provider:
5+
name: aws
6+
# To avoid versions with random names (easier diffs)
7+
versionFunctions: false
8+
9+
functions:
10+
foo:
11+
handler: worker.handler
12+
13+
constructs:
14+
emails:
15+
type: queue
16+
worker: foo

test/fixtures/queuesWorkerRef/worker.js

Whitespace-only changes.

test/unit/queues.test.ts

+28
Original file line numberDiff line numberDiff line change
@@ -750,4 +750,32 @@ describe("queues", () => {
750750
);
751751
}
752752
});
753+
it("should use a function if the function is defined", async () => {
754+
const awsMock = mockAws();
755+
sinon.stub(CloudFormationHelpers, "getStackOutput").resolves("queue-url");
756+
const sendSpy = awsMock.mockService("SQS", "sendMessage").resolves();
757+
758+
await runServerless({
759+
fixture: "queuesWorkerRef",
760+
configExt: merge({}, pluginConfigExt, {
761+
constructs: {
762+
emails: {
763+
fifo: true,
764+
},
765+
},
766+
}),
767+
command: "emails:send",
768+
options: {
769+
body: "Message body",
770+
"group-id": "123",
771+
},
772+
});
773+
774+
expect(sendSpy.callCount).toBe(1);
775+
expect(sendSpy.firstCall.firstArg).toStrictEqual({
776+
QueueUrl: "queue-url",
777+
MessageGroupId: "123",
778+
MessageBody: "Message body",
779+
});
780+
});
753781
});

0 commit comments

Comments
 (0)