Skip to content

Commit 70875df

Browse files
authored
Merge pull request #83 from Sage-Bionetworks/etl-568
[ETL-558] Decouple S3 to JSON workflow from JSON to Parquet workflow
2 parents 1c7196b + c4080e1 commit 70875df

File tree

8 files changed

+81
-46
lines changed

8 files changed

+81
-46
lines changed

.github/workflows/upload-and-deploy.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ jobs:
287287
- name: Invoke Lambda
288288
run: |
289289
cd src/lambda_function/s3_to_glue/
290-
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=$NAMESPACE-PrimaryWorkflow"
290+
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=$NAMESPACE-S3ToJsonWorkflow"
291291
292292
293293
sceptre-deploy-staging:
@@ -358,4 +358,4 @@ jobs:
358358
- name: Invoke Lambda
359359
run: |
360360
cd src/lambda_function/s3_to_glue/
361-
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=staging-PrimaryWorkflow"
361+
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=staging-S3ToJsonWorkflow"

config/develop/namespaced/s3-to-glue-lambda.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
1111
parameters:
1212
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
1313
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
14-
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
14+
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
1515
LambdaBatchSize: '10'
1616
LambdaMaximumBatchingWindowInSeconds: '300'

config/prod/namespaced/s3-to-glue-lambda.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
1111
parameters:
1212
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
1313
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
14-
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
14+
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
1515
LambdaBatchSize: '10'
1616
LambdaMaximumBatchingWindowInSeconds: '300'

