Skip to content

Commit f0d2f2b

Browse files
nadove-ucsc authored and dsotirho-ucsc committed
Make mirror_file fail if file object already exists (#7134)
1 parent d897827 commit f0d2f2b

File tree

3 files changed

+37
-13
lines changed

3 files changed

+37
-13
lines changed

src/azul/indexer/mirror_controller.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -152,8 +152,13 @@ def mirror_file(self,
152152
deployment_is_stable = (config.deployment.is_stable
153153
and not config.deployment.is_unit_test
154154
and catalog not in config.integration_test_catalogs)
155+
155156
if file_is_large and not deployment_is_stable:
156157
log.info('Not mirroring file to save cost: %r', file)
158+
elif self.service.info_exists(catalog, file):
159+
log.info('File is already mirrored, skipping upload: %r', file)
160+
elif self.service.file_exists(catalog, file):
161+
assert False, R('File object is already present', file)
157162
else:
158163
# Ensure we test with multiple parts on lower deployments
159164
part_size = FilePart.default_size if deployment_is_stable else FilePart.min_size

src/azul/indexer/mirror_service.py

Lines changed: 20 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -194,6 +194,14 @@ def info_object_key(self, file: File) -> str:
194194
def info_exists(self, catalog: CatalogName, file: File) -> bool:
195195
return self._get_info(catalog, file) is not None
196196

197+
def file_exists(self, catalog: CatalogName, file: File) -> bool:
198+
try:
199+
self._storage(catalog).head(self.mirror_object_key(file))
200+
except StorageObjectNotFound:
201+
return False
202+
else:
203+
return True
204+
197205
def _file_key(self, prefix: str, file: File, *, extension: str = '') -> str:
198206
digest = file.digest
199207
assert all(c in string.hexdigits for c in digest.value), R(
@@ -214,17 +222,15 @@ def mirror_file(self, catalog: CatalogName, file: File):
214222
Upload the file in a single request. For larger files, use
215223
:meth:`begin_mirroring_file` instead.
216224
"""
217-
if self.info_exists(catalog, file):
218-
log.info('File is already mirrored, skipping upload: %r', file)
219-
else:
220-
file_content = self._download(catalog, file)
221-
self._storage(catalog).put(object_key=self.mirror_object_key(file),
222-
data=file_content,
223-
content_type=file.content_type)
224-
hasher = get_resumable_hasher(file.digest.type)
225-
hasher.update(file_content)
226-
self._verify_digest(file, hasher)
227-
self._put_info(catalog, file)
225+
file_content = self._download(catalog, file)
226+
self._storage(catalog).put(object_key=self.mirror_object_key(file),
227+
data=file_content,
228+
content_type=file.content_type,
229+
overwrite=False)
230+
hasher = get_resumable_hasher(file.digest.type)
231+
hasher.update(file_content)
232+
self._verify_digest(file, hasher)
233+
self._put_info(catalog, file)
228234

229235
def begin_mirroring_file(self, catalog: CatalogName, file: File) -> str:
230236
"""
@@ -268,7 +274,9 @@ def finish_mirroring_file(self,
268274
Complete a multipart upload begun with :meth:`begin_mirroring_file`.
269275
"""
270276
upload = self._get_upload(catalog, file, upload_id)
271-
self._storage(catalog).complete_multipart_upload(upload, etags)
277+
self._storage(catalog).complete_multipart_upload(upload,
278+
etags,
279+
overwrite=False)
272280
self._verify_digest(file, hasher)
273281
self._get_info(catalog, file)
274282
self._put_info(catalog, file)

test/indexer/test_mirror_controller.py

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ def test_mirroring(self):
7575
with self.subTest('mirror_partition'):
7676
file, file_message = self._test_mirror_partition(partition_message)
7777

78-
with self.subTest('mirror_file', corrupted=False):
78+
with self.subTest('mirror_file', corrupted=False, exists=False):
7979
self._test_mirror_file(file, file_message)
8080

8181
self._s3.delete_object(Bucket=self.mirror_bucket,
@@ -84,6 +84,9 @@ def test_mirroring(self):
8484
with self.subTest('mirror_file', corrupted=True):
8585
self._test_corrupted_download(file_message)
8686

87+
with self.subTest('mirror_file', corrupted=False, exists=True):
88+
self._test_reuploaded_file(file_message)
89+
8790
_file_contents = b'lorem ipsum dolor sit\n'
8891

8992
@property
@@ -151,6 +154,14 @@ def _test_corrupted_download(self, file_message):
151154
self.mirror_controller.mirror(event)
152155
self.assertTrue(R.caused(e.exception))
153156

157+
def _test_reuploaded_file(self, file_message):
158+
event = [self._mock_sqs_record(file_message)]
159+
with patch.object(MirrorService, '_download', return_value=self._file_contents):
160+
with self.assertRaises(AssertionError) as e:
161+
self.mirror_controller.mirror(event)
162+
self.assertTrue(R.caused(e.exception))
163+
self.assertEqual(e.exception.args[0].args[0], 'File object is already present')
164+
154165
def test_info_schema(self):
155166
client = http_client(log)
156167
file = MagicMock(content_type='text/plain')

0 commit comments

Comments
 (0)