From fb8a29ceb2c09651f5f059b3dbc759dfa2a1b10c Mon Sep 17 00:00:00 2001 From: Steve Yoo Date: Tue, 13 May 2025 14:13:17 -0400 Subject: [PATCH] Validate multipart downloads using object ETags --- .../next-release/bugfix-download-32668.json | 5 ++ s3transfer/download.py | 36 ++++++-- s3transfer/exceptions.py | 4 + s3transfer/futures.py | 15 ++++ tests/functional/test_download.py | 88 +++++++++++++++++-- tests/unit/test_download.py | 9 +- 6 files changed, 140 insertions(+), 17 deletions(-) create mode 100644 .changes/next-release/bugfix-download-32668.json diff --git a/.changes/next-release/bugfix-download-32668.json b/.changes/next-release/bugfix-download-32668.json new file mode 100644 index 00000000..e1e63dce --- /dev/null +++ b/.changes/next-release/bugfix-download-32668.json @@ -0,0 +1,5 @@ +{ + "type": "bugfix", + "category": "``GetObjectTask``", + "description": "Validate ETag of stored object during multipart downloads" +} diff --git a/s3transfer/download.py b/s3transfer/download.py index 4d431a4e..c0345a83 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -14,8 +14,10 @@ import logging import threading +from botocore.exceptions import ClientError + from s3transfer.compat import seekable -from s3transfer.exceptions import RetriesExceededError +from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG from s3transfer.tasks import SubmissionTask, Task from s3transfer.utils import ( @@ -346,14 +348,18 @@ def _submit( :param bandwidth_limiter: The bandwidth limiter to use when downloading streams """ + response = client.head_object( + Bucket=transfer_future.meta.call_args.bucket, + Key=transfer_future.meta.call_args.key, + **transfer_future.meta.call_args.extra_args, + ) + # Provide an etag to ensure a stored object is not modified + # during a multipart download. + transfer_future.meta.provide_object_etag(response.get('ETag')) + if transfer_future.meta.size is None: # If a size was not provided figure out the size for the # user. - response = client.head_object( - Bucket=transfer_future.meta.call_args.bucket, - Key=transfer_future.meta.call_args.key, - **transfer_future.meta.call_args.extra_args, - ) transfer_future.meta.provide_transfer_size( response['ContentLength'] ) @@ -479,9 +485,12 @@ def _submit_ranged_download_request( part_size, i, num_parts ) - # Inject the Range parameter to the parameters to be passed in - # as extra args - extra_args = {'Range': range_parameter} + # Inject extra parameters to be passed in as extra args + extra_args = { + 'Range': range_parameter, + } + if transfer_future.meta.etag is not None: + extra_args['IfMatch'] = transfer_future.meta.etag extra_args.update(call_args.extra_args) finalize_download_invoker.increment() # Submit the ranged downloads @@ -593,6 +602,15 @@ def _main( else: return return + except ClientError as e: + error_code = e.response.get('Error', {}).get('Code') + if error_code == "PreconditionFailed": + raise S3DownloadFailedError( + f'Contents of stored object "{key}" in bucket ' + f'"{bucket}" did not match expected ETag.' + ) + else: + raise except S3_RETRYABLE_DOWNLOAD_ERRORS as e: logger.debug( "Retrying exception caught (%s), " diff --git a/s3transfer/exceptions.py b/s3transfer/exceptions.py index 6150fe65..857a6bf3 100644 --- a/s3transfer/exceptions.py +++ b/s3transfer/exceptions.py @@ -23,6 +23,10 @@ class S3UploadFailedError(Exception): pass +class S3DownloadFailedError(Exception): + pass + + class InvalidSubscriberMethodError(Exception): pass diff --git a/s3transfer/futures.py b/s3transfer/futures.py index 9ab30a7a..ca7cdfc3 100644 --- a/s3transfer/futures.py +++ b/s3transfer/futures.py @@ -134,6 +134,7 @@ def __init__(self, call_args=None, transfer_id=None): self._transfer_id = transfer_id self._size = None self._user_context = {} + self._etag = None @property def call_args(self): @@ -155,6 +156,11 @@ def user_context(self): """A dictionary that requesters can store data in""" return self._user_context + @property + def etag(self): + """The etag of the stored object for validating multipart downloads""" + return self._etag + def provide_transfer_size(self, size): """A method to provide the size of a transfer request @@ -164,6 +170,15 @@ def provide_transfer_size(self, size): """ self._size = size + def provide_object_etag(self, etag): + """A method to provide the etag of a transfer request + + By providing this value, the TransferManager will validate + multipart downloads by supplying an IfMatch parameter with + the etag as the value to GetObject requests. + """ + self._etag = etag + class TransferCoordinator: """A helper class for managing TransferFuture""" diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index ccbac406..ac79d33c 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -21,7 +21,7 @@ from botocore.exceptions import ClientError from s3transfer.compat import SOCKET_ERROR -from s3transfer.exceptions import RetriesExceededError +from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError from s3transfer.manager import TransferConfig, TransferManager from tests import ( BaseGeneralInterfaceTest, @@ -48,6 +48,7 @@ def setUp(self): # Initialize some default arguments self.bucket = 'mybucket' self.key = 'mykey' + self.etag = 'myetag' self.extra_args = {} self.subscribers = [] @@ -84,7 +85,10 @@ def create_stubbed_responses(self): return [ { 'method': 'head_object', - 'service_response': {'ContentLength': len(self.content)}, + 'service_response': { + 'ContentLength': len(self.content), + 'ETag': self.etag, + }, }, { 'method': 'get_object', @@ -291,6 +295,7 @@ def test_retry_rewinds_callbacks(self): self.assertEqual(-3, progress_byte_amts[1]) def test_can_provide_file_size(self): + self.add_head_object_response() self.add_successful_get_object_responses() call_kwargs = self.create_call_kwargs() @@ -299,8 +304,6 @@ def test_can_provide_file_size(self): future = self.manager.download(**call_kwargs) future.result() - # The HeadObject should have not happened and should have been able - # to successfully download the file. self.stubber.assert_no_pending_responses() with open(self.filename, 'rb') as f: self.assertEqual(self.content, f.read()) @@ -469,7 +472,10 @@ def create_stubbed_responses(self): return [ { 'method': 'head_object', - 'service_response': {'ContentLength': len(self.content)}, + 'service_response': { + 'ContentLength': len(self.content), + 'ETag': self.etag, + }, }, { 'method': 'get_object', @@ -502,7 +508,7 @@ def test_download(self): expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] self.add_head_object_response(expected_params) self.add_successful_get_object_responses( - expected_params, expected_ranges + {**expected_params, 'IfMatch': self.etag}, expected_ranges ) future = self.manager.download( @@ -523,6 +529,76 @@ def test_download_with_checksum_enabled(self): } expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] self.add_head_object_response(expected_params) + self.add_successful_get_object_responses( + {**expected_params, 'IfMatch': self.etag}, expected_ranges + ) + + future = self.manager.download( + self.bucket, self.key, self.filename, self.extra_args + ) + future.result() + + # Ensure that the contents are correct + with open(self.filename, 'rb') as f: + self.assertEqual(self.content, f.read()) + + def test_download_raises_if_etag_validation_fails(self): + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + expected_ranges = ['bytes=0-3', 'bytes=4-7'] + self.add_head_object_response(expected_params) + + # Add successful GetObject responses for the first 2 requests. + for i, stubbed_response in enumerate( + self.create_stubbed_responses()[1:3] + ): + stubbed_response['expected_params'] = copy.deepcopy( + {**expected_params, 'IfMatch': self.etag} + ) + stubbed_response['expected_params']['Range'] = expected_ranges[i] + self.stubber.add_response(**stubbed_response) + + # Simulate ETag validation failure by adding a + # client error for the last GetObject request. + self.stubber.add_client_error( + method='get_object', + service_error_code='PreconditionFailed', + service_message=( + 'At least one of the pre-conditions you specified did not hold' + ), + http_status_code=412, + ) + + future = self.manager.download( + self.bucket, self.key, self.filename, self.extra_args + ) + with self.assertRaises(S3DownloadFailedError) as e: + future.result() + self.assertIn('did not match expected ETag', str(e.exception)) + + # Ensure no data is written to disk. + self.assertFalse(os.path.exists(self.filename)) + + def test_download_without_etag(self): + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + } + expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] + + # Stub HeadObject response with no ETag + head_object_response = { + 'method': 'head_object', + 'service_response': { + 'ContentLength': len(self.content), + }, + 'expected_params': expected_params, + } + self.stubber.add_response(**head_object_response) + + # This asserts that IfMatch isn't in the GetObject requests. self.add_successful_get_object_responses( expected_params, expected_ranges ) diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index 912ca61f..f042b677 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -395,7 +395,8 @@ def setUp(self): self.bucket = 'mybucket' self.key = 'mykey' - self.extra_args = {} + self.etag = 'myetag' + self.extra_args = {'IfMatch': self.etag} self.subscribers = [] # Create a stream to read from @@ -452,7 +453,11 @@ def assert_tag_for_get_object(self, tag_value): def add_head_object_response(self): self.stubber.add_response( - 'head_object', {'ContentLength': len(self.content)} + 'head_object', + { + 'ContentLength': len(self.content), + 'ETag': self.etag, + }, ) def add_get_responses(self):