Skip to content

Commit 6bc15dc

Browse files
committed
hack to streaming data
1 parent 1762f83 commit 6bc15dc

File tree

6 files changed

+105
-16
lines changed

6 files changed

+105
-16
lines changed

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
9999
struct aws_allocator *allocator,
100100
struct aws_byte_cursor file_name);
101101

102+
const char *aws_parallel_input_stream_get_file_path(struct aws_parallel_input_stream *stream);
103+
102104
AWS_EXTERN_C_END
103105
AWS_POP_SANE_WARNING_LEVEL
104106

include/aws/s3/private/s3_request.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ struct aws_s3_request {
138138
/* Request body to use when sending the request. The contents of this body will be re-used if a request is
139139
* retried.*/
140140
struct aws_byte_buf request_body;
141+
struct aws_input_stream *request_stream;
142+
bool parallel;
143+
int64_t content_length;
141144

142145
/**
143146
* Ticket to acquire the buffer.

include/aws/s3/private/s3_request_messages.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,18 @@ struct aws_http_message *aws_s3_create_multipart_upload_message_new(
7878
struct aws_http_message *base_message,
7979
const struct checksum_config_storage *checksum_config);
8080

81-
/* Create an HTTP request for an S3 Put Object request, using the original request as a basis. Creates and assigns a
82-
* body stream using the passed in buffer. If multipart is not needed, part number and upload_id can be 0 and NULL,
83-
* respectively. */
84-
AWS_S3_API
85-
struct aws_http_message *aws_s3_upload_part_message_new(
81+
struct aws_http_message *aws_s3_upload_part_message_new_streaming(
82+
struct aws_allocator *allocator,
83+
struct aws_http_message *base_message,
84+
uint32_t part_number,
85+
struct aws_input_stream *input_stream,
86+
int64_t stream_length,
87+
const struct aws_string *upload_id);
88+
89+
/* Create an HTTP request for an S3 Put Object request, using the original request as a basis. Creates and assigns
90+
* a body stream using the passed in buffer. If multipart is not needed, part number and upload_id can be 0 and
91+
* NULL, respectively. */
92+
AWS_S3_API struct aws_http_message *aws_s3_upload_part_message_new(
8693
struct aws_allocator *allocator,
8794
struct aws_http_message *base_message,
8895
struct aws_byte_buf *buffer,

source/s3_auto_ranged_put.c

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "aws/s3/private/s3_auto_ranged_put.h"
77
#include "aws/s3/private/s3_checksums.h"
88
#include "aws/s3/private/s3_list_parts.h"
9+
#include "aws/s3/private/s3_parallel_input_stream.h"
910
#include "aws/s3/private/s3_request_messages.h"
1011
#include "aws/s3/private/s3_util.h"
1112
#include <aws/common/clock.h>
@@ -841,7 +842,7 @@ void s_s3_auto_ranged_put_schedule_prepare_request(
841842
* reading. */
842843
bool parallel_prepare =
843844
(meta_request->request_body_parallel_stream && request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART);
844-
845+
request->parallel = parallel_prepare;
845846
aws_s3_meta_request_schedule_prepare_request_default_impl(
846847
meta_request, request, parallel_prepare /*parallel*/, callback, user_data);
847848
}
@@ -974,8 +975,16 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
974975
part_prep->allocator = allocator;
975976
part_prep->request = request;
976977
part_prep->on_complete = aws_future_http_message_acquire(message_future);
978+
if (request->parallel) {
977979

978-
if (request->num_times_prepared == 0) {
980+
uint64_t offset = 0;
981+
size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
982+
request->request_stream = aws_input_stream_new_from_file(
983+
allocator, aws_parallel_input_stream_get_file_path(meta_request->request_body_parallel_stream));
984+
request->content_length = request_body_size;
985+
aws_input_stream_seek(request->request_stream, offset, AWS_SSB_BEGIN);
986+
s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
987+
} else if (request->num_times_prepared == 0) {
979988
/* Preparing request for the first time.
980989
* Next async step: read through the body stream until we've
981990
* skipped over parts that were already uploaded (in case we're resuming
@@ -1155,15 +1164,26 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo
11551164
}
11561165

11571166
/* Create a new put-object message to upload a part. */
1158-
struct aws_http_message *message = aws_s3_upload_part_message_new(
1159-
meta_request->allocator,
1160-
meta_request->initial_request_message,
1161-
&request->request_body,
1162-
request->part_number,
1163-
auto_ranged_put->upload_id,
1164-
meta_request->should_compute_content_md5,
1165-
&meta_request->checksum_config,
1166-
checksum_buf);
1167+
struct aws_http_message *message = NULL;
1168+
if (request->parallel) {
1169+
message = aws_s3_upload_part_message_new_streaming(
1170+
meta_request->allocator,
1171+
meta_request->initial_request_message,
1172+
request->part_number,
1173+
request->request_stream,
1174+
request->content_length,
1175+
auto_ranged_put->upload_id);
1176+
} else {
1177+
message = aws_s3_upload_part_message_new(
1178+
meta_request->allocator,
1179+
meta_request->initial_request_message,
1180+
&request->request_body,
1181+
request->part_number,
1182+
auto_ranged_put->upload_id,
1183+
meta_request->should_compute_content_md5,
1184+
&meta_request->checksum_config,
1185+
checksum_buf);
1186+
}
11671187
if (message == NULL) {
11681188
aws_future_http_message_set_error(part_prep->on_complete, aws_last_error());
11691189
goto on_done;

source/s3_parallel_input_stream.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,8 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
138138
s_para_from_file_destroy(&impl->base);
139139
return NULL;
140140
}
141+
142+
const char *aws_parallel_input_stream_get_file_path(struct aws_parallel_input_stream *stream) {
143+
struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
144+
return aws_string_c_str(impl->file_path);
145+
}

source/s3_request_messages.c

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,58 @@ struct aws_http_message *aws_s3_create_multipart_upload_message_new(
352352
return NULL;
353353
}
354354

355+
/* Create a new put object request from an existing put object request. Currently just optionally adds part information
356+
* for a multipart upload. */
357+
struct aws_http_message *aws_s3_upload_part_message_new_streaming(
358+
struct aws_allocator *allocator,
359+
struct aws_http_message *base_message,
360+
uint32_t part_number,
361+
struct aws_input_stream *input_stream,
362+
int64_t stream_length,
363+
const struct aws_string *upload_id) {
364+
AWS_PRECONDITION(allocator);
365+
AWS_PRECONDITION(base_message);
366+
AWS_PRECONDITION(part_number > 0);
367+
368+
struct aws_http_message *message = aws_s3_message_util_copy_http_message_no_body_filter_headers(
369+
allocator,
370+
base_message,
371+
g_s3_upload_part_excluded_headers,
372+
AWS_ARRAY_SIZE(g_s3_upload_part_excluded_headers),
373+
true /*exclude_x_amz_meta*/);
374+
375+
if (message == NULL) {
376+
return NULL;
377+
}
378+
379+
if (aws_s3_message_util_set_multipart_request_path(allocator, upload_id, part_number, false, message)) {
380+
goto error_clean_up;
381+
}
382+
struct aws_http_headers *headers = aws_http_message_get_headers(message);
383+
384+
if (headers == NULL) {
385+
return NULL;
386+
}
387+
388+
char content_length_buffer[64] = "";
389+
snprintf(content_length_buffer, sizeof(content_length_buffer), "%" PRIu64, (uint64_t)stream_length);
390+
struct aws_byte_cursor content_length_cursor =
391+
aws_byte_cursor_from_array(content_length_buffer, strlen(content_length_buffer));
392+
if (aws_http_headers_set(headers, g_content_length_header_name, content_length_cursor)) {
393+
goto error_clean_up;
394+
}
395+
396+
aws_http_message_set_body_stream(message, input_stream);
397+
/* Let the message take the full ownership */
398+
aws_input_stream_release(input_stream);
399+
400+
return message;
401+
402+
error_clean_up:
403+
aws_http_message_release(message);
404+
return NULL;
405+
}
406+
355407
/* Create a new put object request from an existing put object request. Currently just optionally adds part information
356408
* for a multipart upload. */
357409
struct aws_http_message *aws_s3_upload_part_message_new(

0 commit comments

Comments
 (0)