-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path__init__.py
More file actions
315 lines (267 loc) · 9.89 KB
/
__init__.py
File metadata and controls
315 lines (267 loc) · 9.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
from datetime import datetime
from enum import Enum
import typing
class BlobMetadataField(Enum):
    """
    Cloud-agnostic keys used in the metadata dictionaries that describe a blob.

    Members:
        CHECKSUM - Checksum of the blob. The value is cloud-specific and must not be compared across
            cloud providers.
        CREATED - Creation date of the blob.
        LAST_MODIFIED - Last modified date of the blob.
        SIZE - Size of the blob.
    """

    CHECKSUM = "checksum"
    CREATED = "created"
    LAST_MODIFIED = "last_modified"
    SIZE = "size"
class PagedIter(typing.Iterable[typing.Tuple[str, dict]]):
    """
    Provide an iterator that will iterate over every object, filtered by prefix and delimiter. Alternately continue
    iteration with token and key (start_after_key).

    This base class defines no ``__init__``; ``__iter__`` reads and updates ``self.token`` and
    ``self.start_after_key``, so subclasses must set both (either may be None) before iteration begins and must
    implement the three blobstore-specific hooks below.
    """

    def get_api_response(self, next_token):
        """
        Make blobstore-specific list api request.

        :param next_token: continuation token identifying the page to fetch; falsy for the first page.
        :return: the blobstore-specific response object for that page.
        """
        raise NotImplementedError()

    def get_listing_from_response(self, resp) -> typing.Iterable[typing.Tuple[str, dict]]:
        """
        Retrieve blob metadata objects from blobstore response.

        Metadata objects represented as tuples in the form of:
            (key, {BlobMetadataField: val, ...})

        :param resp: a response previously returned by :meth:`get_api_response`.
        :return: any iterable of (key, metadata) tuples for that page.
        """
        raise NotImplementedError()

    def get_next_token_from_response(self, resp) -> str:
        """
        Retrieve opaque continuation token from blobstore response.

        :param resp: a response previously returned by :meth:`get_api_response`.
        :return: the token for the following page; a falsy value ends the iteration.
        """
        raise NotImplementedError()

    def __iter__(self):
        """
        Iterate over the blobs, saving page tokens and blob key start_after_keys as needed in order to continue
        listing where one left off.

        While items of a page are being yielded, ``self.token`` is the token that fetched that page and
        ``self.start_after_key`` is the key most recently yielded, so the pair can be saved at any point and
        used to resume the listing.

        If start_after_key is not None, iteration will begin on the next key if start_after_key is found on the
        first page of results. If it is not found on the first page of results, BlobPagingError will be raised.

        :raises BlobPagingError: if ``start_after_key`` is set but absent from the first page fetched.
        """
        while True:
            resp = self.get_api_response(self.token)
            # iter() allows the hook to return any iterable (e.g. a list), not only an iterator.
            listing = iter(self.get_listing_from_response(resp))

            if self.start_after_key:
                # Resuming: consume entries up to and including the marker key.
                for item in listing:
                    if item[0] == self.start_after_key:
                        break
                else:
                    raise BlobPagingError('Marker not found in this page')

            for item in listing:
                # Record progress before handing the item out so a caller can checkpoint mid-page.
                self.start_after_key = item[0]
                yield item

            # Page exhausted: only now advance the token, so that (token, start_after_key) always
            # refer to the same page while that page is being consumed.
            self.start_after_key = None
            self.token = self.get_next_token_from_response(resp)

            if not self.token:
                break
