Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-download-32668.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "``GetObjectTask``",
"description": "Validate ETag of stored object during multipart downloads"
}
36 changes: 27 additions & 9 deletions s3transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import logging
import threading

from botocore.exceptions import ClientError

from s3transfer.compat import seekable
from s3transfer.exceptions import RetriesExceededError
from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError
from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
from s3transfer.tasks import SubmissionTask, Task
from s3transfer.utils import (
Expand Down Expand Up @@ -346,14 +348,18 @@ def _submit(
:param bandwidth_limiter: The bandwidth limiter to use when
downloading streams
"""
response = client.head_object(
Bucket=transfer_future.meta.call_args.bucket,
Key=transfer_future.meta.call_args.key,
**transfer_future.meta.call_args.extra_args,
)
# Provide an etag to ensure a stored object is not modified
# during a multipart download.
transfer_future.meta.provide_object_etag(response.get('ETag'))

if transfer_future.meta.size is None:
# If a size was not provided figure out the size for the
# user.
response = client.head_object(
Bucket=transfer_future.meta.call_args.bucket,
Key=transfer_future.meta.call_args.key,
**transfer_future.meta.call_args.extra_args,
)
transfer_future.meta.provide_transfer_size(
response['ContentLength']
)
Expand Down Expand Up @@ -479,9 +485,12 @@ def _submit_ranged_download_request(
part_size, i, num_parts
)

# Inject the Range parameter to the parameters to be passed in
# as extra args
extra_args = {'Range': range_parameter}
# Inject extra parameters to be passed in as extra args
extra_args = {
'Range': range_parameter,
}
if transfer_future.meta.etag is not None:
extra_args['IfMatch'] = transfer_future.meta.etag
extra_args.update(call_args.extra_args)
finalize_download_invoker.increment()
# Submit the ranged downloads
Expand Down Expand Up @@ -593,6 +602,15 @@ def _main(
else:
return
return
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == "PreconditionFailed":
raise S3DownloadFailedError(
f'Contents of stored object "{key}" in bucket '
f'"{bucket}" did not match expected ETag.'
)
else:
raise
except S3_RETRYABLE_DOWNLOAD_ERRORS as e:
logger.debug(
"Retrying exception caught (%s), "
Expand Down
4 changes: 4 additions & 0 deletions s3transfer/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class S3UploadFailedError(Exception):
pass


# Raised when a download cannot be completed. In s3transfer/download.py it
# is raised when a GetObject call fails with a ``PreconditionFailed`` error
# code, i.e. the stored object's contents did not match the expected ETag.
class S3DownloadFailedError(Exception):
pass


class InvalidSubscriberMethodError(Exception):
pass

Expand Down
15 changes: 15 additions & 0 deletions s3transfer/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, call_args=None, transfer_id=None):
self._transfer_id = transfer_id
self._size = None
self._user_context = {}
self._etag = None

@property
def call_args(self):
Expand All @@ -155,6 +156,11 @@ def user_context(self):
"""A dictionary that requesters can store data in"""
return self._user_context

@property
def etag(self):
"""The etag of the stored object for validating multipart downloads"""
# Remains ``None`` (the value set in ``__init__``) until
# ``provide_object_etag`` is called with a value.
return self._etag

def provide_transfer_size(self, size):
"""A method to provide the size of a transfer request

Expand All @@ -164,6 +170,15 @@ def provide_transfer_size(self, size):
"""
self._size = size

def provide_object_etag(self, etag):
"""A method to provide the ETag of the stored object being transferred

By providing this value, the TransferManager will validate
multipart downloads by supplying an IfMatch parameter with
the etag as the value to GetObject requests.

:param etag: The ETag of the stored object. If ``None``, no IfMatch
parameter is added to the ranged GetObject requests.
"""
self._etag = etag


class TransferCoordinator:
"""A helper class for managing TransferFuture"""
Expand Down
88 changes: 82 additions & 6 deletions tests/functional/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from botocore.exceptions import ClientError

from s3transfer.compat import SOCKET_ERROR
from s3transfer.exceptions import RetriesExceededError
from s3transfer.exceptions import RetriesExceededError, S3DownloadFailedError
from s3transfer.manager import TransferConfig, TransferManager
from tests import (
BaseGeneralInterfaceTest,
Expand All @@ -48,6 +48,7 @@ def setUp(self):
# Initialize some default arguments
self.bucket = 'mybucket'
self.key = 'mykey'
self.etag = 'myetag'
self.extra_args = {}
self.subscribers = []

Expand Down Expand Up @@ -84,7 +85,10 @@ def create_stubbed_responses(self):
return [
{
'method': 'head_object',
'service_response': {'ContentLength': len(self.content)},
'service_response': {
'ContentLength': len(self.content),
'ETag': self.etag,
},
},
{
'method': 'get_object',
Expand Down Expand Up @@ -291,6 +295,7 @@ def test_retry_rewinds_callbacks(self):
self.assertEqual(-3, progress_byte_amts[1])

def test_can_provide_file_size(self):
self.add_head_object_response()
self.add_successful_get_object_responses()

call_kwargs = self.create_call_kwargs()
Expand All @@ -299,8 +304,6 @@ def test_can_provide_file_size(self):
future = self.manager.download(**call_kwargs)
future.result()

# A HeadObject request is still made (to obtain the object's ETag) even
# when the size is provided up front; the file should download successfully.
self.stubber.assert_no_pending_responses()
with open(self.filename, 'rb') as f:
self.assertEqual(self.content, f.read())
Expand Down Expand Up @@ -469,7 +472,10 @@ def create_stubbed_responses(self):
return [
{
'method': 'head_object',
'service_response': {'ContentLength': len(self.content)},
'service_response': {
'ContentLength': len(self.content),
'ETag': self.etag,
},
},
{
'method': 'get_object',
Expand Down Expand Up @@ -502,7 +508,7 @@ def test_download(self):
expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']
self.add_head_object_response(expected_params)
self.add_successful_get_object_responses(
expected_params, expected_ranges
{**expected_params, 'IfMatch': self.etag}, expected_ranges
)

future = self.manager.download(
Expand All @@ -523,6 +529,76 @@ def test_download_with_checksum_enabled(self):
}
expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']
self.add_head_object_response(expected_params)
self.add_successful_get_object_responses(
{**expected_params, 'IfMatch': self.etag}, expected_ranges
)

future = self.manager.download(
self.bucket, self.key, self.filename, self.extra_args
)
future.result()

# Ensure that the contents are correct
with open(self.filename, 'rb') as f:
self.assertEqual(self.content, f.read())

def test_download_raises_if_etag_validation_fails(self):
# A PreconditionFailed (HTTP 412) error on a ranged GetObject request
# should surface from ``future.result()`` as S3DownloadFailedError.
expected_params = {
'Bucket': self.bucket,
'Key': self.key,
}
# Only the first two ranges are stubbed to succeed; the final
# GetObject request is stubbed to fail further below.
expected_ranges = ['bytes=0-3', 'bytes=4-7']
self.add_head_object_response(expected_params)

# Add successful GetObject responses for the first 2 requests.
for i, stubbed_response in enumerate(
self.create_stubbed_responses()[1:3]
):
# Each ranged request is expected to carry the ETag as IfMatch.
stubbed_response['expected_params'] = copy.deepcopy(
{**expected_params, 'IfMatch': self.etag}
)
stubbed_response['expected_params']['Range'] = expected_ranges[i]
self.stubber.add_response(**stubbed_response)

# Simulate ETag validation failure by adding a
# client error for the last GetObject request.
self.stubber.add_client_error(
method='get_object',
service_error_code='PreconditionFailed',
service_message=(
'At least one of the pre-conditions you specified did not hold'
),
http_status_code=412,
)

future = self.manager.download(
self.bucket, self.key, self.filename, self.extra_args
)
with self.assertRaises(S3DownloadFailedError) as e:
future.result()
# The message text is produced where S3DownloadFailedError is raised
# in s3transfer/download.py.
self.assertIn('did not match expected ETag', str(e.exception))

# Ensure no data is written to disk.
self.assertFalse(os.path.exists(self.filename))

def test_download_without_etag(self):
expected_params = {
'Bucket': self.bucket,
'Key': self.key,
}
expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']

# Stub HeadObject response with no ETag
head_object_response = {
'method': 'head_object',
'service_response': {
'ContentLength': len(self.content),
},
'expected_params': expected_params,
}
self.stubber.add_response(**head_object_response)

# This asserts that IfMatch isn't in the GetObject requests.
self.add_successful_get_object_responses(
expected_params, expected_ranges
)
Expand Down
9 changes: 7 additions & 2 deletions tests/unit/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ def setUp(self):

self.bucket = 'mybucket'
self.key = 'mykey'
self.extra_args = {}
self.etag = 'myetag'
self.extra_args = {'IfMatch': self.etag}
self.subscribers = []

# Create a stream to read from
Expand Down Expand Up @@ -452,7 +453,11 @@ def assert_tag_for_get_object(self, tag_value):

def add_head_object_response(self):
self.stubber.add_response(
'head_object', {'ContentLength': len(self.content)}
'head_object',
{
'ContentLength': len(self.content),
'ETag': self.etag,
},
)

def add_get_responses(self):
Expand Down
Loading