Skip to content

Commit dccb855

Browse files
kyleknap authored and hssyoo committed
Support file-like objects in CRT transfer manager
This is a backport of this pull request: aws/aws-cli#8291
1 parent c764323 commit dccb855

File tree

6 files changed: 531 additions and 82 deletions.
Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "enhancement",
3+
"category": "``crt``",
4+
"description": "Add support for uploading and downloading file-like objects using CRT transfer manager. It supports both seekable and non-seekable file-like objects."
5+
}

s3transfer/crt.py

Lines changed: 130 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -428,19 +428,12 @@ def _crt_request_from_aws_request(self, aws_request):
428428
headers_list.append((name, str(value, 'utf-8')))
429429

430430
crt_headers = awscrt.http.HttpHeaders(headers_list)
431-
# CRT requires body (if it exists) to be an I/O stream.
432-
crt_body_stream = None
433-
if aws_request.body:
434-
if hasattr(aws_request.body, 'seek'):
435-
crt_body_stream = aws_request.body
436-
else:
437-
crt_body_stream = BytesIO(aws_request.body)
438431

439432
crt_request = awscrt.http.HttpRequest(
440433
method=aws_request.method,
441434
path=crt_path,
442435
headers=crt_headers,
443-
body_stream=crt_body_stream,
436+
body_stream=aws_request.body,
444437
)
445438
return crt_request
446439

@@ -453,6 +446,25 @@ def _convert_to_crt_http_request(self, botocore_http_request):
453446
crt_request.headers.set("host", url_parts.netloc)
454447
if crt_request.headers.get('Content-MD5') is not None:
455448
crt_request.headers.remove("Content-MD5")
449+
450+
# In general, the CRT S3 client expects a content length header. It
451+
# only expects a missing content length header if the body is not
452+
# seekable. However, botocore does not set the content length header
453+
# for GetObject API requests and so we set the content length to zero
454+
# to meet the CRT S3 client's expectation that the content length
455+
# header is set even if there is no body.
456+
if crt_request.headers.get('Content-Length') is None:
457+
if botocore_http_request.body is None:
458+
crt_request.headers.add('Content-Length', "0")
459+
460+
# Botocore sets the Transfer-Encoding header when it cannot determine
461+
# the content length of the request body (e.g. it's not seekable).
462+
# However, CRT does not support this header, but it supports
463+
# non-seekable bodies. So we remove this header to not cause issues
464+
# in the downstream CRT S3 request.
465+
if crt_request.headers.get('Transfer-Encoding') is not None:
466+
crt_request.headers.remove('Transfer-Encoding')
467+
456468
return crt_request
457469

