Skip to content

Commit 1e352bd

Browse files
authored
[CZID-8390] Add sqs step notifications (#116)
* add current sqs queue to sqs notification step * run black * terraform fmt * typing * change to using sns * add formatting * add options to turn off step notifications
1 parent 3cd78ea commit 1e352bd

File tree

14 files changed

+249
-69
lines changed

14 files changed

+249
-69
lines changed

Dockerfile

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ RUN apt-get -q update && apt-get -q install -y \
4646
# upgrade because of this issue https://github.com/chanzuckerberg/miniwdl/issues/607 in miniwdl
4747
RUN pip3 install importlib-metadata==4.13.0
4848
RUN pip3 install miniwdl==${MINIWDL_VERSION}
49+
RUN pip3 install urllib3==1.26.16
4950

5051
RUN curl -Ls https://github.com/chanzuckerberg/s3parcp/releases/download/v1.0.1/s3parcp_1.0.1_linux_amd64.tar.gz | tar -C /usr/bin -xz s3parcp
5152

@@ -62,6 +63,7 @@ ADD miniwdl-plugins miniwdl-plugins
6263
RUN pip install miniwdl-plugins/s3upload
6364
RUN pip install miniwdl-plugins/sfn_wdl
6465
RUN pip install miniwdl-plugins/s3parcp_download
66+
RUN pip install miniwdl-plugins/sns_notification
6567

6668
RUN cd /usr/bin; curl -O https://amazon-ecr-credential-helper-releases.s3.amazonaws.com/0.4.0/linux-amd64/docker-credential-ecr-login
6769
RUN chmod +x /usr/bin/docker-credential-ecr-login

main.tf

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ module "sfn" {
6464
stage_vcpu_defaults = var.stage_vcpu_defaults
6565
extra_env_vars = var.extra_env_vars
6666
sqs_queues = var.sqs_queues
67+
step_notifications = var.step_notifications
6768
call_cache = var.call_cache
6869
output_status_json_files = var.output_status_json_files
6970
tags = var.tags
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# sns_notifications
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python3
2+
from setuptools import setup
3+
from os import path
4+
5+
this_directory = path.abspath(path.dirname(__file__))
6+
with open(path.join(path.dirname(__file__), "README.md")) as f:
7+
long_description = f.read()
8+
9+
setup(
10+
name="sns_notification",
11+
version="0.0.1",
12+
description="miniwdl plugin for notification of task completion to Amazon SQS",
13+
url="https://github.com/chanzuckerberg/swipe",
14+
project_urls={},
15+
long_description=long_description,
16+
long_description_content_type="text/markdown",
17+
author="",
18+
py_modules=["sns_notification"],
19+
python_requires=">=3.6",
20+
setup_requires=["reentry"],
21+
install_requires=["boto3"],
22+
reentry_register=True,
23+
entry_points={
24+
"miniwdl.plugin.task": ["sns_notification_task = sns_notification:task"],
25+
"miniwdl.plugin.workflow": [
26+
"sns_notification_workflow = sns_notification:workflow"
27+
],
28+
},
29+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""
2+
Send SNS notifications after each miniwdl step
3+
"""
4+
5+
import os
6+
import json
7+
from typing import Dict
8+
from datetime import datetime
9+
from WDL import values_to_json
10+
from WDL._util import StructuredLogMessage as _
11+
12+
import boto3
13+
14+
sns_client = boto3.client("sns", endpoint_url=os.getenv("AWS_ENDPOINT_URL"))
15+
topic_arn = os.getenv('STEP_NOTIFICATION_TOPIC_ARN')
16+
17+
18+
def process_outputs(outputs: Dict):
19+
"""process outputs dict into string to be passed into SQS"""
20+
# only stringify for now
21+
return json.dumps(outputs)
22+
23+
24+
def send_message(attr, body):
25+
"""send message to SNS"""
26+
sns_resp = sns_client.publish(
27+
TopicArn=topic_arn,
28+
Message=body,
29+
MessageAttributes=attr,
30+
)
31+
return sns_resp
32+
33+
34+
def task(cfg, logger, run_id, run_dir, task, **recv):
35+
"""
36+
on completion of any task sends a message to sns with the output files
37+
"""
38+
log = logger.getChild("sns_step_notification")
39+
40+
# ignore inputs
41+
recv = yield recv
42+
# ignore command/runtime/container
43+
recv = yield recv
44+
45+
log.info(_("sending message to sns"))
46+
47+
if topic_arn:
48+
message_attributes = {
49+
"WorkflowName": {"DataType": "String", "StringValue": run_id[0]},
50+
"TaskName": {"DataType": "String", "StringValue": run_id[-1]},
51+
"ExecutionId": {
52+
"DataType": "String",
53+
"StringValue": "execution_id_to_be_passed_in",
54+
},
55+
}
56+
57+
outputs = process_outputs(values_to_json(recv["outputs"]))
58+
message_body = {
59+
"version": "0",
60+
"id": "0",
61+
"detail-type": "Step Functions Execution Step Notification",
62+
"source": "aws.batch",
63+
"account": "",
64+
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
65+
"resources": [],
66+
"detail": outputs,
67+
}
68+
send_message(message_attributes, json.dumps(message_body))
69+
70+
yield recv
71+
72+
73+
def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
74+
log = logger.getChild("sns_step_notification")
75+
76+
# ignore inputs
77+
recv = yield recv
78+
79+
log.info(_("ignores workflow calls"))
80+
yield recv

terraform/modules/swipe-sfn-batch-job/main.tf

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ locals {
3434
"MINIWDL__DOWNLOAD_CACHE__DISABLE_PATTERNS" = "[\"s3://swipe-samples-*/*\"]",
3535
"DOWNLOAD_CACHE_MAX_GB" = "500",
3636
"WDL_PASSTHRU_ENVVARS" = join(" ", [for k, v in var.extra_env_vars : k]),
37+
"STEP_NOTIFICATION_TOPIC_ARN" = var.sfn_notification_topic_arn,
3738
"OUTPUT_STATUS_JSON_FILES" = tostring(var.output_status_json_files)
3839
})
3940
container_env_vars = { "environment" : [for k in sort(keys(local.batch_env_vars)) : { "name" : k, "value" : local.batch_env_vars[k] }] }

terraform/modules/swipe-sfn-batch-job/variables.tf

+6
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,9 @@ variable "docker_network" {
7070
type = string
7171
default = ""
7272
}
73+
74+
variable "sfn_notification_topic_arn" {
75+
description = "ARN of notification sns topic"
76+
type = string
77+
}
78+

terraform/modules/swipe-sfn/main.tf

+14-13
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,20 @@ resource "aws_iam_role_policy_attachment" "swipe_sfn_service" {
3232
}
3333

3434
module "batch_job" {
35-
source = "../swipe-sfn-batch-job"
36-
app_name = var.app_name
37-
batch_job_docker_image = var.batch_job_docker_image
38-
batch_job_timeout_seconds = var.batch_job_timeout_seconds
39-
miniwdl_dir = var.miniwdl_dir
40-
workspace_s3_prefixes = var.workspace_s3_prefixes
41-
wdl_workflow_s3_prefix = var.wdl_workflow_s3_prefix
42-
job_policy_arns = var.job_policy_arns
43-
extra_env_vars = var.extra_env_vars
44-
docker_network = var.docker_network
45-
call_cache = var.call_cache
46-
output_status_json_files = var.output_status_json_files
47-
tags = var.tags
35+
source = "../swipe-sfn-batch-job"
36+
app_name = var.app_name
37+
batch_job_docker_image = var.batch_job_docker_image
38+
batch_job_timeout_seconds = var.batch_job_timeout_seconds
39+
miniwdl_dir = var.miniwdl_dir
40+
workspace_s3_prefixes = var.workspace_s3_prefixes
41+
wdl_workflow_s3_prefix = var.wdl_workflow_s3_prefix
42+
job_policy_arns = var.job_policy_arns
43+
extra_env_vars = var.extra_env_vars
44+
docker_network = var.docker_network
45+
call_cache = var.call_cache
46+
output_status_json_files = var.output_status_json_files
47+
sfn_notification_topic_arn = length(var.sqs_queues) > 0 && var.step_notifications ? aws_sns_topic.sfn_notifications_topic[0].arn : ""
48+
tags = var.tags
4849
}
4950

5051
module "sfn_io_helper" {

terraform/modules/swipe-sfn/notifications.tf

+4-3
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ data "aws_iam_policy_document" "sfn_notifications_topic_policy_document" {
6464
resource "aws_sns_topic_subscription" "sfn_notifications_sqs_target" {
6565
for_each = var.sqs_queues
6666

67-
topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
68-
protocol = "sqs"
69-
endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn
67+
topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
68+
protocol = "sqs"
69+
endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn
70+
raw_message_delivery = true
7071
}
7172

7273
resource "aws_sqs_queue" "sfn_notifications_queue" {

terraform/modules/swipe-sfn/variables.tf

+7
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,10 @@ variable "metrics_schedule" {
116116
type = string
117117
default = "rate(1 minute)"
118118
}
119+
120+
variable "step_notifications" {
121+
description = "Boolean to determine whether or not to use send step notifications with SNS"
122+
type = bool
123+
default = false
124+
}
125+

test/terraform/moto/main.tf

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ module "swipetest" {
3232
"Two" : { "spot" : 12800, "on_demand" : 256000 },
3333
}
3434

35-
workspace_s3_prefixes = ["swipe-test"]
36-
35+
workspace_s3_prefixes = ["swipe-test"]
3736
output_status_json_files = true
37+
step_notifications = true
3838
}

0 commit comments

Comments
 (0)