forked from aws-samples/aws-lambda-redshift-event-driven-app
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLambdaSetupRedshiftObjects.py
More file actions
91 lines (83 loc) · 3.52 KB
/
LambdaSetupRedshiftObjects.py
File metadata and controls
91 lines (83 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import json
import boto3
import cfnresponse
import logging
# Module-level logger shared by the functions in this file. The level is set
# to INFO so the logger.info() calls below (event and invoke response) are emitted.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# SQL submitted when the custom resource is deleted: removes the objects that
# the create/update path below sets up.
_DELETE_SQL = '''
DROP PROCEDURE execute_elt_process();
DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
DROP TABLE IF EXISTS nyc_yellow_taxi;
'''

# SQL submitted for every non-Delete request. The single `{}` placeholder is
# filled with the Redshift cluster IAM role used by the COPY command.
_CREATE_SQL_TEMPLATE = '''
CREATE TABLE IF NOT EXISTS nyc_yellow_taxi
(pickup_date DATE
, pickup_datetime TIMESTAMP
, dropoff_datetime TIMESTAMP
, ratecode SMALLINT
, passenger_count SMALLINT
, trip_distance FLOAT4
, fare_amount FLOAT4
, total_amount FLOAT4
, payment_type SMALLINT
, vendorid VARCHAR(20))
SORTKEY(pickup_date);
DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
CREATE MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis
AS
SELECT
DATE_TRUNC('mon',pickup_date) pickup_month
, ROUND(AVG(trip_distance),2) avg_distance
, ROUND(AVG(fare_amount),2) avg_fare
, COUNT(1) total_trips
, SUM(trip_distance) total_distance_per_month
, SUM(fare_amount) total_fare_per_month
FROM nyc_yellow_taxi
GROUP BY 1;
CREATE OR REPLACE PROCEDURE execute_elt_process()
AS $$
BEGIN
truncate table nyc_yellow_taxi;
COPY nyc_yellow_taxi FROM 's3://event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/'
IAM_ROLE '{}'
region 'us-west-2' delimiter '|';
REFRESH MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis;
END;
$$ LANGUAGE plpgsql;
'''


def handler(event, context):
    """CloudFormation custom-resource entry point that sets up or drops Redshift objects.

    Reads the connection settings from ``event['ResourceProperties']``, picks
    the SQL script for the request type, hands it to the Lambda identified by
    ``lambda_arn`` via ``invoke_redshift_data_api_lambda``, and then reports
    the outcome back to CloudFormation through ``cfnresponse.send``.

    Args:
        event: Custom-resource request. Must contain ``RequestType`` and a
            ``ResourceProperties`` dict with the keys ``lambda_arn``,
            ``redshift_cluster_id``, ``redshift_database``, ``redshift_user``,
            ``redshift_cluster_iam_role`` and ``sns_topic_arn``.
        context: Lambda context object, passed through to ``cfnresponse.send``.

    Returns:
        None. The result is delivered to CloudFormation as SUCCESS or FAILED.
    """
    try:
        # NOTE(review): the whole event is logged as-is; confirm that nothing
        # sensitive is carried in it before widening log access.
        logger.info(json.dumps(event))

        props = event['ResourceProperties']
        lambda_arn = props['lambda_arn']
        redshift_cluster_id = props['redshift_cluster_id']
        redshift_database = props['redshift_database']
        redshift_user = props['redshift_user']
        redshift_cluster_iam_role = props['redshift_cluster_iam_role']
        sns_topic_arn = props['sns_topic_arn']

        if event['RequestType'] == 'Delete':
            sql_text = _DELETE_SQL
            outcome = 'Delete complete'
        else:
            # Any request type other than Delete runs the setup script.
            sql_text = _CREATE_SQL_TEMPLATE.format(redshift_cluster_iam_role)
            outcome = 'Create complete'

        response = invoke_redshift_data_api_lambda(
            lambda_arn,
            redshift_cluster_id,
            redshift_database,
            redshift_user,
            sql_text,
            sns_topic_arn,
        )
        logger.info(response)
    except Exception:
        # Always answer CloudFormation: without a response the stack operation
        # keeps waiting on this resource instead of failing promptly.
        logger.exception('Failed to submit Redshift setup SQL')
        cfnresponse.send(
            event,
            context,
            cfnresponse.FAILED,
            {'Data': 'Failed to submit SQL; see CloudWatch logs'},
        )
        return

    # Sent outside the try block so an error while reporting success cannot
    # trigger a second, contradictory FAILED response.
    cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': outcome})
def invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn):
    """Invoke the target Lambda with an ``execute_sql`` request for Redshift.

    The request is sent with ``InvocationType='Event'``, so the returned value
    is the response of the invoke call itself, not the outcome of the SQL.

    Args:
        lambda_arn: Name or ARN of the Lambda function to invoke.
        redshift_cluster_id: Forwarded as ``redshift_cluster_id`` in the payload.
        redshift_database: Forwarded as ``redshift_database`` in the payload.
        redshift_user: Forwarded as ``redshift_user`` in the payload.
        sql_text: SQL script forwarded as ``sql_text`` in the payload.
        sns_topic_arn: Forwarded as ``sns_topic_arn`` in the payload.

    Returns:
        The response returned by the boto3 Lambda client's ``invoke`` call.
    """
    lambda_client = boto3.client('lambda')

    # The target function receives everything under a single "Input" key.
    request_input = dict(
        redshift_cluster_id=redshift_cluster_id,
        redshift_database=redshift_database,
        redshift_user=redshift_user,
        action="execute_sql",
        sql_text=sql_text,
        sns_topic_arn=sns_topic_arn,
    )
    serialized_payload = json.dumps({"Input": request_input})

    return lambda_client.invoke(
        FunctionName=lambda_arn,
        InvocationType='Event',
        Payload=serialized_payload,
    )