Skip to content

Commit 9303470

Browse files
committed
couple fixes
1 parent e8c9b30 commit 9303470

11 files changed

+179
-147
lines changed

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ struct aws_s3_request;
2525
struct aws_http_headers;
2626
struct aws_http_make_request_options;
2727
struct aws_retry_strategy;
28-
struct aws_mmap_context;
2928

3029
enum aws_s3_meta_request_state {
3130
AWS_S3_META_REQUEST_STATE_ACTIVE,

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,20 @@ struct aws_parallel_input_stream_vtable {
3333
void (*destroy)(struct aws_parallel_input_stream *stream);
3434

3535
/**
36-
* Read into the buffer in parallel.
37-
* The stream will split into parts and read each part in parallel.
36+
* Read from the offset until dest is filled or EOF is reached.
37+
* It is thread-safe to call this from multiple threads without waiting for other reads to complete.
38+
*
39+
* @param stream The stream to read from
40+
* @param offset The offset in the stream from beginning to start reading
41+
* @param max_length The maximum number of bytes to read
42+
* @param dest The output buffer to read into
43+
* @return a future, which will contain an error code if something went wrong,
44+
* or a result bool indicating whether EOF has been reached.
3845
*/
3946
struct aws_future_bool *(*read)(
4047
struct aws_parallel_input_stream *stream,
41-
size_t offset,
42-
size_t length,
48+
uint64_t offset,
49+
size_t max_length,
4350
struct aws_byte_buf *dest);
4451
};
4552

@@ -72,20 +79,21 @@ AWS_S3_API
7279
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream);
7380

7481
/**
75-
* Read data from the stream into the destination buffer.
82+
* Read from the offset until dest is filled or EOF is reached.
83+
* It is thread-safe to call this from multiple threads without waiting for other reads to complete.
7684
*
77-
* @param stream The parallel input stream to read from
78-
* @param offset The starting offset in the stream to read from
79-
* @param length The number of bytes to read
80-
* @param dest The destination buffer to read into
81-
* @return A future that will be set to true when the read completes successfully,
82-
* or set to an error if the read fails
85+
* @param stream The stream to read from
86+
* @param offset The offset in the stream from beginning to start reading
87+
* @param max_length The maximum number of bytes to read
88+
* @param dest The output buffer to read into
89+
* @return a future, which will contain an error code if something went wrong,
90+
* or a result bool indicating whether EOF has been reached.
8391
*/
8492
AWS_S3_API
8593
struct aws_future_bool *aws_parallel_input_stream_read(
8694
struct aws_parallel_input_stream *stream,
87-
size_t offset,
88-
size_t length,
95+
uint64_t offset,
96+
size_t max_length,
8997
struct aws_byte_buf *dest);
9098

