Skip to content

Commit 43d33d6

Browse files
authored
File streaming support (#564)
- Support streaming from file for upload instead of buffer the full part reviewed in 5 separate PRs: - #556 - #549 - #546 - #544 - #566
1 parent 6c203e4 commit 43d33d6

33 files changed

+2206
-365
lines changed

include/aws/s3/private/s3_checksums.h

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,29 @@ struct aws_s3_meta_request_checksum_config_storage {
6262
};
6363

6464
/**
65-
* a stream that takes in a stream
65+
* Helper stream that takes in a stream and the checksum context to help finalize the checksum from the underlying
66+
* stream.
67+
* The context will be only finalized when the checksum stream has read to the end of stream.
68+
*
69+
* Note: seeking this stream will immediately fail, as it would prevent an accurate calculation of the
70+
* checksum.
71+
*
72+
* @param allocator
73+
* @param existing_stream The real content to read from. Destroying the checksum stream destroys the existing stream.
74+
* outputs the checksum of existing stream to checksum_output upon destruction. Will be kept
75+
* alive by the checksum stream
76+
* @param context Checksum context to keep and get checksum requirements from.
77+
*/
78+
AWS_S3_API
79+
struct aws_input_stream *aws_checksum_stream_new_with_context(
80+
struct aws_allocator *allocator,
81+
struct aws_input_stream *existing_stream,
82+
struct aws_s3_upload_request_checksum_context *context);
83+
84+
/**
85+
* Helper stream that takes in a stream to keep track of the checksum of the underlying stream during read.
86+
* Invoke `aws_checksum_stream_finalize_checksum` to get the checksum of the data that has been read so far.
87+
*
6688
* Note: seeking this stream will immediately fail, as it would prevent an accurate calculation of the
6789
* checksum.
6890
*
@@ -85,15 +107,6 @@ struct aws_input_stream *aws_checksum_stream_new(
85107
AWS_S3_API
86108
int aws_checksum_stream_finalize_checksum(struct aws_input_stream *checksum_stream, struct aws_byte_buf *checksum_buf);
87109

88-
/**
89-
* Finalize the checksum has read so far to the checksum context.
90-
* Not thread safe.
91-
*/
92-
AWS_S3_API
93-
int aws_checksum_stream_finalize_checksum_context(
94-
struct aws_input_stream *checksum_stream,
95-
struct aws_s3_upload_request_checksum_context *checksum_context);
96-
97110
/**
98111
* TODO: properly support chunked encoding.
99112
* Creates a chunked encoding stream that wraps an existing stream and adds checksum trailers.

include/aws/s3/private/s3_client_impl.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,11 @@ struct aws_s3_client_vtable {
168168

169169
void (*finish_destroy)(struct aws_s3_client *client);
170170

171-
struct aws_parallel_input_stream *(
172-
*parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name);
171+
struct aws_parallel_input_stream *(*parallel_input_stream_new_from_file)(
172+
struct aws_allocator *allocator,
173+
struct aws_byte_cursor file_name,
174+
struct aws_event_loop_group *reading_elg,
175+
bool direct_io_read);
173176

174177
struct aws_http_stream *(*http_connection_make_request)(
175178
struct aws_http_connection *client_connection,
@@ -235,6 +238,10 @@ struct aws_s3_client {
235238
* to meta requests for use. */
236239
const uint64_t max_part_size;
237240

241+
/* File I/O options. */
242+
bool fio_options_set;
243+
struct aws_s3_file_io_options fio_opts;
244+
238245
/* The size threshold in bytes for when to use multipart uploads for a AWS_S3_META_REQUEST_TYPE_PUT_OBJECT meta
239246
* request. Uploads over this size will automatically use a multipart upload strategy, while uploads smaller or
240247
* equal to this threshold will use a single request to upload the whole object. If not set, `part_size` will be
@@ -351,6 +358,9 @@ struct aws_s3_client {
351358

352359
/* Number of requests currently scheduled to be streamed the response body or are actively being streamed. */
353360
struct aws_atomic_var num_requests_streaming_response;
361+
362+
/* Number of overall requests currently streaming the request body instead of buffering. */
363+
struct aws_atomic_var num_requests_streaming_request_body;
354364
} stats;
355365

