-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathdynamodb_client.py
More file actions
218 lines (181 loc) · 7.75 KB
/
dynamodb_client.py
File metadata and controls
218 lines (181 loc) · 7.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import boto3 # type: ignore
import json
from typing import TypedDict, List, Optional, Tuple, Dict
from src.config import OBJECTS_TABLE, AWS_REGION
from src.logger import logger
class DynamoDbItemStringValue(TypedDict):
S: str
# Unfortunately, we can't use variables for the key names here, so we need to
# use literal strings
DynamoDbUserItem = TypedDict(
"DynamoDbUserItem",
{
"github/handle": DynamoDbItemStringValue,
"asana/domain-user-id": DynamoDbItemStringValue,
},
)
class DynamoDbClient(object):
"""
Encapsulates DynamoDb client interface, as exposed to the world. There is a single (singleton) instance of
DynamoDbClient in the process, which is lazily created upon the first request. This pattern supports test code
that does not require DynamoDb.
"""
GITHUB_HANDLE_KEY = "github/handle"
USER_ID_KEY = "asana/domain-user-id"
# the singleton instance of DynamoDbClient
_singleton = None
def __init__(self):
self.client = DynamoDbClient._create_client()
# getter for the singleton
@classmethod
def singleton(cls):
"""
Getter for the DynamoDbClient singleton
"""
if cls._singleton is None:
cls._singleton = DynamoDbClient()
return cls._singleton
def bulk_insert_items_in_batches(self, table_name: str, items: List[dict]):
"""Insert multiple items to a Dynamodb table.
We need to split large requests into batches of 25, since Dynamodb only accepts 25 items at a time.
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
"""
BATCH_SIZE = 25
for batch_start in range(0, len(items), BATCH_SIZE):
response = self.client.batch_write_item(
RequestItems={
table_name: [
{"PutRequest": {"Item": item}}
for item in items[batch_start : batch_start + BATCH_SIZE]
]
}
)
if response.get("UnprocessedItems"):
logger.warning(
"Failed to insert items: {}".format(response["UnprocessedItems"])
)
# OBJECTS TABLE
def get_asana_id_from_github_node_id(self, gh_node_id: str) -> Optional[str]:
"""
Retrieves the Asana object-id associated with the specified GitHub node-id,
or None, if no such association exists. Object-table associations are created
by SGTM via the insert_github_node_to_asana_id_mapping method, below.
"""
response = self.client.get_item(
TableName=OBJECTS_TABLE, Key={"github-node": {"S": gh_node_id}}
)
if "Item" in response and "asana-id" in response["Item"]:
return response["Item"]["asana-id"]["S"]
else:
logger.warning(
f"Asana id not found in dynamodb for github node id {gh_node_id}"
)
return None
def insert_github_node_to_asana_id_mapping(self, gh_node_id: str, asana_id: str):
"""
Creates an association between a GitHub node-id and an Asana object-id
"""
response = self.client.put_item(
TableName=OBJECTS_TABLE,
Item={"github-node": {"S": gh_node_id}, "asana-id": {"S": asana_id}},
)
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
logger.info(f"Inserted into dynamodb {gh_node_id} -> {asana_id}")
else:
logger.warning(
f"Error inserting into dynamodb {gh_node_id} -> {asana_id}, response {response}"
)
def bulk_insert_github_node_to_asana_id_mapping(
self, gh_and_asana_ids: List[Tuple[str, str]]
):
"""Insert multiple mappings from github node ids to Asana object ids.
Equivalent to calling insert_github_node_to_asana_id_mapping repeatedly,
but in a single request.
"""
items = [
{"github-node": {"S": gh_node_id}, "asana-id": {"S": asana_id}}
for gh_node_id, asana_id in gh_and_asana_ids
]
return self.bulk_insert_items_in_batches(OBJECTS_TABLE, items)
# ATTACHMENT METHODS
def get_attachments_for_github_node(self, gh_node_id: str) -> Dict[str, str]:
"""
Retrieves the attachment mappings (original asset ID -> Asana attachment ID) for a GitHub node.
Returns an empty dict if no attachments are found.
"""
response = self.client.get_item(
TableName=OBJECTS_TABLE, Key={"github-node": {"S": gh_node_id}}
)
if "Item" in response and "attachments" in response["Item"]:
try:
attachments_json = response["Item"]["attachments"]["S"]
return json.loads(attachments_json)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Failed to parse attachments for {gh_node_id}: {e}")
return {}
return {}
def update_attachments_for_github_node(
self, gh_node_id: str, attachments: Dict[str, str]
):
"""
Updates the attachment mappings for a GitHub node. Creates the record if it doesn't exist.
"""
attachments_json = json.dumps(attachments)
response = self.client.update_item(
TableName=OBJECTS_TABLE,
Key={"github-node": {"S": gh_node_id}},
UpdateExpression="SET attachments = :attachments",
ExpressionAttributeValues={":attachments": {"S": attachments_json}},
ReturnValues="UPDATED_NEW",
)
if response["ResponseMetadata"]["HTTPStatusCode"] == 200:
logger.info(
f"Updated attachments for {gh_node_id}: {len(attachments)} mappings"
)
else:
logger.warning(
f"Error updating attachments for {gh_node_id}, response {response}"
)
@staticmethod
def _create_client():
return boto3.client("dynamodb", region_name=AWS_REGION)
def get_asana_id_from_github_node_id(gh_node_id: str) -> Optional[str]:
"""
Using the singleton instance of DynamoDbClient, creating it if necessary:
Retrieves the Asana object-id associated with the specified GitHub node-id,
or None, if no such association exists. Object-table associations are created
by SGTM via the insert_github_node_to_asana_id_mapping method, below.
"""
return DynamoDbClient.singleton().get_asana_id_from_github_node_id(gh_node_id)
def insert_github_node_to_asana_id_mapping(gh_node_id: str, asana_id: str):
"""
Using the singleton instance of DynamoDbClient, creating it if necessary:
Creates an association between a GitHub node-id and an Asana object-id
"""
return DynamoDbClient.singleton().insert_github_node_to_asana_id_mapping(
gh_node_id, asana_id
)
def bulk_insert_github_node_to_asana_id_mapping(
gh_and_asana_ids: List[Tuple[str, str]]
):
"""
Insert multiple mappings from github node ids to Asana object ids.
Equivalent to calling insert_github_node_to_asana_id_mapping
repeatedly, but in a single request.
"""
DynamoDbClient.singleton().bulk_insert_github_node_to_asana_id_mapping(
gh_and_asana_ids
)
# Attachment convenience functions
def get_attachments_for_github_node(gh_node_id: str) -> Dict[str, str]:
"""
Retrieves the attachment mappings (original asset ID -> Asana attachment ID) for a GitHub node.
"""
return DynamoDbClient.singleton().get_attachments_for_github_node(gh_node_id)
def update_attachments_for_github_node(gh_node_id: str, attachments: Dict[str, str]):
"""
Updates the attachment mappings for a GitHub node.
"""
return DynamoDbClient.singleton().update_attachments_for_github_node(
gh_node_id, attachments
)