Skip to content

Commit e0cc062

Browse files
committed
couple fix
1 parent 9303470 commit e0cc062

File tree

4 files changed

+29
-21
lines changed

4 files changed

+29
-21
lines changed

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,8 @@ struct aws_parallel_input_stream_vtable {
4343
* @return a future, which will contain an error code if something went wrong,
4444
* or a result bool indicating whether EOF has been reached.
4545
*/
46-
struct aws_future_bool *(*read)(
47-
struct aws_parallel_input_stream *stream,
48-
uint64_t offset,
49-
size_t max_length,
50-
struct aws_byte_buf *dest);
46+
struct aws_future_bool *(
47+
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, size_t max_length, struct aws_byte_buf *dest);
5148
};
5249

5350
AWS_EXTERN_C_BEGIN

source/s3_parallel_input_stream.c

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
100100
struct aws_future_bool *end_future = read_task->end_future;
101101
FILE *file_stream = NULL;
102102
int error_code = AWS_ERROR_SUCCESS;
103+
size_t actually_read = 0;
103104

104105
file_stream = aws_fopen(aws_string_c_str(impl->file_path), "rb");
105106
if (file_stream == NULL) {
@@ -124,7 +125,7 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
124125
goto cleanup;
125126
}
126127

127-
size_t actually_read = fread(read_task->dest->buffer + read_task->dest->len, 1, read_task->length, file_stream);
128+
actually_read = fread(read_task->dest->buffer + read_task->dest->len, 1, read_task->length, file_stream);
128129
if (actually_read == 0 && ferror(file_stream)) {
129130
AWS_LOGF_ERROR(
130131
AWS_LS_S3_GENERAL,
@@ -255,6 +256,9 @@ struct aws_s3_mmap_part_streaming_input_stream_impl {
255256

256257
struct aws_byte_buf chunk_buf_1;
257258
struct aws_byte_buf chunk_buf_2;
259+
260+
bool eos_loaded;
261+
bool eos_reached;
258262
};
259263

260264
static int s_aws_s3_mmap_part_streaming_input_stream_seek(
@@ -275,14 +279,18 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
275279

276280
if (impl->in_chunk_offset == SIZE_MAX) {
277281
/* The reading buf is invalid. Block until the loading buf is available. */
278-
AWS_ASSERT(impl->loading_future != NULL);
282+
if (impl->loading_future == NULL) {
283+
/* Nothing to read */
284+
AWS_ASSERT(impl->eos_reached);
285+
return AWS_OP_SUCCESS;
286+
}
279287
aws_future_bool_wait(impl->loading_future, MAX_TIMEOUT_NS_P);
280288
int read_error = aws_future_bool_get_error(impl->loading_future);
281289
if (read_error != 0) {
282290
/* Read failed. */
283291
return aws_raise_error(read_error);
284292
}
285-
bool eos = aws_future_bool_get_result(impl->loading_future);
293+
impl->eos_loaded = aws_future_bool_get_result(impl->loading_future);
286294
impl->loading_future = aws_future_bool_release(impl->loading_future);
287295
/* Swap the reading the loading pointer. */
288296
AWS_ASSERT(impl->reading_chunk_buf->len == 0);
@@ -292,7 +300,7 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
292300
size_t new_offset = impl->offset + impl->total_length_read + impl->chunk_load_size;
293301
size_t new_load_length = aws_min_size(
294302
impl->chunk_load_size, impl->total_length - impl->total_length_read - impl->reading_chunk_buf->len);
295-
if (new_load_length > 0 && !eos) {
303+
if (new_load_length > 0 && !impl->eos_loaded) {
296304
/* Kick off loading the next chunk. */
297305
impl->loading_future =
298306
aws_parallel_input_stream_read(impl->stream, new_offset, new_load_length, impl->loading_chunk_buf);
@@ -311,6 +319,10 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
311319
/* We finished reading the reading buffer, reset it. */
312320
aws_byte_buf_reset(impl->reading_chunk_buf, false);
313321
impl->in_chunk_offset = SIZE_MAX;
322+
if (impl->eos_loaded) {
323+
/* We reached the end of the stream. */
324+
impl->eos_reached = true;
325+
}
314326
}
315327

316328
return AWS_OP_SUCCESS;
@@ -322,10 +334,10 @@ static int s_aws_s3_mmap_part_streaming_input_stream_get_status(
322334
(void)stream;
323335
(void)status;
324336

325-
struct aws_s3_mmap_part_streaming_input_stream_impl *mmap_input_stream =
337+
struct aws_s3_mmap_part_streaming_input_stream_impl *impl =
326338
AWS_CONTAINER_OF(stream, struct aws_s3_mmap_part_streaming_input_stream_impl, base);
327339

328-
status->is_end_of_stream = mmap_input_stream->total_length_read == mmap_input_stream->total_length;
340+
status->is_end_of_stream = (impl->total_length_read == impl->total_length) || impl->eos_reached;
329341
status->is_valid = true;
330342

331343
return AWS_OP_SUCCESS;

source/s3_request_messages.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -657,13 +657,12 @@ static const struct aws_byte_cursor s_complete_payload_begin = AWS_BYTE_CUR_INIT
657657
static const struct aws_byte_cursor s_complete_payload_end =
658658
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("</CompleteMultipartUpload>");
659659

660-
static const struct aws_byte_cursor s_part_section_string_0 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(
661-
" <Part>\n"
662-
" <ETag>");
660+
static const struct aws_byte_cursor s_part_section_string_0 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(" <Part>\n"
661+
" <ETag>");
663662

664-
static const struct aws_byte_cursor s_part_section_string_1 = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(
665-
"</ETag>\n"
666-
" <PartNumber>");
663+
static const struct aws_byte_cursor s_part_section_string_1 =
664+
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("</ETag>\n"
665+
" <PartNumber>");
667666

668667
static const struct aws_byte_cursor s_close_part_number_tag = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("</PartNumber>\n");
669668
static const struct aws_byte_cursor s_close_part_tag = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(" </Part>\n");

tests/s3_test_parallel_stream.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ struct aws_parallel_input_stream_from_file_failure_impl {
1414
};
1515

1616
static void s_para_from_file_failure_destroy(struct aws_parallel_input_stream *stream) {
17-
struct aws_parallel_input_stream_from_file_failure_impl *impl =
17+
struct aws_parallel_input_stream_from_file_failure_impl *impl =
1818
AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_failure_impl, base);
1919

2020
aws_mem_release(stream->alloc, impl);
@@ -25,14 +25,14 @@ struct aws_future_bool *s_para_from_file_failure_read(
2525
uint64_t offset,
2626
size_t max_length,
2727
struct aws_byte_buf *dest) {
28-
28+
2929
(void)offset;
3030
(void)max_length;
3131

3232
struct aws_future_bool *future = aws_future_bool_new(stream->alloc);
33-
struct aws_parallel_input_stream_from_file_failure_impl *impl =
33+
struct aws_parallel_input_stream_from_file_failure_impl *impl =
3434
AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_failure_impl, base);
35-
35+
3636
size_t previous_number_read = aws_atomic_fetch_add(&impl->number_read, 1);
3737
if (previous_number_read == 1) {
3838
/* TODO: make the failure configurable */

0 commit comments

Comments
 (0)