# lambda_handler.py (forked from aws-samples/aws-cdk-pipelines-datalake-etl)
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import json
import logging
import os
import uuid
from datetime import datetime

import boto3
import botocore
import dateutil.tz


def start_etl_job_run(execution_id, p_stp_fn_time, sfn_arn, sfn_name, table_name, sfn_input):
    """
    Insert an entry into the DynamoDB audit-trail table for this ETL run.

    @param execution_id: unique id (UUID) generated for this ETL run
    @param p_stp_fn_time: timestamp string recorded as the job start time
    @param sfn_arn: ARN of the Step Functions state machine
    @param sfn_name: name of the Step Functions execution
    @param table_name: name of the DynamoDB audit table
    @param sfn_input: JSON string passed to the state machine as input
    """
    try:
        logger.info('[INFO] start_etl_job_run() called')
        item = {
            'execution_id': execution_id,
            'sfn_execution_name': sfn_name,
            'sfn_arn': sfn_arn,
            'sfn_input': sfn_input,
            'job_latest_status': 'STARTED',
            'job_start_date': p_stp_fn_time,
            'joblast_updated_timestamp': p_stp_fn_time,
        }
        dynamo_client = boto3.resource('dynamodb')
        table = dynamo_client.Table(table_name)
        table.put_item(Item=item)
    except botocore.exceptions.ClientError as error:
        logger.error('[ERROR] DynamoDB process failed: {}'.format(error))
        raise
    except Exception as e:
        logger.error('[ERROR] DynamoDB process failed: {}'.format(e))
        raise
    logger.info('[INFO] start_etl_job_run() execution completed')


# Logger configuration
def load_log_config():
    # Basic config; replace with your own logging config if required
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    return root


# Logger initialization
logger = load_log_config()


def lambda_handler(event, context):
    logger.info(event)
    lambda_message = event['Records'][0]
    source_bucket_name = lambda_message['s3']['bucket']['name']
    key = lambda_message['s3']['object']['key']
    p_full_path = key
    # The first object/directory name after the bucket name is used as the
    # source system name and the second as the table name, e.g.:
    # s3://<bucket_name>/<source_system_name>/<table_name>/...
    p_source_system_name = key.split('/')[0]
    p_table_name = key.split('/')[1]
    p_file_dir = os.path.dirname(p_full_path)
    # S3 event notification keys are URL-encoded, so '=' in partition-style
    # prefixes arrives as '%3D'
    p_file_dir_upd = p_file_dir.replace('%3D', '=')
    p_base_file_name = os.path.basename(p_full_path)
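    # For example, a hypothetical key 'salesdb/orders/year%3D2021/orders.csv'
    # (illustration only, not from this repo) would parse as:
    #   p_source_system_name -> 'salesdb'
    #   p_table_name         -> 'orders'
    #   p_file_dir_upd       -> 'salesdb/orders/year=2021'
    #   p_base_file_name     -> 'orders.csv'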
    sfn_arn = os.environ['SFN_STATE_MACHINE_ARN']
    target_bucket_name = os.environ['target_bucket_name']
    raw_to_conformed_etl_job_name = 'raw_to_conformed_etl_job'
    logger.info('bucket: ' + source_bucket_name)
    logger.info('key: ' + key)
    logger.info('source system name: ' + p_source_system_name)
    logger.info('table name: ' + p_table_name)
    logger.info('file path: ' + p_file_dir)
    logger.info('p_file_dir_upd: ' + p_file_dir_upd)
    logger.info('file base name: ' + p_base_file_name)
    logger.info('state machine arn: ' + sfn_arn)
    logger.info('target bucket name: ' + target_bucket_name)
    if p_base_file_name != '':
        # Capture the current time in US Central time
        central = dateutil.tz.gettz('US/Central')
        now = datetime.now(tz=central)
        p_ingest_time = now.strftime('%m/%d/%Y %H:%M:%S')
        logger.info(p_ingest_time)
        # Timestamp used to make the Step Functions execution name unique
        p_stp_fn_time = now.strftime('%Y%m%d%H%M%S%f')
        p_year = now.strftime('%Y')
        p_month = now.strftime('%m')
        p_day = now.strftime('%d')
        logger.info('year: ' + p_year)
        logger.info('month: ' + p_month)
        logger.info('day: ' + p_day)
        logger.info('sfn name: ' + p_base_file_name + '-' + p_stp_fn_time)
        sfn_name = p_base_file_name + '-' + p_stp_fn_time
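        # Note: Step Functions execution names are limited to 80 characters
        # and a restricted character set, so unusual base file names (spaces,
        # special characters, very long names) can make start_execution fail.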
        execution_id = str(uuid.uuid4())
        sfn_input = json.dumps(
            {
                'JOB_NAME': raw_to_conformed_etl_job_name,
                'target_databasename': p_source_system_name,
                'target_bucketname': target_bucket_name,
                'source_bucketname': source_bucket_name,
                'source_key': p_file_dir_upd,
                'base_file_name': p_base_file_name,
                'p_year': p_year,
                'p_month': p_month,
                'p_day': p_day,
                'table_name': p_table_name,
                'execution_id': execution_id,
            }
        )
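        # The JSON above becomes the Step Functions execution input; downstream
        # states can reference its fields (e.g. $.JOB_NAME, $.source_key) via
        # JSONPath, for instance when passing arguments on to the ETL job.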
        logger.info(sfn_input)
        try:
            sfn_client = boto3.client('stepfunctions')
            sfn_response = sfn_client.start_execution(
                stateMachineArn=sfn_arn,
                name=sfn_name,
                input=sfn_input
            )
            logger.info(sfn_response)
        except botocore.exceptions.ClientError as error:
            logger.error('[ERROR] Step Functions client call failed: {}'.format(error))
            raise
        except Exception as e:
            logger.error('[ERROR] Step Functions call failed: {}'.format(e))
            raise
        start_etl_job_run(execution_id, p_stp_fn_time, sfn_arn, sfn_name, os.environ['DYNAMODB_TABLE_NAME'], sfn_input)
        return {
            'statusCode': 200,
            'body': json.dumps('Step function triggered successfully!')
        }
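

# ---------------------------------------------------------------------------
# Minimal local smoke test: a sketch only, not part of the deployed Lambda.
# All bucket, key, ARN, and table names below are hypothetical placeholders;
# with real resources behind them, this invokes the handler with a sample
# S3 event notification shaped like the one the handler expects.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Hypothetical environment, mirroring what the CDK stack would configure
    os.environ.setdefault('SFN_STATE_MACHINE_ARN',
                          'arn:aws:states:us-east-1:111122223333:stateMachine:example-etl')
    os.environ.setdefault('target_bucket_name', 'example-conformed-bucket')
    os.environ.setdefault('DYNAMODB_TABLE_NAME', 'example-etl-audit-table')
    sample_event = {
        'Records': [
            {
                's3': {
                    'bucket': {'name': 'example-raw-bucket'},
                    'object': {'key': 'salesdb/orders/year%3D2021/orders.csv'},
                }
            }
        ]
    }
    print(lambda_handler(sample_event, None))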