Skip to content

Commit bac4526

Browse files
committed
Make mirror_file fail if file object already exists (#7134, PR #7141)
2 parents 0088ec9 + f0d2f2b commit bac4526

File tree

6 files changed

+100
-33
lines changed

6 files changed

+100
-33
lines changed

src/azul/indexer/mirror_controller.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,13 @@ def mirror_file(self,
152152
deployment_is_stable = (config.deployment.is_stable
153153
and not config.deployment.is_unit_test
154154
and catalog not in config.integration_test_catalogs)
155+
155156
if file_is_large and not deployment_is_stable:
156157
log.info('Not mirroring file to save cost: %r', file)
158+
elif self.service.info_exists(catalog, file):
159+
log.info('File is already mirrored, skipping upload: %r', file)
160+
elif self.service.file_exists(catalog, file):
161+
assert False, R('File object is already present', file)
157162
else:
158163
# Ensure we test with multiple parts on lower deployments
159164
part_size = FilePart.default_size if deployment_is_stable else FilePart.min_size

src/azul/indexer/mirror_service.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,17 @@ def mirror_object_key(self, file: File) -> str:
191191
def info_object_key(self, file: File) -> str:
192192
return self._file_key('info', file, extension='.json')
193193

194-
def is_mirrored(self, catalog: CatalogName, file: File) -> bool:
194+
def info_exists(self, catalog: CatalogName, file: File) -> bool:
    """
    Tell whether :meth:`_get_info` yields a result for the given file in
    the given catalog. Any result other than ``None`` counts as existing.
    """
    info = self._get_info(catalog, file)
    return info is not None
196196

197+
def file_exists(self, catalog: CatalogName, file: File) -> bool:
    """
    Tell whether the catalog's storage holds an object under the mirror
    object key of the given file.

    The check is made by calling ``head`` on the storage. A
    ``StorageObjectNotFound`` is interpreted as absence, any other
    exception propagates to the caller.
    """
    storage = self._storage(catalog)
    object_key = self.mirror_object_key(file)
    try:
        storage.head(object_key)
    except StorageObjectNotFound:
        return False
    return True
204+
197205
def _file_key(self, prefix: str, file: File, *, extension: str = '') -> str:
198206
digest = file.digest
199207
assert all(c in string.hexdigits for c in digest.value), R(
@@ -214,17 +222,15 @@ def mirror_file(self, catalog: CatalogName, file: File):
214222
Upload the file in a single request. For larger files, use
215223
:meth:`begin_mirroring_file` instead.
216224
"""
217-
if self.is_mirrored(catalog, file):
218-
log.info('File is already mirrored, skipping upload: %r', file)
219-
else:
220-
file_content = self._download(catalog, file)
221-
self._storage(catalog).put(object_key=self.mirror_object_key(file),
222-
data=file_content,
223-
content_type=file.content_type)
224-
hasher = get_resumable_hasher(file.digest.type)
225-
hasher.update(file_content)
226-
self._verify_digest(file, hasher)
227-
self._put_info(catalog, file)
225+
file_content = self._download(catalog, file)
226+
self._storage(catalog).put(object_key=self.mirror_object_key(file),
227+
data=file_content,
228+
content_type=file.content_type,
229+
overwrite=False)
230+
hasher = get_resumable_hasher(file.digest.type)
231+
hasher.update(file_content)
232+
self._verify_digest(file, hasher)
233+
self._put_info(catalog, file)
228234

229235
def begin_mirroring_file(self, catalog: CatalogName, file: File) -> str:
230236
"""
@@ -268,7 +274,9 @@ def finish_mirroring_file(self,
268274
Complete a multipart upload begun with :meth:`begin_mirroring_file`.
269275
"""
270276
upload = self._get_upload(catalog, file, upload_id)
271-
self._storage(catalog).complete_multipart_upload(upload, etags)
277+
self._storage(catalog).complete_multipart_upload(upload,
278+
etags,
279+
overwrite=False)
272280
self._verify_digest(file, hasher)
273281
self._get_info(catalog, file)
274282
self._put_info(catalog, file)

src/azul/service/repository_controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ def download_file(self,
266266
plugin = self.repository_plugin(catalog)
267267

268268
is_mirrored = (config.enable_mirroring
269-
and self.mirror_service.is_mirrored(catalog, file))
269+
and self.mirror_service.info_exists(catalog, file))
270270
if is_mirrored:
271271
download = MirrorFileDownload(
272272
file=file,

src/azul/service/storage_service.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
urlencode,
3030
)
3131

32+
import botocore
33+
import botocore.exceptions
3234
from botocore.response import (
3335
StreamingBody,
3436
)
@@ -75,6 +77,10 @@ class StorageObjectNotFound(Exception):
7577
pass
7678

7779

80+
class StorageObjectExists(Exception):
    """
    Raised when an object could not be created because an object with the
    same key is already present and overwriting it was not permitted. The
    key of the object is passed as the sole exception argument.
    """
82+
83+
7884
class StorageService:
7985

8086
def __init__(self, bucket_name: str | None = None):
@@ -92,7 +98,7 @@ def head(self, object_key: str) -> HeadObjectOutputTypeDef:
9298
Key=object_key)
9399
except self._s3.exceptions.ClientError as e:
94100
if int(e.response['Error']['Code']) == 404:
95-
raise StorageObjectNotFound
101+
raise StorageObjectNotFound(object_key)
96102
else:
97103
raise e
98104

@@ -101,7 +107,7 @@ def get(self, object_key: str) -> bytes:
101107
response = self._s3.get_object(Bucket=self.bucket_name,
102108
Key=object_key)
103109
except self._s3.exceptions.NoSuchKey:
104-
raise StorageObjectNotFound
110+
raise StorageObjectNotFound(object_key)
105111
else:
106112
return response['Body'].read()
107113

@@ -110,12 +116,19 @@ def put(self,
110116
data: bytes,
111117
content_type: str | None = None,
112118
tagging: Tagging | None = None,
119+
*,
120+
overwrite: bool = True,
113121
**kwargs):
114-
self._s3.put_object(Bucket=self.bucket_name,
115-
Key=object_key,
116-
Body=data,
117-
**self._object_creation_kwargs(content_type=content_type, tagging=tagging),
118-
**kwargs)
122+
try:
123+
self._s3.put_object(Bucket=self.bucket_name,
124+
Key=object_key,
125+
Body=data,
126+
**self._object_creation_kwargs(content_type=content_type,
127+
tagging=tagging,
128+
overwrite=overwrite),
129+
**kwargs)
130+
except botocore.exceptions.ClientError as e:
131+
self._handle_overwrite(e, object_key)
119132

120133
def list(self, prefix: str) -> OrderedSet[str]:
121134
keys, num_keys = OrderedSet(), 0
@@ -155,15 +168,22 @@ def upload_multipart_part(self,
155168

156169
def complete_multipart_upload(self,
                              upload: MultipartUpload,
                              etags: Sequence[str],
                              *,
                              overwrite: bool = True,
                              ) -> None:
    """
    Complete the given multipart upload from the ETags of its parts.

    :param upload: the multipart upload to complete

    :param etags: the ETags of the uploaded parts, in part order. The
                  first element is submitted as part number 1.

    :param overwrite: forwarded to :meth:`_object_creation_kwargs`, whose
                      result is passed along with the completion request

    A ``ClientError`` raised by the completion request is handed to
    :meth:`_handle_overwrite` together with the key of the upload.
    """
    # Part numbers are one-based
    numbered_parts = [
        dict(PartNumber=number, ETag=etag)
        for number, etag in enumerate(etags, start=1)
    ]
    creation_kwargs = self._object_creation_kwargs(overwrite=overwrite)
    try:
        upload.complete(MultipartUpload=dict(Parts=numbered_parts),
                        **creation_kwargs)
    except botocore.exceptions.ClientError as e:
        self._handle_overwrite(e, upload.object_key)
167187

168188
def upload(self,
169189
file_path: str,
@@ -179,14 +199,19 @@ def upload(self,
179199
if tagging:
180200
self.put_object_tagging(object_key, tagging)
181201

182-
def _object_creation_kwargs(self, *,
202+
def _object_creation_kwargs(self,
203+
*,
183204
content_type: str | None = None,
184-
tagging: Tagging | None = None):
205+
tagging: Tagging | None = None,
206+
overwrite: bool = True
207+
) -> Mapping[str, str]:
185208
kwargs = {}
186209
if content_type is not None:
187210
kwargs['ContentType'] = content_type
188211
if tagging is not None:
189212
kwargs['Tagging'] = urlencode(tagging)
213+
if overwrite is False:
214+
kwargs['IfNoneMatch'] = '*'
190215
return kwargs
191216

192217
def get_presigned_url(self, key: str, file_name: str | None = None) -> str:
@@ -281,6 +306,17 @@ def _time_until_object_expires(self,
281306
expiration_header, expected_expiry)
282307
return time_left
283308

309+
def _handle_overwrite(self,
310+
exception: botocore.exceptions.ClientError,
311+
object_key: str
312+
):
313+
error = exception.response['Error']
314+
code, condition = error['Code'], error['Condition']
315+
if code == 'PreconditionFailed' and condition == 'If-None-Match':
316+
raise StorageObjectExists(object_key)
317+
else:
318+
raise exception
319+
284320

285321
@dataclass
286322
class Part:

test/indexer/test_mirror_controller.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,18 @@ def test_mirroring(self):
7575
with self.subTest('mirror_partition'):
7676
file, file_message = self._test_mirror_partition(partition_message)
7777

78-
with self.subTest('mirror_file'):
78+
with self.subTest('mirror_file', corrupted=False, exists=False):
7979
self._test_mirror_file(file, file_message)
8080

81+
self._s3.delete_object(Bucket=self.mirror_bucket,
82+
Key=self.mirror_controller.service.info_object_key(file))
83+
84+
with self.subTest('mirror_file', corrupted=True):
85+
self._test_corrupted_download(file_message)
86+
87+
with self.subTest('mirror_file', corrupted=False, exists=True):
88+
self._test_reuploaded_file(file_message)
89+
8190
_file_contents = b'lorem ipsum dolor sit\n'
8291

8392
@property
@@ -136,14 +145,23 @@ def _test_mirror_file(self, file, file_message):
136145
Key=self.mirror_controller.service.mirror_object_key(file))
137146
mirrored_file_contents = response['Body'].read()
138147
self.assertEqual(mirrored_file_contents, self._file_contents)
148+
149+
def _test_corrupted_download(self, file_message):
    """
    Mirror the file from the given message while the download yields
    corrupted content, and expect the attempt to fail with an
    AssertionError for which ``R.caused`` holds.
    """
    # Replace the last byte so the content differs from the real file
    corrupted_contents = self._file_contents[:-1] + b'Q'
    event = [self._mock_sqs_record(file_message)]
    corrupted_download = patch.object(MirrorService,
                                      '_download',
                                      return_value=corrupted_contents)
    with corrupted_download, self.assertRaises(AssertionError) as e:
        self.mirror_controller.mirror(event)
    self.assertTrue(R.caused(e.exception))
146156

157+
def _test_reuploaded_file(self, file_message):
    """
    Mirror the file from the given message once more, with the download
    yielding the genuine file content, and expect the attempt to fail
    with an AssertionError reporting that the file object is already
    present.
    """
    event = [self._mock_sqs_record(file_message)]
    intact_download = patch.object(MirrorService,
                                   '_download',
                                   return_value=self._file_contents)
    with intact_download, self.assertRaises(AssertionError) as e:
        self.mirror_controller.mirror(event)
    failure = e.exception
    self.assertTrue(R.caused(failure))
    self.assertEqual(failure.args[0].args[0], 'File object is already present')
164+
147165
def test_info_schema(self):
148166
client = http_client(log)
149167
file = MagicMock(content_type='text/plain')

test/service/test_repository_files.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def assertUrlEqual(self, a: Union[str, furl], b: Union[str, furl]):
117117
@mock.patch.object(SourceService, '_put', new=MagicMock())
118118
@mock.patch.object(SourceService, '_get')
119119
@mock.patch.object(BaseMirrorService,
120-
'is_mirrored',
120+
'info_exists',
121121
new=MagicMock(return_value=False))
122122
class TestRepositoryFilesWithTDR(DCP2TestCase, RepositoryFilesTestCase):
123123

@@ -250,7 +250,7 @@ def _test(*, authenticate: bool, cache: bool):
250250

251251

252252
@mock.patch.object(BaseMirrorService,
253-
'is_mirrored',
253+
'info_exists',
254254
new=MagicMock(return_value=False))
255255
class TestRepositoryFilesWithDSS(DCP1TestCase,
256256
RepositoryFilesTestCase,
@@ -424,7 +424,7 @@ def test_repository_files(self):
424424
mirror_service = MirrorService(schema_url_func=MagicMock())
425425
with mock.patch.object(MirrorService, '_download', return_value=file_content):
426426
mirror_service.mirror_file(self.catalog, file)
427-
self.assertTrue(mirror_service.is_mirrored(self.catalog, file))
427+
self.assertTrue(mirror_service.info_exists(self.catalog, file))
428428

429429
client = http_client(log)
430430
args = dict(catalog=self.catalog, version=file_version)

0 commit comments

Comments
 (0)