src/lambda_function/s3_to_glue/app.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""
22
This Lambda app responds to an SQS event notification and starts a Glue workflow.
3-
The Glue workflow name is set by the environment variable `PRIMARY_WORKFLOW_NAME`.
3+
The Glue workflow name is set by the environment variable `S3_TO_JSON_WORKFLOW_NAME`.
44
Subsequently, the S3 objects which were contained in the SQS event are written as a
55
JSON string to the `messages` workflow run property.
66
"""
@@ -141,14 +141,14 @@ def lambda_handler(event, context) -> dict:
141141
if len(s3_objects_info) > 0:
142142
logger.info(
143143
"Submitting the following files to "
144-
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
144+
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
145145
)
146146
submit_s3_to_json_workflow(
147147
objects_info=s3_objects_info,
148-
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
148+
workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"]
149149
)
150150
else:
151151
logger.info(
152152
"NO files were submitted to "
153-
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
153+
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
154154
)

src/lambda_function/s3_to_glue/template.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Parameters:
1515
Type: String
1616
Description: Arn for the S3 to Glue Lambda Role
1717

18-
PrimaryWorkflowName:
18+
S3ToJsonWorkflowName:
1919
Type: String
2020
Description: >
2121
Name of the Glue workflow that runs the S3 to JSON job
@@ -50,7 +50,7 @@ Resources:
5050
Timeout: 30
5151
Environment:
5252
Variables:
53-
PRIMARY_WORKFLOW_NAME: !Ref PrimaryWorkflowName
53+
S3_TO_JSON_WORKFLOW_NAME: !Ref S3ToJsonWorkflowName
5454
Events:
5555
SQSEvent:
5656
Type: SQS
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"S3ToGlueFunction": {
3-
"PRIMARY_WORKFLOW_NAME": "main-PrimaryWorkflow"
3+
"PRIMARY_WORKFLOW_NAME": "main-S3ToJsonWorkflow"
44
}
55
}

templates/glue-workflow.j2

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
AWSTemplateFormatVersion: '2010-09-09'
22

33
Description: >-
4-
The primary workflow for processing RECOVER data. An outline of the workflow is below:
4+
The two workflows for processing RECOVER data. An outline of each workflow is below:
55

6-
S3 to JSON ->
6+
(S3 to JSON) On-demand trigger (triggered by Lambda) ->
7+
(S3 to JSON) S3 to JSON
8+
9+
and
10+
11+
(JSON to Parquet) Scheduled trigger ->
12+
(JSON to Parquet) Crawler ->
713
(JSON to Parquet) EnrolledParticipants and SymptomLog ->
814
(JSON to Parquet) HealthKit ->
915
(JSON to Parquet) Fitbit ->
1016
(JSON to Parquet) Google ->
1117
(JSON to Parquet) Garmin ->
12-
CompareParquetJob
18+
(JSON to Parquet) CompareParquetJob (if Namespace != "main")
1319

1420
Parameters:
1521

@@ -50,6 +56,15 @@ Parameters:
5056
Type: String
5157
Description: The name of the S3 To JSON Job
5258

59+
JsontoParquetTriggerSchedule:
60+
Type: String
61+
Description: >-
62+
The cron schedule on which the JSON to Parquet workflow is triggered.
63+
When `IsMainNamespace`, the respective trigger is active from the moment
64+
of deployment. Otherwise, the trigger is disabled so that we don't waste
65+
resources running our development pipelines every day.
66+
Default: cron(0 2 * * ? *)
67+
5368
CompareParquetStagingNamespace:
5469
Type: String
5570
Description: the name of the "staging" namespace
@@ -59,7 +74,8 @@ Parameters:
5974
Description: The name of the "main" namespace
6075

6176
Conditions:
62-
IsStagingNamespace: !Not [!Equals [!Ref Namespace, "main"]]
77+
IsMainNamespace: !Equals [!Ref Namespace, "main"]
78+
IsDevelopmentNamespace: !Not [!Equals [!Ref Namespace, "main"]]
6379

6480
Resources:
6581

@@ -72,46 +88,65 @@ Resources:
7288
{% do datasets.append(dataset) %}
7389
{% endfor %}
7490

75-
PrimaryWorkflow:
91+
S3ToJsonWorkflow:
7692
Type: AWS::Glue::Workflow
7793
Properties:
7894
DefaultRunProperties:
7995
namespace: !Ref Namespace
8096
json_bucket: !Ref JsonBucketName
8197
json_prefix: !Ref JsonKeyPrefix
82-
parquet_bucket: !Ref ParquetBucketName
83-
parquet_prefix: !Ref ParquetKeyPrefix
84-
glue_database: !Ref GlueDatabase
8598
Description: >-
86-
Glue workflow for exporting RECOVER data to Parquet datasets
87-
MaxConcurrentRuns: 1
88-
Name: !Sub ${Namespace}-PrimaryWorkflow
99+
Glue workflow for exporting raw data to their JSON datasets
100+
Name: !Sub ${Namespace}-S3ToJsonWorkflow
89101

90-
InitialTrigger:
102+
S3ToJsonTrigger:
91103
Type: AWS::Glue::Trigger
92104
Properties:
93-
Name: !Sub "${Namespace}-InitialTrigger"
105+
Name: !Sub "${Namespace}-S3ToJsonTrigger"
94106
Actions:
95107
- JobName: !Ref S3ToJsonJobName
96108
Description: This is the on-demand trigger that starts the S3 to JSON workflow.
97109
Type: ON_DEMAND
98-
WorkflowName: !Ref PrimaryWorkflow
110+
WorkflowName: !Ref S3ToJsonWorkflow
99111

100-
S3ToJsonCompleteTrigger:
112+
JsonToParquetWorkflow:
113+
Type: AWS::Glue::Workflow
114+
Properties:
115+
DefaultRunProperties:
116+
namespace: !Ref Namespace
117+
parquet_bucket: !Ref ParquetBucketName
118+
parquet_prefix: !Ref ParquetKeyPrefix
119+
glue_database: !Ref GlueDatabase
120+
Description: >-
121+
Glue workflow which loads the JSON datasets and writes them to Parquet datasets
122+
MaxConcurrentRuns: 1
123+
Name: !Sub ${Namespace}-JsonToParquetWorkflow
124+
125+
JsontoParquetTrigger:
126+
Condition: IsMainNamespace
101127
Type: AWS::Glue::Trigger
102128
Properties:
103-
Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger"
129+
Name: !Sub "${Namespace}-JsontoParquetTrigger"
104130
Actions:
105131
- CrawlerName: !Ref StandardCrawler
106-
Description: This trigger starts the crawler.
107-
Type: CONDITIONAL
108-
Predicate:
109-
Conditions:
110-
- JobName: !Ref S3ToJsonJobName
111-
State: SUCCEEDED
112-
LogicalOperator: EQUALS
132+
Description: This trigger starts the JSON to Parquet workflow.
133+
Type: SCHEDULED
134+
Schedule: !Ref JsontoParquetTriggerSchedule
113135
StartOnCreation: true
114-
WorkflowName: !Ref PrimaryWorkflow
136+
WorkflowName: !Ref JsonToParquetWorkflow
137+
138+
JsontoParquetTrigger:
139+
Condition: IsDevelopmentNamespace
140+
Type: AWS::Glue::Trigger
141+
Properties:
142+
Name: !Sub "${Namespace}-JsontoParquetTrigger"
143+
Actions:
144+
- CrawlerName: !Ref StandardCrawler
145+
Description: This trigger starts the JSON to Parquet workflow.
146+
Type: SCHEDULED
147+
Schedule: !Ref JsontoParquetTriggerSchedule
148+
StartOnCreation: false
149+
WorkflowName: !Ref JsonToParquetWorkflow
115150

116151
StandardCrawler:
117152
Type: AWS::Glue::Crawler
@@ -150,7 +185,7 @@ Resources:
150185
LogicalOperator: EQUALS
151186
CrawlState: SUCCEEDED
152187
StartOnCreation: true
153-
WorkflowName: !Ref PrimaryWorkflow
188+
WorkflowName: !Ref JsonToParquetWorkflow
154189

155190
HealthKitTrigger:
156191
Type: AWS::Glue::Trigger
@@ -172,7 +207,7 @@ Resources:
172207
{% endfor %}
173208
Logical: AND
174209
StartOnCreation: true
175-
WorkflowName: !Ref PrimaryWorkflow
210+
WorkflowName: !Ref JsonToParquetWorkflow
176211

177212
FitbitTrigger:
178213
Type: AWS::Glue::Trigger
@@ -194,7 +229,7 @@ Resources:
194229
{% endfor %}
195230
Logical: AND
196231
StartOnCreation: true
197-
WorkflowName: !Ref PrimaryWorkflow
232+
WorkflowName: !Ref JsonToParquetWorkflow
198233

199234
GoogleTrigger:
200235
Type: AWS::Glue::Trigger
@@ -216,7 +251,7 @@ Resources:
216251
{% endfor %}
217252
Logical: AND
218253
StartOnCreation: true
219-
WorkflowName: !Ref PrimaryWorkflow
254+
WorkflowName: !Ref JsonToParquetWorkflow
220255

221256
GarminTrigger:
222257
Type: AWS::Glue::Trigger
@@ -238,11 +273,11 @@ Resources:
238273
{% endfor %}
239274
Logical: AND
240275
StartOnCreation: true
241-
WorkflowName: !Ref PrimaryWorkflow
276+
WorkflowName: !Ref JsonToParquetWorkflow
242277

243278
JsontoParquetCompleteTrigger:
244279
Type: AWS::Glue::Trigger
245-
Condition: IsStagingNamespace
280+
Condition: IsDevelopmentNamespace
246281
Properties:
247282
Name: !Sub "${Namespace}-JsontoParquetCompleteTrigger"
248283
Actions:
@@ -266,11 +301,11 @@ Resources:
266301
{% endfor %}
267302
Logical: AND
268303
StartOnCreation: true
269-
WorkflowName: !Ref PrimaryWorkflow
304+
WorkflowName: !Ref JsonToParquetWorkflow
270305

271306
Outputs:
272307

273-
WorkflowName:
274-
Value: !Ref PrimaryWorkflow
308+
S3ToJsonWorkflowName:
309+
Value: !Ref S3ToJsonWorkflow
275310
Export:
276-
Name: !Sub '${AWS::Region}-${AWS::StackName}-WorkflowName'
311+
Name: !Sub '${AWS::Region}-${AWS::StackName}-S3ToJsonWorkflowName'

tests/test_s3_to_glue_lambda.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def object_info(self):
156156

157157
@pytest.fixture
158158
def set_env_var(self, monkeypatch, sqs_queue):
159-
monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow")
159+
monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")
160160

161161
def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
162162
monkeypatch.setattr("boto3.client", lambda x: MockGlueClient())

0 commit comments

Comments
 (0)