9199
/**

source/s3_auto_ranged_put.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,9 @@ static bool s_s3_auto_ranged_put_update(
571571

572572
/* Allocate a request for another part. */
573573
uint32_t new_flags = AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS;
574-
if (!meta_request->synced_data.async_write.ready_to_send) {
574+
if (!meta_request->synced_data.async_write.ready_to_send &&
575+
meta_request->request_body_parallel_stream == NULL) {
576+
/* TODO: For now, bypass the memory pool when streaming from a file; it can still be hooked up later. */
575577
new_flags |= AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL;
576578
}
577579

source/s3_meta_request.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,9 +2248,10 @@ struct aws_future_bool *aws_s3_meta_request_read_body(
22482248
}
22492249

22502250
/* If parallel-stream, simply call read(), which must fill the buffer and/or EOF */
2251-
// if (meta_request->request_body_parallel_stream != NULL) {
2252-
// return aws_parallel_input_stream_read(meta_request->request_body_parallel_stream, offset, buffer);
2253-
// }
2251+
if (meta_request->request_body_parallel_stream != NULL) {
2252+
return aws_parallel_input_stream_read(
2253+
meta_request->request_body_parallel_stream, offset, buffer->capacity, buffer);
2254+
}
22542255

22552256
/* Further techniques are synchronous... */
22562257
struct aws_future_bool *synchronous_read_future = aws_future_bool_new(meta_request->allocator);

source/s3_parallel_input_stream.c

Lines changed: 66 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <aws/io/future.h>
1616
#include <aws/io/stream.h>
1717

18+
#include <errno.h>
19+
1820
#define ONE_SEC_IN_NS_P ((uint64_t)AWS_TIMESTAMP_NANOS)
1921
#define MAX_TIMEOUT_NS_P (600 * ONE_SEC_IN_NS_P)
2022

@@ -47,8 +49,8 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_p
4749

4850
struct aws_future_bool *aws_parallel_input_stream_read(
4951
struct aws_parallel_input_stream *stream,
50-
size_t offset,
51-
size_t length,
52+
uint64_t offset,
53+
size_t max_length,
5254
struct aws_byte_buf *dest) {
5355
/* Ensure the buffer has space available */
5456
if (dest->len == dest->capacity) {
@@ -57,7 +59,7 @@ struct aws_future_bool *aws_parallel_input_stream_read(
5759
return future;
5860
}
5961

60-
struct aws_future_bool *future = stream->vtable->read(stream, offset, length, dest);
62+
struct aws_future_bool *future = stream->vtable->read(stream, offset, max_length, dest);
6163
AWS_POSTCONDITION(future != NULL);
6264
return future;
6365
}
@@ -86,7 +88,7 @@ struct read_task_impl {
8688
struct aws_parallel_input_stream_from_file_impl *para_impl;
8789

8890
struct aws_future_bool *end_future;
89-
size_t offset;
91+
uint64_t offset;
9092
size_t length;
9193
struct aws_byte_buf *dest;
9294
};
@@ -98,13 +100,13 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
98100
struct aws_future_bool *end_future = read_task->end_future;
99101
FILE *file_stream = NULL;
100102
int error_code = AWS_ERROR_SUCCESS;
101-
103+
102104
file_stream = aws_fopen(aws_string_c_str(impl->file_path), "rb");
103105
if (file_stream == NULL) {
104106
AWS_LOGF_ERROR(
105-
AWS_LS_S3_GENERAL,
106-
"id=%p: Failed to open file %s for reading",
107-
(void *)&impl->base,
107+
AWS_LS_S3_GENERAL,
108+
"id=%p: Failed to open file %s for reading",
109+
(void *)&impl->base,
108110
aws_string_c_str(impl->file_path));
109111
error_code = aws_last_error();
110112
goto cleanup;
@@ -113,10 +115,10 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
113115
/* seek to the right position and then read */
114116
if (aws_fseek(file_stream, (int64_t)read_task->offset, SEEK_SET)) {
115117
AWS_LOGF_ERROR(
116-
AWS_LS_S3_GENERAL,
117-
"id=%p: Failed to seek to position %zu in file %s",
118-
(void *)&impl->base,
119-
read_task->offset,
118+
AWS_LS_S3_GENERAL,
119+
"id=%p: Failed to seek to position %llu in file %s",
120+
(void *)&impl->base,
121+
(unsigned long long)read_task->offset,
120122
aws_string_c_str(impl->file_path));
121123
error_code = aws_last_error();
122124
goto cleanup;
@@ -125,66 +127,71 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
125127
size_t actually_read = fread(read_task->dest->buffer + read_task->dest->len, 1, read_task->length, file_stream);
126128
if (actually_read == 0 && ferror(file_stream)) {
127129
AWS_LOGF_ERROR(
128-
AWS_LS_S3_GENERAL,
129-
"id=%p: Failed to read %zu bytes from file %s",
130-
(void *)&impl->base,
131-
read_task->length,
130+
AWS_LS_S3_GENERAL,
131+
"id=%p: Failed to read %zu bytes from file %s",
132+
(void *)&impl->base,
133+
read_task->length,
132134
aws_string_c_str(impl->file_path));
133-
error_code = aws_last_error();
135+
error_code = aws_translate_and_raise_io_error(errno);
134136
goto cleanup;
135137
}
136-
138+
137139
read_task->dest->len += actually_read;
138-
140+
139141
AWS_LOGF_TRACE(
140-
AWS_LS_S3_GENERAL,
141-
"id=%p: Successfully read %zu bytes from file %s at position %zu",
142-
(void *)&impl->base,
143-
actually_read,
144-
aws_string_c_str(impl->file_path),
145-
read_task->offset);
142+
AWS_LS_S3_GENERAL,
143+
"id=%p: Successfully read %zu bytes from file %s at position %llu",
144+
(void *)&impl->base,
145+
actually_read,
146+
aws_string_c_str(impl->file_path),
147+
(unsigned long long)read_task->offset);
146148

147149
cleanup:
148150
if (file_stream != NULL) {
149151
fclose(file_stream);
150152
}
151-
153+
152154
if (error_code != AWS_ERROR_SUCCESS) {
153155
aws_future_bool_set_error(end_future, error_code);
154156
} else {
155-
aws_future_bool_set_result(end_future, true);
157+
/* Return true if we reached EOF */
158+
bool eof_reached = (actually_read < read_task->length);
159+
aws_future_bool_set_result(end_future, eof_reached);
156160
}
157-
161+
158162
aws_future_bool_release(end_future);
159163
aws_mem_release(impl->base.alloc, task);
160164
aws_mem_release(impl->base.alloc, read_task);
161165
}
162166

163167
struct aws_future_bool *s_para_from_file_read(
164168
struct aws_parallel_input_stream *stream,
165-
size_t offset,
166-
size_t length,
169+
uint64_t offset,
170+
size_t max_length,
167171
struct aws_byte_buf *dest) {
168172

169173
struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
170174
struct aws_parallel_input_stream_from_file_impl *impl =
171175
AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);
172176

173-
if (!length) {
177+
/* Calculate how much we can read based on available buffer space and max_length */
178+
size_t available_space = dest->capacity - dest->len;
179+
size_t length = aws_min_size(available_space, max_length);
180+
181+
if (length == 0) {
174182
/* Nothing to read. Complete the read with success. */
175-
aws_future_bool_set_result(future, true);
176-
return future;
177-
}
178-
179-
if (length > dest->capacity - dest->len) {
180-
AWS_LOGF_ERROR(AWS_LS_S3_GENERAL, "id=%p: The buffer read to cannot fit the data.", (void *)stream);
181-
aws_future_bool_set_error(future, AWS_ERROR_SHORT_BUFFER);
183+
aws_future_bool_set_result(future, false);
182184
return future;
183185
}
184186

185187
struct read_task_impl *read_task = aws_mem_calloc(impl->base.alloc, 1, sizeof(struct read_task_impl));
186188

187-
AWS_LOGF_TRACE(AWS_LS_S3_GENERAL, "id=%p: Read %zu bytes from offset %zu", (void *)stream, length, offset);
189+
AWS_LOGF_TRACE(
190+
AWS_LS_S3_GENERAL,
191+
"id=%p: Read %zu bytes from offset %llu",
192+
(void *)stream,
193+
length,
194+
(unsigned long long)offset);
188195

189196
/* Initialize for one read */
190197
read_task->dest = dest;
@@ -213,9 +220,17 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
213220

214221
struct aws_parallel_input_stream_from_file_impl *impl =
215222
aws_mem_calloc(allocator, 1, sizeof(struct aws_parallel_input_stream_from_file_impl));
223+
224+
aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl);
216225
impl->file_path = aws_string_new_from_cursor(allocator, &file_name);
217226
impl->reading_elg = aws_event_loop_group_acquire(reading_elg);
218-
aws_parallel_input_stream_init_base(&impl->base, allocator, &s_parallel_input_stream_from_file_vtable, impl);
227+
228+
if (!aws_path_exists(impl->file_path)) {
229+
/* If the file path does not exist, raise an error translated from errno. */
230+
aws_translate_and_raise_io_error(errno);
231+
s_para_from_file_destroy(&impl->base);
232+
return NULL;
233+
}
219234

220235
return &impl->base;
221236
}
@@ -262,6 +277,12 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
262277
/* The reading buf is invalid. Block until the loading buf is available. */
263278
AWS_ASSERT(impl->loading_future != NULL);
264279
aws_future_bool_wait(impl->loading_future, MAX_TIMEOUT_NS_P);
280+
int read_error = aws_future_bool_get_error(impl->loading_future);
281+
if (read_error != 0) {
282+
/* Read failed. */
283+
return aws_raise_error(read_error);
284+
}
285+
bool eos = aws_future_bool_get_result(impl->loading_future);
265286
impl->loading_future = aws_future_bool_release(impl->loading_future);
266287
/* Swap the reading the loading pointer. */
267288
AWS_ASSERT(impl->reading_chunk_buf->len == 0);
@@ -271,9 +292,11 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
271292
size_t new_offset = impl->offset + impl->total_length_read + impl->chunk_load_size;
272293
size_t new_load_length = aws_min_size(
273294
impl->chunk_load_size, impl->total_length - impl->total_length_read - impl->reading_chunk_buf->len);
274-
/* Kick off loading the next chunk. */
275-
impl->loading_future = aws_parallel_input_stream_read(
276-
impl->stream, new_offset, new_load_length, impl->loading_chunk_buf);
295+
if (new_load_length > 0 && !eos) {
296+
/* Kick off loading the next chunk. */
297+
impl->loading_future =
298+
aws_parallel_input_stream_read(impl->stream, new_offset, new_load_length, impl->loading_chunk_buf);
299+
}
277300
impl->in_chunk_offset = 0;
278301
}
279302
read_length = aws_min_size(read_length, impl->reading_chunk_buf->len - impl->in_chunk_offset);

