diff --git a/.changes/next-release/enhancement-s3-85557.json b/.changes/next-release/enhancement-s3-85557.json new file mode 100644 index 00000000..ba091b72 --- /dev/null +++ b/.changes/next-release/enhancement-s3-85557.json @@ -0,0 +1,5 @@ +{ + "type": "enhancement", + "category": "``s3``", + "description": "Skip the HEAD request during S3 downloads when the client is configured with ``response_checksum_validation='when_required'``, reducing latency for small-object transfers. The HEAD request remains in place by default to enable full-object checksum validation." +} diff --git a/s3transfer/download.py b/s3transfer/download.py index f6646cd8..c6ed80fa 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -352,6 +352,26 @@ def _submit( :param bandwidth_limiter: The bandwidth limiter to use when downloading streams """ + download_output_manager = self._get_download_output_manager_cls( + transfer_future, osutil + )(osutil, self._transfer_coordinator, io_executor) + + # Skip the HEAD request only when the caller has explicitly opted out + # of response checksum validation. Otherwise we need the HEAD to + # obtain the full-object ETag/size for checksum validation. + if client.meta.config.response_checksum_validation == "when_required": + self._submit_first_chunk_request( + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ) + return + if ( transfer_future.meta.size is None or transfer_future.meta.etag is None @@ -370,10 +390,6 @@ def _submit( # during a multipart download. transfer_future.meta.provide_object_etag(response.get('ETag')) - download_output_manager = self._get_download_output_manager_cls( - transfer_future, osutil - )(osutil, self._transfer_coordinator, io_executor) - # If it is greater than threshold do a ranged download, otherwise # do a regular GetObject download. if transfer_future.meta.size < config.multipart_threshold: @@ -541,6 +557,214 @@ def _calculate_range_param(self, part_size, part_index, num_parts): range_param = f'bytes={start_range}-{end_range}' return range_param + def _submit_first_chunk_request( + self, + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ): + call_args = transfer_future.meta.call_args + + # Get a handle to the file that will be used for writing downloaded + # contents + fileobj = download_output_manager.get_fileobj_for_io_writes( + transfer_future + ) + + # Get the needed callbacks for the task + progress_callbacks = get_callbacks(transfer_future, 'progress') + + # Get any associated tags for the get object task. + get_object_tag = download_output_manager.get_download_task_tag() + + # Request first chunk to get object metadata from response headers + chunk_size = config.multipart_chunksize + extra_args = dict(call_args.extra_args) + extra_args['Range'] = f'bytes=0-{chunk_size - 1}' + + if transfer_future.meta.etag is not None: + extra_args['IfMatch'] = transfer_future.meta.etag + + # Callback will determine if additional chunks are needed based on + # the Content-Range header in the response + on_done_callback = GetObjectFirstChunkOnDoneCallback( + transfer_future, + download_output_manager, + io_executor, + self._transfer_coordinator, + client, + config, + request_executor, + bandwidth_limiter, + fileobj, + progress_callbacks, + get_object_tag, + ) + + task = GetObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'fileobj': fileobj, + 'extra_args': extra_args, + 'callbacks': progress_callbacks, + 'max_attempts': config.num_download_attempts, + 'start_index': 0, + 'download_output_manager': download_output_manager, + 'io_chunksize': config.io_chunksize, + 'bandwidth_limiter': bandwidth_limiter, + }, + done_callbacks=[on_done_callback], + ) + on_done_callback.set_task(task) + + self._transfer_coordinator.submit( + request_executor, + task, + tag=get_object_tag, + ) + + +class GetObjectFirstChunkOnDoneCallback: + def __init__( + self, + transfer_future, + download_output_manager, + io_executor, + transfer_coordinator, + client, + config, + request_executor, + bandwidth_limiter, + fileobj, + progress_callbacks, + get_object_tag, + ): + self._transfer_future = transfer_future + self._download_output_manager = download_output_manager + self._io_executor = io_executor + self._transfer_coordinator = transfer_coordinator + self._client = client + self._config = config + self._request_executor = request_executor + self._bandwidth_limiter = bandwidth_limiter + self._fileobj = fileobj + self._progress_callbacks = progress_callbacks + self._get_object_tag = get_object_tag + self._task = None + + def __call__(self): + if self._task is None: + raise RuntimeError( + "set_task() must be called before the task is submitted" + ) + + response = self._task.get_response() + if not response: + # No response means the GET failed or was cancelled + # Still need to submit final task to signal completion + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + return + + # If transfer is already done (cancelled/failed), don't schedule more work + # but still submit the final task + if self._transfer_coordinator.done(): + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + return + + size, etag = self._extract_metadata(response) + self._transfer_future.meta.provide_transfer_size(size) + self._transfer_future.meta.provide_object_etag(etag) + + if size == 0: + # Force-open the DeferredOpenFile so the temp file exists + # on disk for IORenameFileTask. Without this, the deferred + # file is never opened since no bytes are written. + self._fileobj.write(b'') + + chunk_size = self._config.multipart_chunksize + if size > chunk_size: + self._schedule_remaining_chunks(size, etag) + else: + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + + def set_task(self, task): + self._task = task + + def _extract_metadata(self, response): + content_range = response.get('ContentRange') + if content_range: + # Content-Range format: 'bytes 0-8388607/39542919' + # Extract total size from the part after the slash + size = int(content_range.split('/')[-1]) + else: + size = response['ContentLength'] + etag = response.get('ETag') + return size, etag + + def _schedule_remaining_chunks(self, size, etag): + call_args = self._transfer_future.meta.call_args + part_size = self._config.multipart_chunksize + num_parts = calculate_num_parts(size, part_size) + + # Callback invoker to submit the final io task once all downloads + # are complete. + final_task = self._download_output_manager.get_final_io_task() + finalize_download_invoker = CountCallbackInvoker( + FunctionContainer( + self._transfer_coordinator.submit, + self._io_executor, + final_task, + ) + ) + + # Start from 1 since chunk 0 was already requested + for i in range(1, num_parts): + range_parameter = calculate_range_parameter( + part_size, i, num_parts + ) + extra_args = { + 'Range': range_parameter, + } + # Use IfMatch to ensure object hasn't changed during download + if etag is not None: + extra_args['IfMatch'] = etag + extra_args.update(call_args.extra_args) + finalize_download_invoker.increment() + + self._transfer_coordinator.submit( + self._request_executor, + GetObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': self._client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'fileobj': self._fileobj, + 'extra_args': extra_args, + 'callbacks': self._progress_callbacks, + 'max_attempts': self._config.num_download_attempts, + 'start_index': i * part_size, + 'download_output_manager': self._download_output_manager, + 'io_chunksize': self._config.io_chunksize, + 'bandwidth_limiter': self._bandwidth_limiter, + }, + done_callbacks=[finalize_download_invoker.decrement], + ), + tag=self._get_object_tag, + ) + finalize_download_invoker.finalize() + class GetObjectTask(Task): def _main( @@ -582,6 +806,9 @@ def _main( response = client.get_object( Bucket=bucket, Key=key, **extra_args ) + # Store response so callback can extract metadata + self._response = response + self._validate_content_range( extra_args.get('Range'), response.get('ContentRange'), @@ -619,6 +846,13 @@ def _main( f'Contents of stored object "{key}" in bucket ' f'"{bucket}" did not match expected ETag.' ) + elif error_code == "InvalidRange": + self._response = { + 'ContentLength': 0, + 'ContentRange': None, + 'ETag': None, + } + return else: raise except S3_RETRYABLE_DOWNLOAD_ERRORS as e: @@ -643,6 +877,9 @@ def _main( def _handle_io(self, download_output_manager, fileobj, chunk, index): download_output_manager.queue_file_io_task(fileobj, chunk, index) + def get_response(self): + return getattr(self, '_response', None) + def _validate_content_range(self, requested_range, content_range): if not requested_range or not content_range: return @@ -650,18 +887,33 @@ def _validate_content_range(self, requested_range, content_range): # where `0-8388607` is the fetched range and `39542919` is # the total object size. response_range, total_size = content_range.split('/') - # Subtract `1` because range is 0-indexed. - final_byte = str(int(total_size) - 1) - # If it's the last part, the requested range will not include - # the final byte, eg `bytes=33554432-`. - if requested_range.endswith('-'): - requested_range += final_byte - # Request looks like `bytes=0-8388607`. - # Parsed response looks like `bytes 0-8388607`. - if requested_range[6:] != response_range[6:]: + # Parse requested range: `bytes=0-8388607` -> start=0, end=8388607 + req_range_part = requested_range[6:] # Remove 'bytes=' + if '-' not in req_range_part: + return + req_start, req_end = req_range_part.split('-', 1) + req_start = int(req_start) + # req_end might be empty for open-ended ranges + req_end = int(req_end) if req_end else int(total_size) - 1 + + # Parse response range: `bytes 0-8388607` -> start=0, end=8388607 + resp_range_part = response_range[6:] # Remove 'bytes ' + resp_start, resp_end = resp_range_part.split('-', 1) + resp_start = int(resp_start) + resp_end = int(resp_end) + + # Validate that response starts where we requested + if resp_start != req_start: + raise S3ValidationError( + f"Response range start `{resp_start}` does not match " + f"requested start `{req_start}`" + ) + + # Validate that response doesn't exceed what we requested + if resp_end > req_end: raise S3ValidationError( - f"Requested range: `{requested_range[6:]}` does not match " - f"content range in response: `{response_range[6:]}`" + f"Response range end `{resp_end}` exceeds " + f"requested end `{req_end}`" ) diff --git a/scripts/performance/time-batch-download.py b/scripts/performance/time-batch-download.py new file mode 100755 index 00000000..1cf04fc5 --- /dev/null +++ b/scripts/performance/time-batch-download.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Direct timing of batch downloads without shell wrapper.""" + +import argparse +import shutil +import tempfile +import time + +from botocore.session import get_session + +from s3transfer.manager import TransferManager + + +def create_file(filename, file_size): + with open(filename, 'wb') as f: + f.write(b'a' * file_size) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--file-count', type=int, required=True) + parser.add_argument('--file-size', type=int, required=True) + parser.add_argument('--s3-bucket', required=True) + args = parser.parse_args() + + session = get_session() + client = session.create_client('s3') + + tempdir = tempfile.mkdtemp() + s3_keys = [] + + try: + # Upload files + print(f"Uploading {args.file_count} files...") + with TransferManager(client) as manager: + for i in range(args.file_count): + file_path = f"{tempdir}/upload_{i}" + create_file(file_path, args.file_size) + s3_key = f"perf_test_{i}" + manager.upload(file_path, args.s3_bucket, s3_key) + s3_keys.append(s3_key) + + # Download files + print(f"Downloading {args.file_count} files...") + start_time = time.time() + with TransferManager(client) as manager: + for i, s3_key in enumerate(s3_keys): + download_path = f"{tempdir}/download_{i}" + manager.download(args.s3_bucket, s3_key, download_path) + duration = time.time() - start_time + + print(f"Download duration: {duration:.2f} seconds") + + # Cleanup + for s3_key in s3_keys: + client.delete_object(Bucket=args.s3_bucket, Key=s3_key) + finally: + shutil.rmtree(tempdir) + + +if __name__ == '__main__': + main() diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index b8d07427..28264c81 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -18,6 +18,7 @@ import time from io import BytesIO +from botocore.client import Config from botocore.exceptions import ClientError from s3transfer.compat import SOCKET_ERROR @@ -35,6 +36,7 @@ RecordingOSUtils, RecordingSubscriber, StreamWithError, + StubbedClientTest, skip_if_using_serial_implementation, skip_if_windows, ) @@ -576,7 +578,7 @@ def test_download_raises_if_content_range_mismatch(self): ) with self.assertRaises(S3ValidationError) as e: future.result() - self.assertIn('does not match content range', str(e.exception)) + self.assertIn('does not match requested start', str(e.exception)) def test_download_raises_if_etag_validation_fails(self): expected_params = { @@ -647,3 +649,199 @@ def test_download_without_etag(self): # Ensure that the contents are correct with open(self.filename, 'rb') as f: self.assertEqual(self.content, f.read()) + + +class TestDownloadWhenRequiredChecksumValidation(StubbedClientTest): + """Exercises the HEAD-less download path enabled when a client is + configured with ``response_checksum_validation="when_required"``. + + Note: unlike ``TestDownloadWhenRequired`` in the integ suite (which + subclasses ``TestDownload`` to re-run every inherited test method under + the alternate config), this class does not re-use the other functional + download tests. It only inherits ``StubbedClientTest`` for stubber setup + helpers and defines its own targeted tests below. The inherited + ``TestDownload`` / ``TestRangedDownload`` functional tests hard-code + HEAD-request expectations in their stubs, which do not apply when the + HEAD-less branch is taken. + """ + + def setUp(self): + super().setUp() + self.reset_stubber_with_new_client( + {'config': Config(response_checksum_validation="when_required")} + ) + self.config = TransferConfig(max_request_concurrency=1) + self.manager = TransferManager(self.client, self.config) + self.tempdir = tempfile.mkdtemp() + self.filename = os.path.join(self.tempdir, 'myfile') + self.bucket = 'mybucket' + self.key = 'mykey' + self.etag = 'myetag' + self.content = b'my content' + + def tearDown(self): + super().tearDown() + shutil.rmtree(self.tempdir) + + def test_single_chunk_download_skips_head(self): + chunk_size = self.config.multipart_chunksize + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': BytesIO(self.content), + 'ContentRange': f'bytes 0-{len(self.content) - 1}/{len(self.content)}', + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + }, + ) + + future = self.manager.download(self.bucket, self.key, self.filename) + future.result() + + with open(self.filename, 'rb') as f: + self.assertEqual(self.content, f.read()) + + def test_empty_object_download_skips_second_get(self): + chunk_size = self.config.multipart_chunksize + # S3 returns InvalidRange for a ranged GET on a 0-byte object; + # the download path must surface an empty file without a retry GET. + self.stubber.add_client_error( + method='get_object', + service_error_code='InvalidRange', + service_message='The requested range is not satisfiable', + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + }, + ) + + future = self.manager.download(self.bucket, self.key, self.filename) + future.result() + + with open(self.filename, 'rb') as f: + self.assertEqual(b'', f.read()) + self.assertEqual(future.meta.size, 0) + + def test_multipart_download_schedules_remaining_chunks(self): + # Object larger than a single chunk should trigger scheduling of + # additional ranged GETs from _schedule_remaining_chunks, each + # carrying the IfMatch header from the first-chunk response ETag. + chunk_size = self.config.multipart_chunksize + total_size = chunk_size * 2 + 7 + content = b'x' * total_size + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': BytesIO(content[:chunk_size]), + 'ContentRange': f'bytes 0-{chunk_size - 1}/{total_size}', + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + }, + ) + for i in (1, 2): + start = i * chunk_size + end = min(start + chunk_size, total_size) - 1 + range_header = ( + f'bytes={start}-' if i == 2 else f'bytes={start}-{end}' + ) + self.stubber.add_response( + method='get_object', + service_response={'Body': BytesIO(content[start : end + 1])}, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': range_header, + 'IfMatch': self.etag, + }, + ) + + future = self.manager.download(self.bucket, self.key, self.filename) + future.result() + + with open(self.filename, 'rb') as f: + self.assertEqual(content, f.read()) + + def test_first_get_uses_ifmatch_when_etag_provided(self): + # If the caller pre-provides the ETag (e.g. via a subscriber from a + # prior HEAD), the first GET must also carry IfMatch. + chunk_size = self.config.multipart_chunksize + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': BytesIO(self.content), + 'ContentRange': f'bytes 0-{len(self.content) - 1}/{len(self.content)}', + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + 'IfMatch': self.etag, + }, + ) + + future = self.manager.download( + self.bucket, + self.key, + self.filename, + subscribers=[ETagProvider(self.etag)], + ) + future.result() + + def test_zero_length_response_writes_empty_file(self): + # Some callers may return a successful 0-byte response (ContentLength + # 0) instead of InvalidRange; the download must still produce the + # output file by forcing the DeferredOpenFile open. + chunk_size = self.config.multipart_chunksize + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': BytesIO(b''), + 'ContentLength': 0, + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + }, + ) + + future = self.manager.download(self.bucket, self.key, self.filename) + future.result() + + with open(self.filename, 'rb') as f: + self.assertEqual(b'', f.read()) + self.assertEqual(future.meta.size, 0) + + def test_download_raises_if_content_range_mismatch(self): + # The first-chunk response's ContentRange must match what we asked + # for. A mismatched start surfaces S3ValidationError via future.result(). + chunk_size = self.config.multipart_chunksize + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': BytesIO(self.content), + 'ContentRange': f'bytes 100-{len(self.content) - 1}/{len(self.content)}', + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': f'bytes=0-{chunk_size - 1}', + }, + ) + + future = self.manager.download(self.bucket, self.key, self.filename) + with self.assertRaises(S3ValidationError) as e: + future.result() + self.assertIn('does not match requested start', str(e.exception)) diff --git a/tests/integration/test_download.py b/tests/integration/test_download.py index da08514e..27b33b20 100644 --- a/tests/integration/test_download.py +++ b/tests/integration/test_download.py @@ -16,7 +16,9 @@ import time from concurrent.futures import CancelledError -from s3transfer.manager import TransferConfig +from botocore.client import Config + +from s3transfer.manager import TransferConfig, TransferManager from tests import ( NonSeekableWriter, RecordingSubscriber, @@ -38,6 +40,25 @@ def setUp(self): multipart_threshold=self.multipart_threshold ) + def test_download_empty_object(self): + transfer_manager = self.create_transfer_manager(self.config) + + # Upload a 0-byte object + self.client.put_object( + Bucket=self.bucket_name, Key='empty.txt', Body=b'' + ) + self.addCleanup(self.delete_object, 'empty.txt') + + download_path = os.path.join(self.files.rootdir, 'empty.txt') + future = transfer_manager.download( + self.bucket_name, 'empty.txt', download_path + ) + future.result() + + with open(download_path, 'rb') as f: + self.assertEqual(b'', f.read()) + self.assertEqual(future.meta.size, 0) + def test_below_threshold(self): transfer_manager = self.create_transfer_manager(self.config) @@ -282,3 +303,26 @@ def test_download_to_special_file(self): 'Should have been able to download to /dev/null but received ' f'following exception {e}' ) + + +class TestDownloadWhenRequired(TestDownload): + """Re-runs every test method inherited from ``TestDownload`` against the + HEAD-less download path. + + Subclassing, not duplication: each ``test_*`` method defined on + ``TestDownload`` is discovered again here and executed with the overridden + ``create_transfer_manager`` below, which hands the ``TransferManager`` a + client configured with ``response_checksum_validation='when_required'``. + That config toggle is what activates the HEAD-less branch in + ``DownloadSubmissionTask._submit``, so these tests exercise the same + end-to-end behaviour as ``TestDownload`` but through the alternate code + path. + """ + + def create_transfer_manager(self, config=None): + client = self.session.create_client( + 's3', + self.region, + config=Config(response_checksum_validation='when_required'), + ) + return TransferManager(client, config=config) diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index f042b677..b9893f1c 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -27,6 +27,7 @@ DownloadSeekableOutputManager, DownloadSpecialFilenameOutputManager, DownloadSubmissionTask, + GetObjectFirstChunkOnDoneCallback, GetObjectTask, ImmediatelyWriteIOGetObjectTask, IOCloseTask, @@ -1044,3 +1045,55 @@ def test_duplicate_writes_longer_length_update_queue(self): {'offset': 1, 'data': 'bar'}, ], ) + + +class TestGetObjectFirstChunkOnDoneCallback(unittest.TestCase): + """Covers defensive fallbacks in the no-HEAD first-chunk callback. + + Each branch submits the final IO task so the download future completes + even when the first GET fails to produce a usable response. + """ + + def _make_callback(self): + self.io_executor = mock.Mock() + self.transfer_coordinator = mock.Mock() + self.download_output_manager = mock.Mock() + self.final_task = mock.sentinel.final_task + self.download_output_manager.get_final_io_task.return_value = ( + self.final_task + ) + return GetObjectFirstChunkOnDoneCallback( + transfer_future=mock.Mock(), + download_output_manager=self.download_output_manager, + io_executor=self.io_executor, + transfer_coordinator=self.transfer_coordinator, + client=mock.Mock(), + config=mock.Mock(), + request_executor=mock.Mock(), + bandwidth_limiter=None, + fileobj=mock.Mock(), + progress_callbacks=[], + get_object_tag=None, + ) + + def _assert_only_final_task_submitted(self): + self.transfer_coordinator.submit.assert_called_once_with( + self.io_executor, self.final_task + ) + + def test_no_response_submits_final_task(self): + callback = self._make_callback() + task = mock.Mock() + task.get_response.return_value = None + callback.set_task(task) + callback() + self._assert_only_final_task_submitted() + + def test_transfer_done_submits_final_task(self): + callback = self._make_callback() + task = mock.Mock() + task.get_response.return_value = {'ContentLength': 5, 'ETag': 'e'} + callback.set_task(task) + self.transfer_coordinator.done.return_value = True + callback() + self._assert_only_final_task_submitted()