 import json
 import logging
 import boto3
+from urllib import parse

 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -91,8 +92,25 @@ def is_s3_test_event(record: dict) -> bool:
     else:
         return False

+def get_object_info(s3_event) -> dict:
+    """
+    Derive object info formatted for submission to Glue from an S3 event.
+
+    Args:
+        s3_event (dict): An S3 event

-def lambda_handler(event, context) -> None:
+    Returns:
+        object_info (dict): The S3 object info
+    """
+    bucket_name = s3_event["s3"]["bucket"]["name"]
+    object_key = parse.unquote(s3_event["s3"]["object"]["key"])
+    object_info = {
+        "source_bucket": bucket_name,
+        "source_key": object_key,
+    }
+    return object_info
+
+def lambda_handler(event, context) -> dict:
     """
     This main lambda function will be triggered by a SQS event and will
     poll the SQS queue for all available S3 event messages. If the
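One note on the new helper: S3 event notifications deliver the object key URL-encoded, which is why `get_object_info` routes it through `parse.unquote` before the key is handed to Glue. A minimal sketch of the decoding in isolation (the event dict below is a hand-built stand-in, not a real notification):

```python
from urllib import parse

# Hand-built stand-in for one record of an S3 event notification;
# real records carry many more fields.
s3_event = {
    "s3": {
        "bucket": {"name": "my-input-bucket"},
        "object": {"key": "raw/date%3D2024-01-01/records.json"},
    }
}

# unquote() decodes percent-escapes, e.g. "%3D" -> "="
object_key = parse.unquote(s3_event["s3"]["object"]["key"])
print(object_key)  # raw/date=2024-01-01/records.json
```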
@@ -113,27 +131,21 @@ def lambda_handler(event, context) -> None:
             logger.info(f"Found AWS default s3:TestEvent. Skipping.")
         else:
             for s3_event in s3_event_records["Records"]:
-                bucket_name = s3_event["s3"]["bucket"]["name"]
-                object_key = s3_event["s3"]["object"]["key"]
-                object_info = {
-                    "source_bucket": bucket_name,
-                    "source_key": object_key,
-                }
+                object_info = get_object_info(s3_event)
                 if filter_object_info(object_info) is not None:
                     s3_objects_info.append(object_info)
                 else:
                     logger.info(
                         f"Object doesn't meet the S3 event rules to be processed. Skipping."
                     )
-
     if len(s3_objects_info) > 0:
         logger.info(
             "Submitting the following files to "
             f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
         )
         submit_s3_to_json_workflow(
-            objects_info=s3_objects_info,
-            workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"],
+            objects_info=s3_objects_info,
+            workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
         )
     else:
         logger.info(
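The pieces around the refactored loop (`s3_event_records`, `filter_object_info`, `submit_s3_to_json_workflow`) live outside this diff, so the harness below is a rough sketch under two assumptions: the SQS message body is a standard S3 notification JSON with a top-level `Records` list, and `filter_object_info` returns the dict or `None`, as the `is not None` check implies. The filter shown is hypothetical.

```python
import json
from urllib import parse

def get_object_info(s3_event) -> dict:
    # Same helper as introduced in this commit.
    return {
        "source_bucket": s3_event["s3"]["bucket"]["name"],
        "source_key": parse.unquote(s3_event["s3"]["object"]["key"]),
    }

def filter_object_info(object_info):
    # Hypothetical stand-in for the real filter (not part of this diff):
    # keep only objects under the raw/ prefix.
    return object_info if object_info["source_key"].startswith("raw/") else None

# Assumed shape of one SQS message body: a standard S3 notification.
message_body = json.dumps({
    "Records": [
        {"s3": {"bucket": {"name": "my-input-bucket"},
                "object": {"key": "raw/a%3D1/one.json"}}},
        {"s3": {"bucket": {"name": "my-input-bucket"},
                "object": {"key": "tmp/scratch.txt"}}},
    ]
})

s3_event_records = json.loads(message_body)
s3_objects_info = []
for s3_event in s3_event_records["Records"]:
    object_info = get_object_info(s3_event)
    if filter_object_info(object_info) is not None:
        s3_objects_info.append(object_info)

print(s3_objects_info)
# [{'source_bucket': 'my-input-bucket', 'source_key': 'raw/a=1/one.json'}]
```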