Skip to content

Commit c685200

Browse files
committed
Merge branch 'part_streaming_input_stream' into parallel_input_stream_update
2 parents a881fcd + a620cfa commit c685200

File tree

6 files changed

+929
-5
lines changed

6 files changed

+929
-5
lines changed

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ struct aws_parallel_input_stream_vtable {
4747
*/
4848
struct aws_future_bool *(
4949
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, size_t max_length, struct aws_byte_buf *dest);
50+
51+
/**
52+
* Get the length of the stream.
53+
*
54+
* @param stream The stream to get length from
55+
* @param out_length The output length
56+
* @return AWS_OP_SUCCESS if success, otherwise AWS_OP_ERR
57+
*/
58+
int (*get_length)(struct aws_parallel_input_stream *stream, int64_t *out_length);
5059
};
5160

5261
AWS_EXTERN_C_BEGIN
@@ -95,6 +104,16 @@ struct aws_future_bool *aws_parallel_input_stream_read(
95104
size_t max_length,
96105
struct aws_byte_buf *dest);
97106

107+
/**
108+
* Get the total length of the parallel input stream.
109+
*
110+
* @param stream
111+
* @param out_length
112+
* @return AWS_S3_API
113+
*/
114+
AWS_S3_API
115+
int aws_parallel_input_stream_get_length(struct aws_parallel_input_stream *stream, int64_t *out_length);
116+
98117
/**
99118
* Creates a new parallel input stream that reads from a file.
100119
* This stream uses an event loop group to perform file I/O operations asynchronously.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#ifndef AWS_S3_PART_STREAMING_INPUT_STREAM_H
7+
#define AWS_S3_PART_STREAMING_INPUT_STREAM_H
8+
9+
#include <aws/s3/s3.h>
10+
11+
struct aws_input_stream;
12+
struct aws_s3_buffer_ticket;
13+
struct aws_parallel_input_stream;
14+
15+
AWS_PUSH_SANE_WARNING_LEVEL
16+
AWS_EXTERN_C_BEGIN
17+
18+
/**
19+
* Creates a new streaming input stream that reads from a parallel input stream.
20+
* This adapter allows using a parallel input stream with APIs that expect a standard input stream.
21+
* The adapter uses double-buffering to read ahead and provide efficient streaming.
22+
*
23+
* @param allocator The allocator to use for memory allocation
24+
* @param para_stream The parallel input stream to read from
25+
* @param buffer_ticket The buffer pool ticket to use for buffering
26+
* @param offset The starting offset in the stream
27+
* @param request_body_size The total size to read
28+
* @return A new input stream that reads from the parallel input stream
29+
*/
30+
AWS_S3_API
31+
struct aws_input_stream *aws_part_streaming_input_stream_new(
32+
struct aws_allocator *allocator,
33+
struct aws_parallel_input_stream *para_stream,
34+
struct aws_s3_buffer_ticket *buffer_ticket,
35+
uint64_t offset,
36+
size_t request_body_size);
37+
38+
AWS_EXTERN_C_END
39+
AWS_POP_SANE_WARNING_LEVEL
40+
41+
#endif /* AWS_S3_PART_STREAMING_INPUT_STREAM_H */

source/s3_parallel_input_stream.c

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_p
4444
return NULL;
4545
}
4646

47+
int aws_parallel_input_stream_get_length(struct aws_parallel_input_stream *stream, int64_t *out_length) {
48+
if (stream->vtable->get_length) {
49+
return stream->vtable->get_length(stream, out_length);
50+
} else {
51+
return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
52+
}
53+
}
54+
4755
struct aws_future_bool *aws_parallel_input_stream_read(
4856
struct aws_parallel_input_stream *stream,
4957
uint64_t offset,
@@ -68,7 +76,7 @@ struct aws_parallel_input_stream_from_file_impl {
6876
struct aws_event_loop_group *reading_elg;
6977
};
7078

71-
static void s_para_from_file_destroy(struct aws_parallel_input_stream *stream) {
79+
static void s_parallel_from_file_destroy(struct aws_parallel_input_stream *stream) {
7280
struct aws_parallel_input_stream_from_file_impl *impl =
7381
AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);
7482

@@ -102,6 +110,7 @@ static int s_read_from_file_impl(
102110
int rt_code = AWS_OP_ERR;
103111
FILE *file_stream = aws_fopen_safe(file_path, s_readonly_bytes_mode);
104112
if (file_stream == NULL) {
113+
AWS_LOGF_ERROR(AWS_LS_S3_GENERAL, "Failed to open file %s", aws_string_c_str(file_path));
105114
return AWS_OP_ERR;
106115
}
107116

@@ -158,7 +167,7 @@ static void s_s3_parallel_from_file_read_task(struct aws_task *task, void *arg,
158167
aws_mem_release(impl->base.alloc, read_task);
159168
}
160169

161-
struct aws_future_bool *s_para_from_file_read(
170+
struct aws_future_bool *s_parallel_from_file_read(
162171
struct aws_parallel_input_stream *stream,
163172
uint64_t offset,
164173
size_t max_length,
@@ -199,9 +208,23 @@ struct aws_future_bool *s_para_from_file_read(
199208
return future;
200209
}
201210

211+
int s_parallel_from_file_get_length(struct aws_parallel_input_stream *stream, int64_t *length) {
212+
struct aws_parallel_input_stream_from_file_impl *impl =
213+
AWS_CONTAINER_OF(stream, struct aws_parallel_input_stream_from_file_impl, base);
214+
FILE *file = aws_fopen_safe(impl->file_path, s_readonly_bytes_mode);
215+
if (!file) {
216+
return AWS_OP_ERR;
217+
}
218+
219+
int ret_val = aws_file_get_length(file, length);
220+
fclose(file);
221+
return ret_val;
222+
}
223+
202224
static struct aws_parallel_input_stream_vtable s_parallel_input_stream_from_file_vtable = {
203-
.destroy = s_para_from_file_destroy,
204-
.read = s_para_from_file_read,
225+
.destroy = s_parallel_from_file_destroy,
226+
.read = s_parallel_from_file_read,
227+
.get_length = s_parallel_from_file_get_length,
205228
};
206229

207230
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
@@ -219,7 +242,7 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
219242
if (!aws_path_exists(impl->file_path)) {
220243
/* If file path not exists, raise error from errno. */
221244
aws_translate_and_raise_io_error(errno);
222-
s_para_from_file_destroy(&impl->base);
245+
s_parallel_from_file_destroy(&impl->base);
223246
return NULL;
224247
}
225248

0 commit comments

Comments
 (0)