Skip to content

Commit 39ae2ac

Browse files
committed
fixes
1 parent a549942 commit 39ae2ac

1 file changed

Lines changed: 13 additions & 7 deletions

File tree

source/s3_parallel_input_stream.c

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
106106
if (actually_read == 0) {
107107
printf("############fread failed!");
108108
}
109+
read_task->dest->len += actually_read;
109110

110111
fclose(file_stream);
111112
struct aws_future_bool *end_future = read_task->end_future;
@@ -331,26 +332,28 @@ static int s_aws_s3_mmap_part_streaming_input_stream_read(struct aws_input_strea
331332
impl->reading_chunk_buf = impl->loading_chunk_buf;
332333
impl->loading_chunk_buf = tmp;
333334
size_t new_offset = impl->offset + impl->total_length_read + impl->chunk_load_size;
335+
size_t new_load_length = aws_min_size(
336+
impl->chunk_load_size, impl->total_length - impl->total_length_read - impl->reading_chunk_buf->len);
334337
/* Kick off loading the next chunk. */
335338
impl->loading_future = aws_parallel_input_stream_read(
336-
impl->stream, new_offset, new_offset + impl->chunk_load_size, impl->loading_chunk_buf);
339+
impl->stream, new_offset, new_offset + new_load_length, impl->loading_chunk_buf);
337340
impl->in_chunk_offset = 0;
338341
}
339-
read_length = aws_min_size(read_length, impl->chunk_load_size - impl->in_chunk_offset);
342+
read_length = aws_min_size(read_length, impl->reading_chunk_buf->len - impl->in_chunk_offset);
340343
struct aws_byte_cursor chunk_cursor = aws_byte_cursor_from_buf(impl->reading_chunk_buf);
341344
aws_byte_cursor_advance(&chunk_cursor, impl->in_chunk_offset);
342345
chunk_cursor.len = read_length;
343346
int rt = aws_byte_buf_append(dest, &chunk_cursor);
344347
impl->in_chunk_offset += read_length;
345348
impl->total_length_read += read_length;
346349

347-
if (impl->in_chunk_offset == impl->chunk_load_size) {
350+
if (impl->in_chunk_offset == impl->reading_chunk_buf->len) {
348351
/* We finished reading the reading buffer, reset it. */
349352
aws_byte_buf_reset(impl->reading_chunk_buf, false);
350353
impl->in_chunk_offset = SIZE_MAX;
351354
}
352355

353-
return rt;
356+
return AWS_OP_SUCCESS;
354357
}
355358

356359
static int s_aws_s3_mmap_part_streaming_input_stream_get_status(
@@ -379,6 +382,8 @@ static int s_aws_s3_mmap_part_streaming_input_stream_get_length(struct aws_input
379382
static void s_aws_s3_mmap_part_streaming_input_stream_destroy(
380383
struct aws_s3_mmap_part_streaming_input_stream_impl *mmap_input_stream) {
381384
aws_parallel_input_stream_release(mmap_input_stream->stream);
385+
aws_byte_buf_clean_up(&mmap_input_stream->chunk_buf_1);
386+
aws_byte_buf_clean_up(&mmap_input_stream->chunk_buf_2);
382387
aws_mem_release(mmap_input_stream->allocator, mmap_input_stream);
383388
}
384389

@@ -421,11 +426,12 @@ struct aws_input_stream *aws_input_stream_new_from_mmap_context(
421426
aws_byte_buf_init(&mmap_input_stream->chunk_buf_2, allocator, mmap_input_stream->chunk_load_size);
422427

423428
mmap_input_stream->loading_chunk_buf = &mmap_input_stream->chunk_buf_1;
424-
mmap_input_stream->reading_chunk_buf = &mmap_input_stream->chunk_buf_1;
429+
mmap_input_stream->reading_chunk_buf = &mmap_input_stream->chunk_buf_2;
430+
size_t new_load_length = aws_min_size(mmap_input_stream->chunk_load_size, mmap_input_stream->total_length);
425431

426432
/* Start to load into the loading buffer. */
427-
mmap_input_stream->loading_future = aws_parallel_input_stream_read(
428-
stream, offset, offset + mmap_input_stream->chunk_load_size, mmap_input_stream->loading_chunk_buf);
433+
mmap_input_stream->loading_future =
434+
aws_parallel_input_stream_read(stream, offset, offset + new_load_length, mmap_input_stream->loading_chunk_buf);
429435

430436
return &mmap_input_stream->base;
431437
}

0 commit comments

Comments
 (0)