-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdynamodb_client.py
145 lines (129 loc) · 5.09 KB
/
dynamodb_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import boto3
import itertools
import json
import os
from dynamodb_json import json_util as dynamodb_json
from logging import getLogger
LOG = getLogger(__name__)
class CreateUpdateAndDeleteAspectScore(object):
    """Wrapper around a boto3 DynamoDB client for a hotel aspect-score table.

    Items are keyed by ``hotel_id`` (number, partition key) and ``aspect``
    (string, sort key) and carry a ``score`` attribute.
    """

    # Region used when the caller does not supply one.
    DEFAULT_REGION = "ap-southeast-1"
    # Attributes fetched when scanning the table for reconciliation.
    SCAN_PROJECTION = "hotel_id, aspect, score"

    def __init__(self, table_name, region_name=DEFAULT_REGION):
        """Remember the target table and open a DynamoDB client.

        Args:
            table_name: Name of the table used by ``upSertScore`` and
                ``deleteScores``.
            region_name: AWS region for the client; defaults to
                ``DEFAULT_REGION``.
        """
        self.dynamodb = None
        self.tableName = table_name
        self.regionName = region_name
        # Rows returned by the most recent scan in deleteScores().
        self.ddbScores = []
        self.createDynamodbClient()

    def createDynamodbClient(self, region_name=None):
        """Create the boto3 DynamoDB client, store it on ``self`` and return it.

        Args:
            region_name: Optional region override; when given it also replaces
                the region remembered on the instance.
        """
        if region_name is not None:
            self.regionName = region_name
        self.dynamodb = boto3.client('dynamodb', region_name=self.regionName)
        return self.dynamodb

    def createTable(self, table_name):
        """Create ``table_name`` unless it already exists.

        Args:
            table_name: Name of the table to describe or create.

        Returns:
            The ``describe_table`` response when the table exists, otherwise
            the ``create_table`` response.
        """
        try:
            return self.dynamodb.describe_table(TableName=table_name)
        except self.dynamodb.exceptions.ResourceNotFoundException:
            # Table is missing: fall through and create it.
            pass

        key_schema = [
            {'AttributeName': 'hotel_id', 'KeyType': 'HASH'},  # Partition key
            {'AttributeName': 'aspect', 'KeyType': 'RANGE'},  # Sort key
        ]
        return self.dynamodb.create_table(
            TableName=table_name,
            KeySchema=key_schema,
            AttributeDefinitions=[
                {'AttributeName': 'hotel_id', 'AttributeType': 'N'},
                {'AttributeName': 'aspect', 'AttributeType': 'S'},
            ],
            # NOTE(review): this index reuses the table's own key schema and
            # its name does not match these attributes - confirm it is
            # intended before relying on it.
            LocalSecondaryIndexes=[
                {
                    'IndexName': 'PostsByDate',
                    'KeySchema': list(key_schema),
                    'Projection': {'ProjectionType': 'ALL'},
                },
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 50,
                'WriteCapacityUnits': 50,
            },
        )

    def upSertScore(self, item):
        """Set the ``score`` of one item, creating the item if it is missing.

        Args:
            item: Plain Python mapping with ``hotel_id``, ``aspect`` and
                ``score`` entries.

        Returns:
            The ``update_item`` response.

        Raises:
            Exception: If the update fails; the original error is chained as
                the cause.
        """
        # Convert plain Python values into DynamoDB's typed JSON
        # representation expected by the low-level client.
        item = json.loads(dynamodb_json.dumps(item))
        try:
            response = self.dynamodb.update_item(
                TableName=self.tableName,
                Key={
                    'hotel_id': item['hotel_id'],
                    'aspect': item['aspect'],
                },
                UpdateExpression="set score = :score",
                ExpressionAttributeValues={
                    ':score': item['score'],
                },
            )
        except Exception as e:
            LOG.exception("Error updating/inserting Scores table : %s", e)
            raise Exception(str(e)) from e
        return response

    def deleteScores(self, newScores):
        """Delete stored items whose key does not appear in ``newScores``.

        The whole table is scanned (all pages), the rows are kept on
        ``self.ddbScores``, and every stored ``(hotel_id, aspect)`` pair that
        is absent from ``newScores`` is deleted.

        Args:
            newScores: Iterable of mappings with ``hotel_id`` and ``aspect``
                entries describing the items that must be kept.

        Raises:
            Exception: If the scan or a delete fails; the original error is
                chained as the cause. Deletion stops at the first failure.
        """
        try:
            self.ddbScores = self._scanAllScores()
        except Exception as e:
            LOG.exception("Error scanning Scores table : %s", e)
            raise Exception(str(e)) from e

        for stale in self._staleKeys(self.ddbScores, newScores):
            # The low-level client needs the key in DynamoDB typed JSON.
            key = json.loads(dynamodb_json.dumps(stale))
            try:
                self.dynamodb.delete_item(
                    TableName=self.tableName,
                    Key={
                        'hotel_id': key['hotel_id'],
                        'aspect': key['aspect'],
                    },
                )
            except Exception as e:
                LOG.exception(
                    "Error deleting scores in Scores table : %s", e)
                raise Exception(str(e)) from e

    def _scanAllScores(self):
        """Return every row of the table as plain dicts, following pagination."""
        scan_kwargs = {
            'TableName': self.tableName,
            'ProjectionExpression': self.SCAN_PROJECTION,
            # Same read settings for every page so the pages are comparable.
            'ConsistentRead': True,
        }
        rows = []
        while True:
            page = self.dynamodb.scan(**scan_kwargs)
            # loads() on a list of items yields a list of plain dicts.
            rows.extend(dynamodb_json.loads(page.get('Items', [])))
            last_key = page.get('LastEvaluatedKey')
            if not last_key:
                return rows
            # A page may be empty and still have more data behind it, so only
            # the absence of LastEvaluatedKey ends the scan.
            scan_kwargs['ExclusiveStartKey'] = last_key

    @staticmethod
    def _staleKeys(stored, desired):
        """Return key dicts of ``stored`` rows that are absent from ``desired``.

        Order (and any duplicates) of ``stored`` is preserved.
        """
        wanted = {(row['hotel_id'], row['aspect']) for row in desired}
        return [
            {'hotel_id': row['hotel_id'], 'aspect': row['aspect']}
            for row in stored
            if (row['hotel_id'], row['aspect']) not in wanted
        ]
# Module-level bootstrap.
# The table name comes from the DYNAMODB_TABLE environment variable; the
# lookup raises KeyError when the variable is not set.
_table_name = os.environ['DYNAMODB_TABLE']
# Wrapper instance bound to that table (constructing it opens a boto3 client).
_ddb_obj = CreateUpdateAndDeleteAspectScore(_table_name)
# Ensure the table exists: createTable() describes it and creates it only when
# DynamoDB reports it as not found.
_ddb_obj.createTable(_table_name)