class BlobStore:
    """
    Abstract base class for all blob stores.

    Every operation here raises NotImplementedError; concrete blob stores are expected to override the
    operations they support.
    """

    def __init__(self):
        pass

    def list(
            self,
            bucket: str,
            prefix: typing.Optional[str] = None,
            delimiter: typing.Optional[str] = None,
    ) -> typing.Iterator[str]:
        """
        Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that
        contain the delimiter past the prefix.

        :param bucket: the bucket to list.
        :param prefix: only keys starting with this prefix are returned.
        :param delimiter: keys containing this delimiter past the prefix are omitted.
        :return: an iterator over the matching keys.
        """
        raise NotImplementedError()

    def list_v2(
            self,
            bucket: str,
            prefix: typing.Optional[str] = None,
            delimiter: typing.Optional[str] = None,
            start_after_key: typing.Optional[str] = None,
            token: typing.Optional[str] = None,
            k_page_max: typing.Optional[int] = None,
    ) -> typing.Iterable[typing.Tuple[str, dict]]:
        """
        Returns an iterable of all blob entries in a bucket that match a given prefix. Do not return any keys that
        contain the delimiter past the prefix.

        :param bucket: the bucket to list.
        :param prefix: only keys starting with this prefix are returned.
        :param delimiter: keys containing this delimiter past the prefix are omitted.
        :param start_after_key: key after which the listing should continue.
        :param token: opaque continuation token identifying the page to start from.
        :param k_page_max: NOTE(review): presumably the maximum number of keys per page -- confirm against the
            concrete implementations.
        :return: an iterable of (key, metadata) tuples.
        """
        raise NotImplementedError()

    def generate_presigned_GET_url(
            self,
            bucket: str,
            key: str,
            **kwargs) -> str:
        """
        Retrieves a presigned URL for an HTTP GET of the blob at `key`. Raises BlobNotFoundError if the blob
        is not present.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object the URL should point at.
        :param kwargs: extra arguments; per the TODO below these are currently cloud-API specific.
        :return: the presigned URL.
        """
        # TODO: things like http ranges need to be explicit parameters.
        # users of this API should not need to know the argument names presented
        # to the cloud API.
        raise NotImplementedError()

    def upload_file_handle(
            self,
            bucket: str,
            key: str,
            src_file_handle: typing.BinaryIO,
            content_type: typing.Optional[str] = None,
            metadata: typing.Optional[dict] = None):
        """
        Saves the contents of a file handle as the contents of an object in a bucket.

        :param bucket: the bucket to write the object into.
        :param key: the key of the object to write.
        :param src_file_handle: binary file handle whose contents are uploaded.
        :param content_type: content-type to store with the object, if any.
        :param metadata: user metadata to store with the object, if any.
        """
        raise NotImplementedError()

    def delete(self, bucket: str, key: str):
        """
        Deletes an object in a bucket. If the operation definitely did not delete anything, return False. Any other
        return value is treated as something was possibly deleted.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object to delete.
        """
        raise NotImplementedError()

    def get(self, bucket: str, key: str) -> bytes:
        """
        Retrieves the data for a given object in a given bucket.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object whose data is being retrieved.
        :return: the data
        """
        raise NotImplementedError()

    def get_cloud_checksum(
            self,
            bucket: str,
            key: str
    ) -> str:
        """
        Retrieves the cloud-provided checksum for a given object in a given bucket.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which checksum is being retrieved.
        :return: the cloud-provided checksum
        """
        raise NotImplementedError()

    def get_content_type(
            self,
            bucket: str,
            key: str
    ) -> str:
        """
        Retrieves the content-type for a given object in a given bucket.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which content-type is being retrieved.
        :return: the content-type
        """
        raise NotImplementedError()

    def get_copy_token(
            self,
            bucket: str,
            key: str,
            cloud_checksum: str,
    ) -> typing.Any:
        """
        Given a bucket, key, and the expected cloud-provided checksum, retrieve a token that can be passed into
        :func:`~cloud_blobstore.BlobStore.copy` that guarantees the copy refers to the same version of the blob
        identified by the checksum.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which checksum is being retrieved.
        :param cloud_checksum: the expected cloud-provided checksum.
        :return: an opaque copy token
        """
        raise NotImplementedError()

    def get_creation_date(
            self,
            bucket: str,
            key: str,
    ) -> datetime:
        """
        Retrieves the creation date for a given key in a given bucket.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which the creation date is being retrieved.
        :return: the creation date
        """
        raise NotImplementedError()

    def get_last_modified_date(
            self,
            bucket: str,
            key: str,
    ) -> datetime:
        """
        Retrieves last modified date for a given key in a given bucket.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which the last modified date is being retrieved.
        :return: the last modified date
        """
        raise NotImplementedError()

    def get_user_metadata(
            self,
            bucket: str,
            key: str
    ) -> typing.Dict[str, str]:
        """
        Retrieves the user metadata for a given object in a given bucket. If the platform has any mandatory prefixes or
        suffixes for the metadata keys, they should be stripped before being returned.

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which metadata is being
            retrieved.
        :return: a dictionary mapping metadata keys to metadata values.
        """
        raise NotImplementedError()

    def get_size(
            self,
            bucket: str,
            key: str
    ) -> int:
        """
        Retrieves the filesize

        :param bucket: the bucket the object resides in.
        :param key: the key of the object for which size is being retrieved.
        :return: integer equal to filesize in bytes
        """
        raise NotImplementedError()

    def copy(
            self,
            src_bucket: str, src_key: str,
            dst_bucket: str, dst_key: str,
            copy_token: typing.Any = None,
            **kwargs):
        """
        Copies the object at `src_key` in `src_bucket` to `dst_key` in `dst_bucket`.

        :param src_bucket: the bucket the source object resides in.
        :param src_key: the key of the source object.
        :param dst_bucket: the bucket to copy into.
        :param dst_key: the key of the destination object.
        :param copy_token: optional opaque token, as returned by
            :func:`~cloud_blobstore.BlobStore.get_copy_token`, pinning the version of the blob being copied.
        :param kwargs: NOTE(review): presumably implementation-specific options -- confirm against the concrete
            implementations.
        """
        raise NotImplementedError()

    def check_bucket_exists(self, bucket: str) -> bool:
        """
        Checks if bucket with specified name exists.

        :param bucket: the bucket to be checked.
        :return: true if specified bucket exists.
        """
        raise NotImplementedError()

    def get_bucket_region(self, bucket: str) -> str:
        """
        Get region associated with a specified bucket name.

        :param bucket: the bucket to be checked.
        :return: region in which specified bucket resides.
        """
        raise NotImplementedError()
class BlobStoreError(Exception):
    """Common base class of the blob store exceptions defined in this module."""


class BlobStoreUnknownError(BlobStoreError):
    """Blob store error of an unknown kind."""


class BlobStoreCredentialError(BlobStoreError):
    """Blob store error related to credentials."""


class BlobStoreTimeoutError(BlobStoreError):
    """
    BlobStoreTimeoutError wraps timeout errors from cloud providers.

    For instance, boto3 provides `read_timeout` and `connect_timeout` configurations that may
    lead to `ConnectTimeout` and `ReadTimeout` exceptions.
    """


class BlobBucketNotFoundError(BlobStoreError):
    """Signals that a bucket was not found."""


class BlobNotFoundError(BlobStoreError):
    """Signals that a blob was not found."""


class BlobAlreadyExistsError(BlobStoreError):
    """Signals that a blob already exists."""


class BlobPagingError(BlobStoreError):
    """Signals a failure while paging through a blob listing."""