source/s3_request.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ struct aws_s3_request *aws_s3_request_new(
3737

3838
request->part_number = part_number;
3939
request->record_response_headers = (flags & AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS) != 0;
40-
/* TODO: if the request is streaming, we are not allocating buffer from pool/ This will avoid the allocation buffer
41-
* to be the bottleneck when streaming. */
42-
request->should_allocate_buffer_from_pool = false;
40+
request->should_allocate_buffer_from_pool = (flags & AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL) != 0;
4341
request->always_send = (flags & AWS_S3_REQUEST_FLAG_ALWAYS_SEND) != 0;
4442

4543
return request;

source/s3express_credentials_provider.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ static struct aws_byte_cursor s_create_session_path_query = AWS_BYTE_CUR_INIT_FR
2626
static const size_t s_default_cache_capacity = 100;
2727

2828
/* These numbers are from the C++ SDK implementation. */
29-
static const uint64_t s_expired_threshold_secs = 60;
29+
/**
30+
* TODO: When each part is huge, the 5-second threshold may let the credentials expire before the request is
31+
* sent. Consider moving the signing closer to the HTTP request.
32+
*/
33+
static const uint64_t s_expired_threshold_secs = 5;
3034
static const uint64_t s_about_to_expire_threshold_secs = 60;
3135
static const uint64_t s_background_refresh_interval_secs = 60;
3236

tests/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,8 @@ if(AWS_ENABLE_S3_ENDPOINT_RESOLVER)
384384
add_test_case(test_s3_endpoint_resolver_resolve_endpoint_force_path_style)
385385
endif()
386386

387-
# add_test_case(parallel_read_stream_from_file_sanity_test)
388-
# add_test_case(parallel_read_stream_from_large_file_test)
387+
add_test_case(parallel_read_stream_from_file_sanity_test)
388+
add_test_case(parallel_read_stream_from_large_file_test)
389389

390390
add_test_case(test_s3_buffer_pool_threaded_allocs_and_frees)
391391
add_test_case(test_s3_buffer_pool_large_chunk_threaded_allocs_and_frees)

0 commit comments

Comments
 (0)