Skip to content

Commit eb094d5

Browse files
committed
add model file for bulk importer, other cleanups
1 parent 2da7b96 commit eb094d5

File tree

3 files changed

+126
-48
lines changed

3 files changed

+126
-48
lines changed

src/bulk_importer/schemas.py

Lines changed: 0 additions & 48 deletions
This file was deleted.
File renamed without changes.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""
2+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
# SPDX-License-Identifier: MIT-0
4+
5+
Unit tests for bulk_importer
6+
"""
7+
import os
8+
import base64
9+
import copy
10+
import json
11+
from unittest import TestCase
12+
from moto import mock_aws
13+
from cryptography import x509
14+
from cryptography.hazmat.backends import default_backend
15+
from cryptography.hazmat.primitives import serialization
16+
from boto3 import resource, client
17+
from src.product_provider.main import lambda_handler, process
18+
19+
from .model_bulk_importer import LambdaSQSClass
20+
21+
@mock_aws(config={
22+
"core": {
23+
"mock_credentials": True,
24+
"reset_boto3_session": False,
25+
"service_whitelist": None,
26+
},
27+
'iot': {'use_valid_cert': True}})
28+
class TestBulkImporter(TestCase):
29+
"""Test cases for bulk importer lambda function"""
30+
def setUp(self):
31+
self.test_sqs_queue_name = "provider"
32+
sqs_client = client('sqs', region_name="us-east-1")
33+
sqs_client.create_queue(QueueName=self.test_sqs_queue_name)
34+
mocked_sqs_resource = resource("sqs")
35+
mocked_sqs_resource = { "resource" : resource('sqs'),
36+
"queue_name" : self.test_sqs_queue_name }
37+
self.mocked_sqs_class = LambdaSQSClass(mocked_sqs_resource)
38+
39+
def test_pos_process_1(self):
40+
"""Positive test case for processing certificate"""
41+
with open('./test/artifacts/single.pem', 'rb') as data:
42+
pem_obj = x509.load_pem_x509_certificate(data.read(),
43+
backend=default_backend())
44+
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
45+
cert = str(base64.b64encode(block.encode('ascii')))
46+
c = {'certificate': cert}
47+
d = copy.deepcopy(c)
48+
d['policy_name'] = "my_policy"
49+
d['thing_group_name'] = "my_thing_group"
50+
d['thing_type_name'] = "my_thing_type"
51+
52+
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
53+
os.environ['POLICY_NAME'] = d['policy_name']
54+
os.environ['THING_GROUP_NAME'] = d['thing_group_name']
55+
os.environ['THING_TYPE_NAME'] = d['thing_type_name']
56+
r = process(c)
57+
assert r == d
58+
59+
def test_pos_process_2(self):
60+
"""Positive test case for processing certificate"""
61+
with open('./test/artifacts/single.pem', 'rb') as data:
62+
pem_obj = x509.load_pem_x509_certificate(data.read(),
63+
backend=default_backend())
64+
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
65+
cert = str(base64.b64encode(block.encode('ascii')))
66+
c = {'certificate': cert}
67+
d = copy.deepcopy(c)
68+
d['policy_name'] = "my_policy"
69+
d['thing_group_name'] = None
70+
d['thing_type_name'] = None
71+
72+
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
73+
os.environ['POLICY_NAME'] = d['policy_name']
74+
os.environ['THING_GROUP_NAME'] = "None"
75+
os.environ['THING_TYPE_NAME'] = "None"
76+
r = process(c)
77+
assert r == d
78+
79+
def test_pos_lambda_handler(self):
80+
rcd = { "Records": [
81+
{
82+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
83+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
84+
"body": "Test message.",
85+
"attributes": {
86+
"ApproximateReceiveCount": "1",
87+
"SentTimestamp": "1545082649183",
88+
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
89+
"ApproximateFirstReceiveTimestamp": "1545082649185"
90+
},
91+
"messageAttributes": {},
92+
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
93+
"eventSource": "aws:sqs",
94+
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
95+
"awsRegion": "us-east-2"
96+
},
97+
]
98+
}
99+
100+
with open('./test/artifacts/single.pem', 'rb') as data:
101+
pem_obj = x509.load_pem_x509_certificate(data.read(),
102+
backend=default_backend())
103+
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
104+
cert = str(base64.b64encode(block.encode('ascii')))
105+
c = {'certificate': cert}
106+
rcd['Records'][0]['body'] = json.dumps(c)
107+
d = copy.deepcopy(c)
108+
d['policy_name'] = "my_policy"
109+
d['thing_group_name'] = "my_thing_group"
110+
d['thing_type_name'] = "my_thing_type"
111+
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
112+
os.environ['POLICY_NAME'] = d['policy_name']
113+
os.environ['THING_GROUP_NAME'] = d['thing_group_name']
114+
os.environ['THING_TYPE_NAME'] = d['thing_type_name']
115+
r = lambda_handler(rcd, None)
116+
# The result is an array of processed records, so our fabricated certificate
117+
# needs to be in array context
118+
assert r == [d]
119+
120+
def tearDown(self):
121+
sqs_resource = resource("sqs", region_name="us-east-1")
122+
sqs_client = client("sqs", "us-east-1")
123+
sqs_queue_url_r = sqs_client.get_queue_url(QueueName=self.test_sqs_queue_name)
124+
sqs_queue_url = sqs_queue_url_r['QueueUrl']
125+
sqs_resource = sqs_resource.Queue(url=sqs_queue_url)
126+
sqs_resource.delete()

0 commit comments

Comments
 (0)