Commit bc5336e

hack the input stream
1 parent 886e640 commit bc5336e

3 files changed: +146 additions, -37 deletions

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 6 additions & 1 deletion
Lines changed: 6 additions & 1 deletion

@@ -100,7 +100,12 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
     struct aws_byte_cursor file_name);

 const char *aws_parallel_input_stream_get_file_path(struct aws_parallel_input_stream *stream);
-
+struct aws_input_stream *aws_input_stream_new_from_parallel(
+    struct aws_allocator *allocator,
+    struct aws_parallel_input_stream *parallel_stream,
+    uint64_t offset,
+    size_t request_body_size);
+void aws_s3_part_streaming_input_stream_reset(struct aws_input_stream *stream);
 AWS_EXTERN_C_END
 AWS_POP_SANE_WARNING_LEVEL
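The header now exposes a constructor for a bounded per-part stream plus a reset hook for retries. A minimal sketch of the intended call pattern (the helper name and the 8 MiB offset/size values below are illustrative assumptions, not part of this commit):

#include <aws/io/stream.h>
#include <aws/s3/private/s3_parallel_input_stream.h>

/* Illustrative only: carve one part out of a parallel stream and rewind it
 * the way a retry would. Offset/size are assumed example values. */
static struct aws_input_stream *s_make_part_stream_sketch(
    struct aws_allocator *allocator,
    struct aws_parallel_input_stream *parallel_stream) {

    const uint64_t part_offset = 8 * (uint64_t)1024 * 1024; /* assumed start of part 2 */
    const size_t part_size = 8 * 1024 * 1024;               /* assumed 8 MiB part size */

    struct aws_input_stream *part_stream =
        aws_input_stream_new_from_parallel(allocator, parallel_stream, part_offset, part_size);

    /* On a retry, the same stream is rewound to the start of its part. */
    aws_s3_part_streaming_input_stream_reset(part_stream);
    return part_stream;
}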

source/s3_auto_ranged_put.c

Lines changed: 43 additions & 36 deletions
@@ -976,47 +976,54 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
     part_prep->request = request;
     part_prep->on_complete = aws_future_http_message_acquire(message_future);
     if (request->parallel) {
-        printf("PARALLEL\n");
-        uint64_t offset = 0;
-        size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
-        request->request_stream = aws_input_stream_new_from_file(
-            allocator, aws_parallel_input_stream_get_file_path(meta_request->request_body_parallel_stream));
-        request->content_length = request_body_size;
-        aws_input_stream_seek(request->request_stream, offset, AWS_SSB_BEGIN);
-        struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
-
-        /* BEGIN CRITICAL SECTION */
-        aws_s3_meta_request_lock_synced_data(meta_request);
+        if (request->num_times_prepared == 0) {
+            uint64_t offset = 0;
+            size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
+            request->request_stream = aws_input_stream_new_from_parallel(
+                allocator, meta_request->request_body_parallel_stream, offset, request_body_size);
+            request->content_length = request_body_size;
+            struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
+
+            /* BEGIN CRITICAL SECTION */
+            aws_s3_meta_request_lock_synced_data(meta_request);

-        --auto_ranged_put->synced_data.num_parts_pending_read;
+            --auto_ranged_put->synced_data.num_parts_pending_read;

-        auto_ranged_put->synced_data.is_body_stream_at_end = false;
-        if (!request->is_noop) {
-            /* The part can finish out of order. Resize array-list to be long enough to hold this part,
-             * filling any intermediate slots with NULL. */
-            aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
-            while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
-                struct aws_s3_mpu_part_info *null_part = NULL;
-                aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
+            auto_ranged_put->synced_data.is_body_stream_at_end = false;
+            if (!request->is_noop) {
+                /* The part can finish out of order. Resize array-list to be long enough to hold this part,
+                 * filling any intermediate slots with NULL. */
+                aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
+                while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
+                    struct aws_s3_mpu_part_info *null_part = NULL;
+                    aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
+                }
+                /* Add part to array-list */
+                struct aws_s3_mpu_part_info *part =
+                    aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
+                part->size = request->request_body.len;
+                aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1);
             }
-            /* Add part to array-list */
-            struct aws_s3_mpu_part_info *part =
-                aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
-            part->size = request->request_body.len;
-            aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1);
-        }
-        aws_s3_meta_request_unlock_synced_data(meta_request);
-        /* END CRITICAL SECTION */
+            aws_s3_meta_request_unlock_synced_data(meta_request);
+            /* END CRITICAL SECTION */

-        /* We throttle the number of parts that can be "pending read"
-         * (e.g. only 1 at a time if reading from async-stream).
-         * Now that read is complete, poke the client to see if it can give us more work.
-         *
-         * Poking now gives measurable speedup (1%) for async streaming,
-         * vs waiting until all the part-prep steps are complete (still need to sign, etc) */
-        aws_s3_client_schedule_process_work(meta_request->client);
+            /* We throttle the number of parts that can be "pending read"
+             * (e.g. only 1 at a time if reading from async-stream).
+             * Now that read is complete, poke the client to see if it can give us more work.
+             *
+             * Poking now gives measurable speedup (1%) for async streaming,
+             * vs waiting until all the part-prep steps are complete (still need to sign, etc) */
+            aws_s3_client_schedule_process_work(meta_request->client);

-        s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
+            s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
+        } else {
+            printf("PARALLEL retry\n");
+            /* Not the first time preparing request (e.g. retry).
+             * We can skip over the async steps that read the body stream */
+            /* Seek back to beginning of the stream. */
+            aws_s3_part_streaming_input_stream_reset(request->request_stream);
+            s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_SUCCESS);
+        }
     } else if (request->num_times_prepared == 0) {
         /* Preparing request for the first time.
          * Next async step: read through the body stream until we've
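In short, the parallel branch now builds the part stream once (first preparation) and, on any later attempt, rewinds the existing stream rather than re-reading the body. A condensed, hypothetical restatement of that branch (the helper name is invented; the real logic is inline in s_s3_prepare_upload_part above):

/* Hypothetical condensed view of the parallel branch; the fields and helpers
 * (num_times_prepared, s_compute_request_body_size, request_body_parallel_stream)
 * are the ones used in the diff above. */
static void s_prepare_parallel_part_sketch(
    struct aws_allocator *allocator,
    struct aws_s3_meta_request *meta_request,
    struct aws_s3_request *request) {

    if (request->num_times_prepared == 0) {
        /* First attempt: carve a bounded stream out of the parallel body stream. */
        uint64_t offset = 0;
        size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
        request->request_stream = aws_input_stream_new_from_parallel(
            allocator, meta_request->request_body_parallel_stream, offset, request_body_size);
        request->content_length = request_body_size;
    } else {
        /* Retry: reuse the stream, just seek back to the start of the part. */
        aws_s3_part_streaming_input_stream_reset(request->request_stream);
    }
}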

source/s3_parallel_input_stream.c

Lines changed: 97 additions & 0 deletions
@@ -143,3 +143,100 @@ const char *aws_parallel_input_stream_get_file_path(struct aws_parallel_input_st
     struct aws_parallel_input_stream_from_file_impl *impl = stream->impl;
     return aws_string_c_str(impl->file_path);
 }
+
+struct aws_s3_part_streaming_input_stream_impl {
+    struct aws_input_stream base;
+    struct aws_input_stream *base_stream;
+    size_t offset;
+    size_t total_length;
+    size_t length_read;
+    struct aws_allocator *allocator;
+};
+
+static int s_aws_s3_part_streaming_input_stream_seek(
+    struct aws_input_stream *stream,
+    int64_t offset,
+    enum aws_stream_seek_basis basis) {
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        AWS_CONTAINER_OF(stream, struct aws_s3_part_streaming_input_stream_impl, base);
+    aws_input_stream_seek(test_input_stream->base_stream, offset + test_input_stream->offset, basis);
+    return AWS_OP_ERR;
+}
+
+static int s_aws_s3_part_streaming_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        AWS_CONTAINER_OF(stream, struct aws_s3_part_streaming_input_stream_impl, base);
+    int rt = aws_input_stream_read(test_input_stream->base_stream, dest);
+    test_input_stream->length_read += dest->len;
+    return rt;
+}
+
+static int s_aws_s3_part_streaming_input_stream_get_status(
+    struct aws_input_stream *stream,
+    struct aws_stream_status *status) {
+    (void)stream;
+    (void)status;
+
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        AWS_CONTAINER_OF(stream, struct aws_s3_part_streaming_input_stream_impl, base);
+
+    status->is_end_of_stream = test_input_stream->length_read == test_input_stream->total_length;
+    status->is_valid = true;
+
+    return AWS_OP_SUCCESS;
+}
+
+static int s_aws_s3_part_streaming_input_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
+    AWS_ASSERT(stream != NULL);
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        AWS_CONTAINER_OF(stream, struct aws_s3_part_streaming_input_stream_impl, base);
+    *out_length = (int64_t)test_input_stream->total_length;
+    return AWS_OP_SUCCESS;
+}
+
+static void s_aws_s3_part_streaming_input_stream_destroy(
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream) {
+    aws_input_stream_release(test_input_stream->base_stream);
+    aws_mem_release(test_input_stream->allocator, test_input_stream);
+}
+
+static struct aws_input_stream_vtable s_aws_s3_part_streaming_input_stream_vtable = {
+    .seek = s_aws_s3_part_streaming_input_stream_seek,
+    .read = s_aws_s3_part_streaming_input_stream_read,
+    .get_status = s_aws_s3_part_streaming_input_stream_get_status,
+    .get_length = s_aws_s3_part_streaming_input_stream_get_length,
+};
+
+void aws_s3_part_streaming_input_stream_reset(struct aws_input_stream *stream) {
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        AWS_CONTAINER_OF(stream, struct aws_s3_part_streaming_input_stream_impl, base);
+    test_input_stream->length_read = 0;
+    aws_input_stream_seek(test_input_stream->base_stream, test_input_stream->offset, AWS_SSB_BEGIN);
+}
+
+struct aws_input_stream *aws_input_stream_new_from_parallel(
+    struct aws_allocator *allocator,
+    struct aws_parallel_input_stream *parallel_stream,
+    uint64_t offset,
+    size_t request_body_size) {
+
+    struct aws_s3_part_streaming_input_stream_impl *test_input_stream =
+        aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_part_streaming_input_stream_impl));
+    aws_ref_count_init(
+        &test_input_stream->base.ref_count,
+        test_input_stream,
+        (aws_simple_completion_callback *)s_aws_s3_part_streaming_input_stream_destroy);
+
+    test_input_stream->base.vtable = &s_aws_s3_part_streaming_input_stream_vtable;
+    struct aws_parallel_input_stream_from_file_impl *impl = parallel_stream->impl;
+    aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl));
+    aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl);
+
+    test_input_stream->base_stream = aws_input_stream_new_from_file(allocator, aws_string_c_str(impl->file_path));
+    test_input_stream->total_length = request_body_size;
+    test_input_stream->offset = offset;
+    test_input_stream->length_read = 0;
+    aws_input_stream_seek(test_input_stream->base_stream, offset, AWS_SSB_BEGIN);
+
+    return &test_input_stream->base;
+}
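Taken together, the new type is a bounded, rewindable window onto the backing file: get_length reports the part size, get_status reports end-of-stream once length_read reaches total_length, and reset zeroes length_read and seeks the wrapped file stream back to the stored offset. A standalone usage sketch under assumptions not in this commit (the upload.bin path and the 1024-byte offset / 256-byte part size are placeholders):

#include <aws/common/byte_buf.h>
#include <aws/io/stream.h>
#include <aws/s3/private/s3_parallel_input_stream.h>
#include <aws/s3/s3.h>

#include <stdio.h>

int main(void) {
    struct aws_allocator *allocator = aws_default_allocator();
    aws_s3_library_init(allocator);

    /* Placeholder file; in the real code path this comes from the meta-request. */
    struct aws_parallel_input_stream *parallel_stream =
        aws_parallel_input_stream_new_from_file(allocator, aws_byte_cursor_from_c_str("upload.bin"));

    /* View bytes [1024, 1024 + 256) of the file as a standalone part stream. */
    struct aws_input_stream *part_stream =
        aws_input_stream_new_from_parallel(allocator, parallel_stream, 1024 /*offset*/, 256 /*part size*/);

    struct aws_byte_buf body;
    aws_byte_buf_init(&body, allocator, 256);
    aws_input_stream_read(part_stream, &body);
    printf("read %zu bytes for this part\n", body.len);

    /* Simulate a retry: rewind to the start of the part and read it again. */
    aws_s3_part_streaming_input_stream_reset(part_stream);
    body.len = 0;
    aws_input_stream_read(part_stream, &body);

    aws_byte_buf_clean_up(&body);
    aws_input_stream_release(part_stream);
    aws_parallel_input_stream_release(parallel_stream);
    aws_s3_library_clean_up();
    return 0;
}

The retry path in s3_auto_ranged_put.c relies on exactly this reset behavior instead of recreating the stream.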