356366
struct {

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ struct aws_s3_meta_request {
288288
FILE *recv_file;
289289
struct aws_string *recv_filepath;
290290
bool recv_file_delete_on_failure;
291+
292+
/* File I/O options. */
293+
struct aws_s3_file_io_options fio_opts;
291294
};
292295

293296
/* Info for each part, that we need to remember until we send CompleteMultipartUpload */
@@ -307,6 +310,7 @@ int aws_s3_meta_request_init_base(
307310
struct aws_s3_client *client,
308311
size_t part_size,
309312
bool should_compute_content_md5,
313+
bool should_default_streaming,
310314
const struct aws_s3_meta_request_options *options,
311315
void *impl,
312316
struct aws_s3_meta_request_vtable *vtable,

include/aws/s3/private/s3_parallel_input_stream.h

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,21 @@ AWS_PUSH_SANE_WARNING_LEVEL
1414

1515
struct aws_byte_buf;
1616
struct aws_future_bool;
17+
struct aws_future_void;
1718
struct aws_input_stream;
19+
struct aws_s3_meta_request;
20+
struct aws_s3_request;
1821

1922
struct aws_event_loop_group;
2023

24+
/**
25+
* This should be private, but keep it public for providing your own implementation.
26+
*/
2127
struct aws_parallel_input_stream {
2228
const struct aws_parallel_input_stream_vtable *vtable;
2329
struct aws_allocator *alloc;
2430
struct aws_ref_count ref_count;
31+
struct aws_future_void *shutdown_future;
2532

2633
void *impl;
2734
};
@@ -33,11 +40,27 @@ struct aws_parallel_input_stream_vtable {
3340
void (*destroy)(struct aws_parallel_input_stream *stream);
3441

3542
/**
36-
* Read into the buffer in parallel.
37-
* The implementation needs to support this to be invoked concurrently from multiple threads
43+
* Read from the offset until dest is filled or EOF is reached.
44+
* It is thread safe to call this from multiple threads without waiting for other reads to complete.
45+
*
46+
* @param stream The stream to read from
47+
* @param offset The offset in the stream from beginning to start reading
48+
* @param max_length The maximum number of bytes to read
49+
* @param dest The output buffer to read into
50+
* @return a future, which will contain an error code if something went wrong,
51+
* or a result bool indicating whether EOF has been reached.
3852
*/
3953
struct aws_future_bool *(
40-
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, struct aws_byte_buf *dest);
54+
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, size_t max_length, struct aws_byte_buf *dest);
55+
56+
/**
57+
* Get the length of the stream.
58+
*
59+
* @param stream The stream to get length from
60+
* @param out_length The output length
61+
* @return AWS_OP_SUCCESS if success, otherwise AWS_OP_ERR
62+
*/
63+
int (*get_length)(struct aws_parallel_input_stream *stream, int64_t *out_length);
4164
};
4265

4366
AWS_EXTERN_C_BEGIN
@@ -74,6 +97,7 @@ struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_p
7497
*
7598
* @param stream The stream to read from
7699
* @param offset The offset in the stream from beginning to start reading
100+
* @param max_length The maximum number of bytes to read
77101
* @param dest The output buffer to read into
78102
* @return a future, which will contain an error code if something went wrong,
79103
* or a result bool indicating whether EOF has been reached.
@@ -82,22 +106,51 @@ AWS_S3_API
82106
struct aws_future_bool *aws_parallel_input_stream_read(
83107
struct aws_parallel_input_stream *stream,
84108
uint64_t offset,
109+
size_t max_length,
85110
struct aws_byte_buf *dest);
86111

87112
/**
88-
* Create a new file based parallel input stream.
113+
* Get the total length of the parallel input stream.
89114
*
90-
* This implementation will open a file handler when the read happens, and seek to the offset to start reading. Close
91-
* the file handler as read finishes.
115+
* @param stream The stream to get length from
116+
* @param out_length The output length
117+
* @return AWS_OP_SUCCESS if success, otherwise AWS_OP_ERR
118+
*/
119+
AWS_S3_API
120+
int aws_parallel_input_stream_get_length(struct aws_parallel_input_stream *stream, int64_t *out_length);
121+
122+
/**
123+
* Creates a new parallel input stream that reads from a file.
124+
* This stream uses an event loop group to perform file I/O operations asynchronously.
125+
*
126+
* Notes for direct_io_read:
127+
* - see `aws_file_path_read_from_offset_direct_io` for details
128+
* - For `AWS_ERROR_UNSUPPORTED_OPERATION`, fall back to reading with cache, with warnings, instead of failing.
129+
* - If alignment is required, it's the caller's responsibility to align with the page size.
92130
*
93-
* @param allocator memory allocator
94-
* @param file_name The file path to read from
95-
* @return aws_parallel_input_stream
131+
* @param allocator The allocator to use for memory allocation
132+
* @param file_name The name of the file to read from
133+
* @param reading_elg The event loop group to use for file I/O operations
134+
* @param direct_io_read Whether to use direct I/O for reading the file.
135+
*
136+
* @return A new parallel input stream that reads from the specified file
96137
*/
97138
AWS_S3_API
98139
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
99140
struct aws_allocator *allocator,
100-
struct aws_byte_cursor file_name);
141+
struct aws_byte_cursor file_name,
142+
struct aws_event_loop_group *reading_elg,
143+
bool direct_io_read);
144+
145+
/**
146+
* Get the shutdown future from the parallel input stream.
147+
* The future will be completed when every refcount on the stream has been released.
148+
* And all the resources have been released.
149+
* Don't hold any refcount of the stream while waiting on the future; otherwise, a deadlock can happen.
150+
* You need to release the future after using it.
151+
*/
152+
AWS_S3_API
153+
struct aws_future_void *aws_parallel_input_stream_get_shutdown_future(struct aws_parallel_input_stream *stream);
101154

