-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy paths3-cloudfront.config.yaml
More file actions
202 lines (200 loc) · 9.55 KB
/
Copy paths3-cloudfront.config.yaml
File metadata and controls
202 lines (200 loc) · 9.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
enable_s3_logging: false
cloudfront_component_name: cloudfront
# bucket_policy:
# OpsAccount:
# principal:
# AWS: arn:aws:iam::123456789:root
# components:
# cloudfront: # This must match 'cloudfront_component_name'
# config:
# comment: Frontend in S3
# Totally based on https://github.com/theonestack/hl-component-s3/tree/master/lambdas
code: |
import logging
from urllib.request import urlopen, Request, HTTPError, URLError
import json
import sys
import os
import boto3
sys.path.append(f"{os.environ['LAMBDA_TASK_ROOT']}/lib")
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
import json
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3r = boto3.resource('s3')
class CustomResourceResponse:
def __init__(self, request_payload):
self.payload = request_payload
self.response = {
"StackId": request_payload["StackId"],
"RequestId": request_payload["RequestId"],
"LogicalResourceId": request_payload["LogicalResourceId"],
"NoEcho": False,
"Status": 'SUCCESS',
}
def respond_error(self, message):
self.response['Status'] = 'FAILED'
self.response['Reason'] = message
self.respond()
def respond(self, data=None, NoEcho=False):
event = self.payload
response = self.response
if event.get("PhysicalResourceId", False):
response["PhysicalResourceId"] = event["PhysicalResourceId"]
if data is not None:
response['Data'] = data
logger.debug("Received %s request with event: %s" % (event['RequestType'], json.dumps(event)))
if NoEcho:
response['NoEcho'] = NoEcho
serialized = json.dumps(response)
logger.info(f"Responding to {event['RequestType']} request with: {serialized}")
req_data = serialized.encode('utf-8')
req = Request(
event['ResponseURL'],
data=req_data,
headers={'Content-Length': len(req_data),'Content-Type': ''}
)
req.get_method = lambda: 'PUT'
try:
urlopen(req)
logger.debug("Request to CFN API succeeded, nothing to do here")
except HTTPError as e:
logger.error("Callback to CFN API failed with status %d" % e.code)
logger.error("Response: %s" % e.reason)
except URLError as e:
logger.error("Failed to reach the server - %s" % e.reason)
def handler(event, context):
print(f"Received event:{json.dumps(event)}")
lambda_response = CustomResourceResponse(event)
params = event['ResourceProperties']
print(f"Resource Properties {params}")
bucket = params['BucketName']
region = params['Region']
data = {
'Arn': f'arn:aws:s3:::{bucket}',
'DomainName': f'{bucket}.s3.amazonaws.com',
'DualStackDomainName': f'{bucket}.s3.dualstack.{region}.amazonaws.com',
'RegionalDomainName': f'{bucket}.s3.{region}.amazonaws.com',
'WebsiteURL': f'http://{bucket}.s3-website.{region}.amazonaws.com'}
try:
if event['RequestType'] == 'Create':
event['PhysicalResourceId'] = bucket
create_bucket(params, event, context)
lambda_response.respond(data)
elif event['RequestType'] == 'Update':
event['PhysicalResourceId'] = params['BucketName']
update_bucket(params, event, context)
lambda_response.respond(data)
elif event['RequestType'] == 'Delete':
print(f"ignoring deletion of bucket {params['BucketName']}")
lambda_response.respond()
except Exception as e:
message = str(e)
lambda_response.respond_error(message)
return 'OK'
def create_bucket(params, event, context):
if 'BucketName' not in params:
raise Exception('BucketName parameter is required')
notifications = params['Notifications'] if 'Notifications' in params else None
bucket_name = params['BucketName']
cors_configuration = params['CorsConfiguration'] if 'CorsConfiguration' in params else None
bucket_already_exists = True
try:
s3 = boto3.client('s3')
response = s3.head_bucket(Bucket=bucket_name)
print(f"bucket {bucket_name} does already existing so we need don't need to create it")
except Exception as e:
if "404" in str(e):
bucket_already_exists = False
print(f"bucket {bucket_name} does not already existing so we need to create it")
else:
print(f"error:{e}")
raise e
options = {'Bucket' : bucket_name}
if params['Region'] != 'us-east-1':
options = dict({'CreateBucketConfiguration' : {'LocationConstraint': params['Region']}}, **options)
if bucket_already_exists:
print(f"bucket {bucket_name} exists")
else:
bucket = s3.create_bucket(**options)
print(f"created bucket {bucket_name} in {bucket['Location']}")
if notifications:
add_notification(notifications, bucket_name)
if cors_configuration:
add_cors(cors_configuration, bucket_name)
def update_bucket(params, event, context):
if 'BucketName' not in params:
raise Exception('BucketName parameter is required')
notifications = params['Notifications'] if 'Notifications' in params else None
bucket_name = params['BucketName']
cors_configuration = params['CorsConfiguration'] if 'CorsConfiguration' in params else None
if notifications:
add_notification(notifications, bucket_name)
else:
delete_notification(bucket_name)
print(f"Put notification deletion request completed... :)")
if cors_configuration:
print(f"cors: {cors_configuration}")
add_cors(cors_configuration, bucket_name)
else:
delete_cors(bucket_name)
print(f"Cors configuration deletion request completed... :)")
def add_notification(Notifications, Bucket):
bucket_notification = s3r.BucketNotification(Bucket)
if "LambdaConfigurations" in Notifications:
sw=Notifications['LambdaConfigurations'][0]
sw['Events'] = sw.pop('Event')
sw['LambdaFunctionArn'] = sw.pop('Function')
if "Filter" in Notifications['QueueConfigurations'][0]:
sw['Filter']['Key'] = sw['Filter'].pop('S3Key')
sw['Filter']['Key']['FilterRules'] = sw['Filter']['Key'].pop('Rules')
for i in range((len(sw['Filter']['Key']['FilterRules']))):
sw['Filter']['Key']['FilterRules'][i]['Name'] = sw['Filter']['Key']['FilterRules'][i].pop('name')
sw['Filter']['Key']['FilterRules'][i]['Value'] = sw['Filter']['Key']['FilterRules'][i].pop('value')
if "QueueConfigurations" in Notifications:
sw=Notifications['QueueConfigurations'][0]
sw['Events'] = sw.pop('Event')
sw['QueueArn'] = sw.pop('Queue')
if "Filter" in sw:
sw['Filter']['Key'] = sw['Filter'].pop('S3Key')
sw['Filter']['Key']['FilterRules'] = sw['Filter']['Key'].pop('Rules')
for i in range((len(sw['Filter']['Key']['FilterRules']))):
sw['Filter']['Key']['FilterRules'][i]['Name'] = sw['Filter']['Key']['FilterRules'][i].pop('name')
sw['Filter']['Key']['FilterRules'][i]['Value'] = sw['Filter']['Key']['FilterRules'][i].pop('value')
if "TopicConfigurations" in Notifications:
sw=Notifications['TopicConfigurations'][0]
sw['Events'] = sw.pop('Event')
sw['QueueArn'] = sw.pop('Queue')
if "Filter" in sw:
sw['Filter']['Key'] = sw['Filter'].pop('S3Key')
sw['Filter']['Key']['FilterRules'] = sw['Filter']['Key'].pop('Rules')
for i in range((len(sw['Filter']['Key']['FilterRules']))):
sw['Filter']['Key']['FilterRules'][i]['Name'] = sw['Filter']['Key']['FilterRules'][i].pop('name')
sw['Filter']['Key']['FilterRules'][i]['Value'] = sw['Filter']['Key']['FilterRules'][i].pop('value')
print(f"transformed data is: {Notifications}")
response = bucket_notification.put(NotificationConfiguration = Notifications)
print(f"Put notification request completed... for {Bucket} :)")
def delete_notification(Bucket):
bucket_notification = s3r.BucketNotification(Bucket)
response = bucket_notification.put(NotificationConfiguration={})
print(f"Put notification delete request completed... for {Bucket} :)")
def add_cors(cors_configuration, bucket_name):
bucket_cors = s3r.BucketCors(bucket_name)
cors_rules = []
# Update MaxAgeSeconds to int if provided
if 'CorsRules' in cors_configuration and len(cors_configuration['CorsRules']) > 0:
for cors_rule in cors_configuration['CorsRules']:
if 'MaxAgeSeconds' in cors_rule:
try:
cors_rule['MaxAgeSeconds'] = int(cors_rule['MaxAgeSeconds'])
except ValueError:
print("Unable to convert MaxAgeSeconds to an integer.")
else:
print("CorsRules key not found.")
print(f"Cors Configuration: {cors_configuration}")
bucket_cors.put(CORSConfiguration={"CORSRules": cors_configuration['CorsRules']})
print(f"Put cors configuration request completed... for {bucket_name} :)")
def delete_cors(bucket_name):
bucket_cors = s3r.BucketCors(bucket_name)
response = bucket_cors.delete()
print(f"Put cors configuration delete request completed... for {bucket_name} :)")