458470
def _capture_http_request(self, request, **kwargs):
@@ -555,39 +567,20 @@ def __init__(self, crt_request_serializer, os_utils):
555567
def get_make_request_args(
556568
self, request_type, call_args, coordinator, future, on_done_after_calls
557569
):
558-
recv_filepath = None
559-
send_filepath = None
560-
s3_meta_request_type = getattr(
561-
S3RequestType, request_type.upper(), S3RequestType.DEFAULT
570+
request_args_handler = getattr(
571+
self,
572+
f'_get_make_request_args_{request_type}',
573+
self._default_get_make_request_args,
562574
)
563-
on_done_before_calls = []
564-
if s3_meta_request_type == S3RequestType.GET_OBJECT:
565-
final_filepath = call_args.fileobj
566-
recv_filepath = self._os_utils.get_temp_filename(final_filepath)
567-
file_ondone_call = RenameTempFileHandler(
568-
coordinator, final_filepath, recv_filepath, self._os_utils
569-
)
570-
on_done_before_calls.append(file_ondone_call)
571-
elif s3_meta_request_type == S3RequestType.PUT_OBJECT:
572-
send_filepath = call_args.fileobj
573-
data_len = self._os_utils.get_file_size(send_filepath)
574-
call_args.extra_args["ContentLength"] = data_len
575-
576-
crt_request = self._request_serializer.serialize_http_request(
577-
request_type, future
575+
return request_args_handler(
576+
request_type=request_type,
577+
call_args=call_args,
578+
coordinator=coordinator,
579+
future=future,
580+
on_done_before_calls=[],
581+
on_done_after_calls=on_done_after_calls,
578582
)
579583

580-
return {
581-
'request': crt_request,
582-
'type': s3_meta_request_type,
583-
'recv_filepath': recv_filepath,
584-
'send_filepath': send_filepath,
585-
'on_done': self.get_crt_callback(
586-
future, 'done', on_done_before_calls, on_done_after_calls
587-
),
588-
'on_progress': self.get_crt_callback(future, 'progress'),
589-
}
590-
591584
def get_crt_callback(
592585
self,
593586
future,
@@ -613,6 +606,97 @@ def invoke_all_callbacks(*args, **kwargs):
613606

614607
return invoke_all_callbacks
615608

609+
def _get_make_request_args_put_object(
610+
self,
611+
request_type,
612+
call_args,
613+
coordinator,
614+
future,
615+
on_done_before_calls,
616+
on_done_after_calls,
617+
):
618+
send_filepath = None
619+
if isinstance(call_args.fileobj, str):
620+
send_filepath = call_args.fileobj
621+
data_len = self._os_utils.get_file_size(send_filepath)
622+
call_args.extra_args["ContentLength"] = data_len
623+
else:
624+
call_args.extra_args["Body"] = call_args.fileobj
625+
626+
# Suppress botocore's automatic MD5 calculation by setting an override
627+
# value that will get deleted in the BotocoreCRTRequestSerializer.
628+
# The CRT S3 client is able to automatically compute checksums as part of
629+
# requests it makes, and the intention is to configure automatic
630+
# checksums in a future update.
631+
call_args.extra_args["ContentMD5"] = "override-to-be-removed"
632+
633+
make_request_args = self._default_get_make_request_args(
634+
request_type=request_type,
635+
call_args=call_args,
636+
coordinator=coordinator,
637+
future=future,
638+
on_done_before_calls=on_done_before_calls,
639+
on_done_after_calls=on_done_after_calls,
640+
)
641+
make_request_args['send_filepath'] = send_filepath
642+
return make_request_args
643+
644+
def _get_make_request_args_get_object(
645+
self,
646+
request_type,
647+
call_args,
648+
coordinator,
649+
future,
650+
on_done_before_calls,
651+
on_done_after_calls,
652+
):
653+
recv_filepath = None
654+
on_body = None
655+
if isinstance(call_args.fileobj, str):
656+
final_filepath = call_args.fileobj
657+
recv_filepath = self._os_utils.get_temp_filename(final_filepath)
658+
on_done_before_calls.append(
659+
RenameTempFileHandler(
660+
coordinator, final_filepath, recv_filepath, self._os_utils
661+
)
662+
)
663+
else:
664+
on_body = OnBodyFileObjWriter(call_args.fileobj)
665+
666+
make_request_args = self._default_get_make_request_args(
667+
request_type=request_type,
668+
call_args=call_args,
669+
coordinator=coordinator,
670+
future=future,
671+
on_done_before_calls=on_done_before_calls,
672+
on_done_after_calls=on_done_after_calls,
673+
)
674+
make_request_args['recv_filepath'] = recv_filepath
675+
make_request_args['on_body'] = on_body
676+
return make_request_args
677+
678+
def _default_get_make_request_args(
679+
self,
680+
request_type,
681+
call_args,
682+
coordinator,
683+
future,
684+
on_done_before_calls,
685+
on_done_after_calls,
686+
):
687+
return {
688+
'request': self._request_serializer.serialize_http_request(
689+
request_type, future
690+
),
691+
'type': getattr(
692+
S3RequestType, request_type.upper(), S3RequestType.DEFAULT
693+
),
694+
'on_done': self.get_crt_callback(
695+
future, 'done', on_done_before_calls, on_done_after_calls
696+
),
697+
'on_progress': self.get_crt_callback(future, 'progress'),
698+
}
699+
616700

617701
class RenameTempFileHandler:
618702
def __init__(self, coordinator, final_filename, temp_filename, osutil):
@@ -642,3 +726,11 @@ def __init__(self, coordinator):
642726

643727
def __call__(self, **kwargs):
644728
self._coordinator.set_done_callbacks_complete()
729+
730+
731+
class OnBodyFileObjWriter:
732+
def __init__(self, fileobj):
733+
self._fileobj = fileobj
734+
735+
def __call__(self, chunk, **kwargs):
736+
self._fileobj.write(chunk)

tests/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,9 @@ def write(self, b):
509509
def read(self, n=-1):
510510
return self._data.read(n)
511511

512+
def readinto(self, b):
513+
return self._data.readinto(b)
514+
512515

513516
class NonSeekableWriter(io.RawIOBase):
514517
def __init__(self, fileobj):

0 commit comments

Comments
 (0)