102155
AWS_EXTERN_C_END
103156
AWS_POP_SANE_WARNING_LEVEL
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#ifndef AWS_S3_PART_STREAMING_INPUT_STREAM_H
7+
#define AWS_S3_PART_STREAMING_INPUT_STREAM_H
8+
9+
#include <aws/s3/s3.h>
10+
11+
struct aws_input_stream;
12+
struct aws_s3_buffer_ticket;
13+
struct aws_parallel_input_stream;
14+
15+
AWS_PUSH_SANE_WARNING_LEVEL
16+
AWS_EXTERN_C_BEGIN
17+
18+
/**
19+
* Creates a new streaming input stream that reads from a parallel input stream.
20+
* This adapter allows using a parallel input stream with APIs that expect a standard input stream.
21+
* The adapter uses double-buffering to read ahead and provide efficient streaming.
22+
*
23+
* Note: The input stream only provides a blocking API to read, if reading from the same thread that
24+
* will be used by `para_stream` to handle the parallel read, it's subject to a deadlock!
25+
* Make sure the input stream is read from a different thread/thread pool than the one that executes the `para_stream` reads.
26+
*
27+
* @param allocator The allocator to use for memory allocation
28+
* @param para_stream The parallel input stream to read from
29+
* @param buffer_ticket The buffer pool ticket to use for buffering
30+
* @param offset The starting offset in the stream
31+
* @param request_body_size The total size to read
32+
* @param page_aligned Whether the input stream only reads from the para_stream in a way that is
33+
* aligned with the page size, required for direct I/O on Linux.
34+
* @return A new input stream that reads from the parallel input stream
35+
*/
36+
AWS_S3_API
37+
struct aws_input_stream *aws_part_streaming_input_stream_new(
38+
struct aws_allocator *allocator,
39+
struct aws_parallel_input_stream *para_stream,
40+
struct aws_s3_buffer_ticket *buffer_ticket,
41+
uint64_t offset,
42+
size_t request_body_size,
43+
bool page_aligned);
44+
45+
AWS_EXTERN_C_END
46+
AWS_POP_SANE_WARNING_LEVEL
47+
48+
#endif /* AWS_S3_PART_STREAMING_INPUT_STREAM_H */

