@@ -356,17 +356,207 @@ def _submit(
356356 transfer_future , osutil
357357 )(osutil , self ._transfer_coordinator , io_executor )
358358
359- self ._submit_first_chunk_request (
360- client ,
361- config ,
362- osutil ,
359+ # Skip the HEAD request only when the caller has explicitly opted out
360+ # of response checksum validation. Otherwise we need the HEAD to
361+ # obtain the full-object ETag/size for checksum validation.
362+ if client .meta .config .response_checksum_validation == "when_required" :
363+ self ._submit_first_chunk_request (
364+ client ,
365+ config ,
366+ osutil ,
367+ request_executor ,
368+ io_executor ,
369+ download_output_manager ,
370+ transfer_future ,
371+ bandwidth_limiter ,
372+ )
373+ return
374+
375+ if (
376+ transfer_future .meta .size is None
377+ or transfer_future .meta .etag is None
378+ ):
379+ response = client .head_object (
380+ Bucket = transfer_future .meta .call_args .bucket ,
381+ Key = transfer_future .meta .call_args .key ,
382+ ** transfer_future .meta .call_args .extra_args ,
383+ )
384+ # If a size was not provided figure out the size for the
385+ # user.
386+ transfer_future .meta .provide_transfer_size (
387+ response ['ContentLength' ]
388+ )
389+ # Provide an etag to ensure a stored object is not modified
390+ # during a multipart download.
391+ transfer_future .meta .provide_object_etag (response .get ('ETag' ))
392+
393+ # If it is greater than threshold do a ranged download, otherwise
394+ # do a regular GetObject download.
395+ if transfer_future .meta .size < config .multipart_threshold :
396+ self ._submit_download_request (
397+ client ,
398+ config ,
399+ osutil ,
400+ request_executor ,
401+ io_executor ,
402+ download_output_manager ,
403+ transfer_future ,
404+ bandwidth_limiter ,
405+ )
406+ else :
407+ self ._submit_ranged_download_request (
408+ client ,
409+ config ,
410+ osutil ,
411+ request_executor ,
412+ io_executor ,
413+ download_output_manager ,
414+ transfer_future ,
415+ bandwidth_limiter ,
416+ )
417+
def _submit_download_request(
    self,
    client,
    config,
    osutil,
    request_executor,
    io_executor,
    download_output_manager,
    transfer_future,
    bandwidth_limiter,
):
    """Submit one task that fetches the object with a single request.

    :param client: Client object handed to the download task.
    :param config: Transfer configuration; ``num_download_attempts`` and
        ``io_chunksize`` are read from it.
    :param osutil: Not used by this method.
    :param request_executor: Executor the download task is submitted to.
    :param io_executor: Not used by this method.
    :param download_output_manager: Supplies the file object, the task
        tag and the final IO task for this download.
    :param transfer_future: Future describing the transfer; its
        ``meta.call_args`` provide bucket, key and extra args.
    :param bandwidth_limiter: Passed through to the download task.
    """
    call_args = transfer_future.meta.call_args

    # Destination that downloaded contents are written to.
    target_fileobj = download_output_manager.get_fileobj_for_io_writes(
        transfer_future
    )

    # Progress callbacks registered on the future.
    on_progress = get_callbacks(transfer_future, 'progress')

    # Tag associated with the get object task.
    task_tag = download_output_manager.get_download_task_tag()

    # IO task to run once the download task is done.
    closing_task = download_output_manager.get_final_io_task()

    task_kwargs = {
        'client': client,
        'bucket': call_args.bucket,
        'key': call_args.key,
        'fileobj': target_fileobj,
        'extra_args': call_args.extra_args,
        'callbacks': on_progress,
        'max_attempts': config.num_download_attempts,
        'download_output_manager': download_output_manager,
        'io_chunksize': config.io_chunksize,
        'bandwidth_limiter': bandwidth_limiter,
    }
    download_task = ImmediatelyWriteIOGetObjectTask(
        transfer_coordinator=self._transfer_coordinator,
        main_kwargs=task_kwargs,
        done_callbacks=[closing_task],
    )

    # Hand the task over to the request executor.
    self._transfer_coordinator.submit(
        request_executor, download_task, tag=task_tag
    )
369467
def _submit_ranged_download_request(
    self,
    client,
    config,
    osutil,
    request_executor,
    io_executor,
    download_output_manager,
    transfer_future,
    bandwidth_limiter,
):
    """Split the download into ranged parts and submit one task per part.

    :param client: Client object handed to every part task.
    :param config: Transfer configuration; ``multipart_chunksize``,
        ``num_download_attempts`` and ``io_chunksize`` are read from it.
    :param osutil: Not used by this method.
    :param request_executor: Executor the part tasks are submitted to.
    :param io_executor: Executor used for the final IO task.
    :param download_output_manager: Supplies the file object, the task
        tag and the final IO task for this download.
    :param transfer_future: Future describing the transfer; its
        ``meta.size`` determines the number of parts and its
        ``meta.etag``, when set, is sent as ``IfMatch``.
    :param bandwidth_limiter: Passed through to every part task.
    """
    call_args = transfer_future.meta.call_args

    # Progress callbacks registered on the future.
    on_progress = get_callbacks(transfer_future, 'progress')

    # Destination that downloaded contents are written to.
    target_fileobj = download_output_manager.get_fileobj_for_io_writes(
        transfer_future
    )

    # Work out how many ranged requests are needed.
    chunk_bytes = config.multipart_chunksize
    part_count = calculate_num_parts(transfer_future.meta.size, chunk_bytes)

    # Tag associated with the get object tasks.
    task_tag = download_output_manager.get_download_task_tag()

    # Invoker that submits the final IO task; it is incremented once per
    # part and decremented by each part's done callback.
    # NOTE(review): presumably the callback fires once the count is back
    # at zero after finalize() - confirm against CountCallbackInvoker.
    final_io_submitter = self._get_final_io_task_submission_callback(
        download_output_manager, io_executor
    )
    completion_counter = CountCallbackInvoker(final_io_submitter)

    for part_index in range(part_count):
        part_extra_args = {
            'Range': calculate_range_parameter(
                chunk_bytes, part_index, part_count
            ),
        }
        # Pin the part to the known ETag, when one is available.
        if transfer_future.meta.etag is not None:
            part_extra_args['IfMatch'] = transfer_future.meta.etag
        # Caller-supplied extra args are applied last, so they win on
        # any key collision.
        part_extra_args.update(call_args.extra_args)
        completion_counter.increment()

        part_kwargs = {
            'client': client,
            'bucket': call_args.bucket,
            'key': call_args.key,
            'fileobj': target_fileobj,
            'extra_args': part_extra_args,
            'callbacks': on_progress,
            'max_attempts': config.num_download_attempts,
            'start_index': part_index * chunk_bytes,
            'download_output_manager': download_output_manager,
            'io_chunksize': config.io_chunksize,
            'bandwidth_limiter': bandwidth_limiter,
        }
        part_task = GetObjectTask(
            transfer_coordinator=self._transfer_coordinator,
            main_kwargs=part_kwargs,
            done_callbacks=[completion_counter.decrement],
        )

        # Submit the ranged download for this part.
        self._transfer_coordinator.submit(
            request_executor, part_task, tag=task_tag
        )
    completion_counter.finalize()
541+
def _get_final_io_task_submission_callback(
    self, download_manager, io_executor
):
    """Build a callable that submits the final IO task.

    :param download_manager: Object whose ``get_final_io_task()`` result
        is the task to submit.
    :param io_executor: Executor the final task is submitted to.
    :returns: A ``FunctionContainer`` wrapping the transfer coordinator's
        ``submit`` together with ``io_executor`` and the final task.
    """
    # Fetch the task first, then bind it to the coordinator's submit.
    closing_task = download_manager.get_final_io_task()
    submit = self._transfer_coordinator.submit
    return FunctionContainer(submit, io_executor, closing_task)
549+
550+ def _calculate_range_param (self , part_size , part_index , num_parts ):
551+ # Used to calculate the Range parameter
552+ start_range = part_index * part_size
553+ if part_index == num_parts - 1 :
554+ end_range = ''
555+ else :
556+ end_range = start_range + part_size - 1
557+ range_param = f'bytes={ start_range } -{ end_range } '
558+ return range_param
559+
370560 def _submit_first_chunk_request (
371561 self ,
372562 client ,
0 commit comments