Skip to content

Make mirror_file fail if file object already exists (#7134) #7141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/azul/indexer/mirror_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ def mirror_file(self,
deployment_is_stable = (config.deployment.is_stable
and not config.deployment.is_unit_test
and catalog not in config.integration_test_catalogs)

if file_is_large and not deployment_is_stable:
log.info('Not mirroring file to save cost: %r', file)
elif self.service.info_exists(catalog, file):
log.info('File is already mirrored, skipping upload: %r', file)
elif self.service.file_exists(catalog, file):
assert False, R('File object is already present', file)
else:
# Ensure we test with multiple parts on lower deployments
part_size = FilePart.default_size if deployment_is_stable else FilePart.min_size
Expand Down
34 changes: 21 additions & 13 deletions src/azul/indexer/mirror_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,17 @@ def mirror_object_key(self, file: File) -> str:
def info_object_key(self, file: File) -> str:
return self._file_key('info', file, extension='.json')

def is_mirrored(self, catalog: CatalogName, file: File) -> bool:
def info_exists(self, catalog: CatalogName, file: File) -> bool:
return self._get_info(catalog, file) is not None

def file_exists(self, catalog: CatalogName, file: File) -> bool:
try:
self._storage(catalog).head(self.mirror_object_key(file))
except StorageObjectNotFound:
return False
else:
return True

def _file_key(self, prefix: str, file: File, *, extension: str = '') -> str:
digest = file.digest
assert all(c in string.hexdigits for c in digest.value), R(
Expand All @@ -214,17 +222,15 @@ def mirror_file(self, catalog: CatalogName, file: File):
Upload the file in a single request. For larger files, use
:meth:`begin_mirroring_file` instead.
"""
if self.is_mirrored(catalog, file):
log.info('File is already mirrored, skipping upload: %r', file)
else:
file_content = self._download(catalog, file)
self._storage(catalog).put(object_key=self.mirror_object_key(file),
data=file_content,
content_type=file.content_type)
hasher = get_resumable_hasher(file.digest.type)
hasher.update(file_content)
self._verify_digest(file, hasher)
self._put_info(catalog, file)
file_content = self._download(catalog, file)
self._storage(catalog).put(object_key=self.mirror_object_key(file),
data=file_content,
content_type=file.content_type,
overwrite=False)
hasher = get_resumable_hasher(file.digest.type)
hasher.update(file_content)
self._verify_digest(file, hasher)
self._put_info(catalog, file)

def begin_mirroring_file(self, catalog: CatalogName, file: File) -> str:
"""
Expand Down Expand Up @@ -268,7 +274,9 @@ def finish_mirroring_file(self,
Complete a multipart upload begun with :meth:`begin_mirroring_file`.
"""
upload = self._get_upload(catalog, file, upload_id)
self._storage(catalog).complete_multipart_upload(upload, etags)
self._storage(catalog).complete_multipart_upload(upload,
etags,
overwrite=False)
self._verify_digest(file, hasher)
self._get_info(catalog, file)
self._put_info(catalog, file)
Expand Down
2 changes: 1 addition & 1 deletion src/azul/service/repository_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def download_file(self,
plugin = self.repository_plugin(catalog)

is_mirrored = (config.enable_mirroring
and self.mirror_service.is_mirrored(catalog, file))
and self.mirror_service.info_exists(catalog, file))
if is_mirrored:
download = MirrorFileDownload(
file=file,
Expand Down
58 changes: 47 additions & 11 deletions src/azul/service/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
urlencode,
)

import botocore
import botocore.exceptions
from botocore.response import (
StreamingBody,
)
Expand Down Expand Up @@ -75,6 +77,10 @@ class StorageObjectNotFound(Exception):
pass


class StorageObjectExists(Exception):
pass


class StorageService:

def __init__(self, bucket_name: str | None = None):
Expand All @@ -92,7 +98,7 @@ def head(self, object_key: str) -> HeadObjectOutputTypeDef:
Key=object_key)
except self._s3.exceptions.ClientError as e:
if int(e.response['Error']['Code']) == 404:
raise StorageObjectNotFound
raise StorageObjectNotFound(object_key)
else:
raise e

Expand All @@ -101,7 +107,7 @@ def get(self, object_key: str) -> bytes:
response = self._s3.get_object(Bucket=self.bucket_name,
Key=object_key)
except self._s3.exceptions.NoSuchKey:
raise StorageObjectNotFound
raise StorageObjectNotFound(object_key)
else:
return response['Body'].read()

Expand All @@ -110,12 +116,19 @@ def put(self,
data: bytes,
content_type: str | None = None,
tagging: Tagging | None = None,
*,
overwrite: bool = True,
**kwargs):
self._s3.put_object(Bucket=self.bucket_name,
Key=object_key,
Body=data,
**self._object_creation_kwargs(content_type=content_type, tagging=tagging),
**kwargs)
try:
self._s3.put_object(Bucket=self.bucket_name,
Key=object_key,
Body=data,
**self._object_creation_kwargs(content_type=content_type,
tagging=tagging,
overwrite=overwrite),
**kwargs)
except botocore.exceptions.ClientError as e:
self._handle_overwrite(e, object_key)

def list(self, prefix: str) -> OrderedSet[str]:
keys, num_keys = OrderedSet(), 0
Expand Down Expand Up @@ -155,15 +168,22 @@ def upload_multipart_part(self,

def complete_multipart_upload(self,
upload: MultipartUpload,
etags: Sequence[str]) -> None:
etags: Sequence[str],
*,
overwrite: bool = True,
) -> None:
parts = [
{
'PartNumber': index + 1,
'ETag': etag
}
for index, etag in enumerate(etags)
]
upload.complete(MultipartUpload={'Parts': parts})
try:
upload.complete(MultipartUpload={'Parts': parts},
**self._object_creation_kwargs(overwrite=overwrite))
except botocore.exceptions.ClientError as e:
self._handle_overwrite(e, upload.object_key)

def upload(self,
file_path: str,
Expand All @@ -179,14 +199,19 @@ def upload(self,
if tagging:
self.put_object_tagging(object_key, tagging)

def _object_creation_kwargs(self, *,
def _object_creation_kwargs(self,
*,
content_type: str | None = None,
tagging: Tagging | None = None):
tagging: Tagging | None = None,
overwrite: bool = True
) -> Mapping[str, str]:
kwargs = {}
if content_type is not None:
kwargs['ContentType'] = content_type
if tagging is not None:
kwargs['Tagging'] = urlencode(tagging)
if overwrite is False:
kwargs['IfNoneMatch'] = '*'
return kwargs

def get_presigned_url(self, key: str, file_name: str | None = None) -> str:
Expand Down Expand Up @@ -281,6 +306,17 @@ def _time_until_object_expires(self,
expiration_header, expected_expiry)
return time_left

def _handle_overwrite(self,
exception: botocore.exceptions.ClientError,
object_key: str
):
error = exception.response['Error']
code, condition = error['Code'], error['Condition']
if code == 'PreconditionFailed' and condition == 'If-None-Match':
raise StorageObjectExists(object_key)
else:
raise exception


@dataclass
class Part:
Expand Down
28 changes: 23 additions & 5 deletions test/indexer/test_mirror_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ def test_mirroring(self):
with self.subTest('mirror_partition'):
file, file_message = self._test_mirror_partition(partition_message)

with self.subTest('mirror_file'):
with self.subTest('mirror_file', corrupted=False, exists=False):
self._test_mirror_file(file, file_message)

self._s3.delete_object(Bucket=self.mirror_bucket,
Key=self.mirror_controller.service.info_object_key(file))

with self.subTest('mirror_file', corrupted=True):
self._test_corrupted_download(file_message)

with self.subTest('mirror_file', corrupted=False, exists=True):
self._test_reuploaded_file(file_message)

_file_contents = b'lorem ipsum dolor sit\n'

@property
Expand Down Expand Up @@ -136,14 +145,23 @@ def _test_mirror_file(self, file, file_message):
Key=self.mirror_controller.service.mirror_object_key(file))
mirrored_file_contents = response['Body'].read()
self.assertEqual(mirrored_file_contents, self._file_contents)

def _test_corrupted_download(self, file_message):
event = [self._mock_sqs_record(file_message)]
corrupted_contents = self._file_contents[:-1] + b'Q'
with patch.object(MirrorService, '_download', return_value=corrupted_contents):
# Force reupload attempt in spite of info object being present
with patch.object(MirrorService, 'is_mirrored', return_value=False):
with self.assertRaises(AssertionError) as e:
self.mirror_controller.mirror(event)
with self.assertRaises(AssertionError) as e:
self.mirror_controller.mirror(event)
self.assertTrue(R.caused(e.exception))

def _test_reuploaded_file(self, file_message):
event = [self._mock_sqs_record(file_message)]
with patch.object(MirrorService, '_download', return_value=self._file_contents):
with self.assertRaises(AssertionError) as e:
self.mirror_controller.mirror(event)
self.assertTrue(R.caused(e.exception))
self.assertEqual(e.exception.args[0].args[0], 'File object is already present')

def test_info_schema(self):
client = http_client(log)
file = MagicMock(content_type='text/plain')
Expand Down
6 changes: 3 additions & 3 deletions test/service/test_repository_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def assertUrlEqual(self, a: Union[str, furl], b: Union[str, furl]):
@mock.patch.object(SourceService, '_put', new=MagicMock())
@mock.patch.object(SourceService, '_get')
@mock.patch.object(BaseMirrorService,
'is_mirrored',
'info_exists',
new=MagicMock(return_value=False))
class TestRepositoryFilesWithTDR(DCP2TestCase, RepositoryFilesTestCase):

Expand Down Expand Up @@ -250,7 +250,7 @@ def _test(*, authenticate: bool, cache: bool):


@mock.patch.object(BaseMirrorService,
'is_mirrored',
'info_exists',
new=MagicMock(return_value=False))
class TestRepositoryFilesWithDSS(DCP1TestCase,
RepositoryFilesTestCase,
Expand Down Expand Up @@ -424,7 +424,7 @@ def test_repository_files(self):
mirror_service = MirrorService(schema_url_func=MagicMock())
with mock.patch.object(MirrorService, '_download', return_value=file_content):
mirror_service.mirror_file(self.catalog, file)
self.assertTrue(mirror_service.is_mirrored(self.catalog, file))
self.assertTrue(mirror_service.info_exists(self.catalog, file))

client = http_client(log)
args = dict(catalog=self.catalog, version=file_version)
Expand Down