include/aws/s3/private/s3_request.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,19 @@ struct aws_s3_request {
157157
/* Owning meta request. */
158158
struct aws_s3_meta_request *meta_request;
159159

160+
/* The buffer size to be allocated for the request, defaults to the part size. */
161+
size_t buffer_size;
160162
/* Request body to use when sending the request. The contents of this body will be re-used if a request is
161163
* retried.*/
162164
struct aws_byte_buf request_body;
163165

166+
/* Set when the request will be streaming from file directly instead of the request_body. */
167+
bool fio_streaming;
168+
/* If file I/O options configure streaming, the request body will be streamed from this. If a request is retried,
169+
* this stream will be recreated. */
170+
struct aws_input_stream *request_body_stream;
171+
uint64_t content_length;
172+
164173
/**
165174
* Ticket to acquire the buffer.
166175
*/

include/aws/s3/private/s3_request_messages.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ AWS_S3_API
5252
struct aws_input_stream *aws_s3_message_util_assign_body(
5353
struct aws_allocator *allocator,
5454
struct aws_byte_buf *byte_buf,
55+
struct aws_input_stream *stream,
5556
struct aws_http_message *out_message,
5657
struct aws_s3_upload_request_checksum_context *checksum_context);
5758

@@ -91,6 +92,18 @@ struct aws_http_message *aws_s3_upload_part_message_new(
9192
bool should_compute_content_md5,
9293
struct aws_s3_upload_request_checksum_context *checksum_context);
9394

95+
/* Same as `aws_s3_upload_part_message_new`, but instead of taking the loaded buffer, it takes the input stream for
96+
* streaming I/O. */
97+
AWS_S3_API
98+
struct aws_http_message *aws_s3_upload_part_message_new_streaming(
99+
struct aws_allocator *allocator,
100+
struct aws_http_message *base_message,
101+
struct aws_input_stream *input_stream,
102+
uint32_t part_number,
103+
const struct aws_string *upload_id,
104+
bool should_compute_content_md5,
105+
struct aws_s3_upload_request_checksum_context *checksum_context);
106+
94107
/* Create an HTTP request for an S3 UploadPartCopy request, using the original request as a basis.
95108
* If multipart is not needed, part number and upload_id can be 0 and NULL,
96109
* respectively. */

include/aws/s3/private/s3_util.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
#endif
2525
#define KB_TO_BYTES(kb) ((kb) * 1024)
2626
#define MB_TO_BYTES(mb) ((mb) * 1024 * 1024)
27-
#define GB_TO_BYTES(gb) ((gb) * 1024 * 1024 * 1024ULL)
27+
#define GB_TO_BYTES(gb) ((uint64_t)(gb) * 1024 * 1024 * 1024ULL)
28+
#define TB_TO_BYTES(tb) ((uint64_t)(tb) * 1024 * 1024 * 1024 * 1024ULL)
2829

2930
#define MS_TO_NS(ms) ((uint64_t)(ms) * 1000000)
3031
#define SEC_TO_NS(ms) ((uint64_t)(ms) * 1000000000)
@@ -150,6 +151,15 @@ extern const struct aws_byte_cursor g_delete_method;
150151
AWS_S3_API
151152
extern const uint32_t g_s3_max_num_upload_parts;
152153

154+
AWS_S3_API
155+
/* TODO: now this is hard-coded as 8MB, but maybe something else is better. */
156+
extern const size_t g_streaming_buffer_size;
157+
158+
AWS_S3_API
159+
extern const double g_default_throughput_target_gbps;
160+
161+
AWS_S3_API
162+
extern const uint64_t g_streaming_object_size_threshold;
153163
/**
154164
* Returns AWS_S3_REQUEST_TYPE_UNKNOWN if name doesn't map to an enum value.
155165
*/

0 commit comments

Comments
 (0)