Skip to content

Commit 52c90d3

Browse files
Memory pool interface (#517)
Co-authored-by: Waqar Ahmed Khan <[email protected]>
1 parent 50c9a33 commit 52c90d3

20 files changed

+1661
-1057
lines changed

include/aws/s3/private/s3_buffer_pool.h renamed to include/aws/s3/private/s3_default_buffer_pool.h

Lines changed: 14 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
#include <aws/s3/s3.h>
10+
#include <aws/s3/s3_buffer_pool.h>
1011

1112
/*
1213
* S3 buffer pool.
@@ -30,10 +31,10 @@
3031

3132
AWS_EXTERN_C_BEGIN
3233

33-
struct aws_s3_buffer_pool;
34-
struct aws_s3_buffer_pool_ticket;
34+
struct aws_s3_default_buffer_pool;
35+
struct aws_s3_default_buffer_ticket;
3536

36-
struct aws_s3_buffer_pool_usage_stats {
37+
struct aws_s3_default_buffer_pool_usage_stats {
3738
/* Effective Max memory limit. Memory limit value provided during construction minus
3839
* buffer reserved for overhead of the pool */
3940
size_t mem_limit;
@@ -71,16 +72,15 @@ struct aws_s3_buffer_pool_usage_stats {
7172
* buffers can no longer be reserved from (reservation hold is placed on the pool).
7273
* Returns buffer pool pointer on success and NULL on failure.
7374
*/
74-
AWS_S3_API struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
75+
AWS_S3_API struct aws_s3_buffer_pool *aws_s3_default_buffer_pool_new(
7576
struct aws_allocator *allocator,
76-
size_t chunk_size,
77-
size_t mem_limit);
77+
struct aws_s3_buffer_pool_config config);
7878

7979
/*
8080
* Destroys buffer pool.
8181
* Does nothing if buffer_pool is NULL.
8282
*/
83-
AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool);
83+
AWS_S3_API void aws_s3_default_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool);
8484

8585
/*
8686
* Reserves memory from the pool for later use.
@@ -95,60 +95,32 @@ AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_poo
9595
* If you MUST acquire a buffer now (waiting to reserve a ticket would risk deadlock),
9696
* use aws_s3_buffer_pool_acquire_forced_buffer() instead.
9797
*/
98-
AWS_S3_API struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(
98+
AWS_S3_API struct aws_future_s3_buffer_ticket *aws_s3_default_buffer_pool_reserve(
9999
struct aws_s3_buffer_pool *buffer_pool,
100-
size_t size);
101-
102-
/*
103-
* Whether pool has a reservation hold.
104-
*/
105-
AWS_S3_API bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool);
106-
107-
/*
108-
* Remove reservation hold on pool.
109-
*/
110-
AWS_S3_API void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffer_pool);
100+
struct aws_s3_buffer_pool_reserve_meta meta);
111101

112102
/*
113103
* Trades in the ticket for a buffer.
114104
* Cannot fail and can over allocate above mem limit if reservation was not accurate.
115105
* Using the same ticket twice will return the same buffer.
116106
* Buffer is only valid until the ticket is released.
117107
*/
118-
AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer(
119-
struct aws_s3_buffer_pool *buffer_pool,
120-
struct aws_s3_buffer_pool_ticket *ticket);
121-
122-
/*
123-
* Force immediate acquisition of a buffer from the pool.
124-
* This should only be used if waiting to reserve a ticket would risk deadlock.
125-
* This cannot fail, not even if the pool has a reservation hold,
126-
* not even if the memory limit has been exceeded.
127-
*/
128-
AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_forced_buffer(
129-
struct aws_s3_buffer_pool *buffer_pool,
130-
size_t size,
131-
struct aws_s3_buffer_pool_ticket **out_new_ticket);
132-
133-
/*
134-
* Releases the ticket.
135-
* Any buffers associated with the ticket are invalidated.
136-
*/
137-
AWS_S3_API void aws_s3_buffer_pool_release_ticket(
108+
AWS_S3_API struct aws_byte_buf aws_s3_default_buffer_pool_acquire_buffer(
138109
struct aws_s3_buffer_pool *buffer_pool,
139-
struct aws_s3_buffer_pool_ticket *ticket);
110+
struct aws_s3_default_buffer_ticket *ticket);
140111

141112
/*
142113
* Get pool memory usage stats.
143114
*/
144-
AWS_S3_API struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3_buffer_pool *buffer_pool);
115+
AWS_S3_API struct aws_s3_default_buffer_pool_usage_stats aws_s3_default_buffer_pool_get_usage(
116+
struct aws_s3_buffer_pool *buffer_pool);
145117

146118
/*
147119
* Trims all unused mem from the pool.
148120
* Warning: fairly slow operation, do not use in critical path.
149121
* TODO: partial trimming? ex. only trim down to 50% of max?
150122
*/
151-
AWS_S3_API void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool);
123+
AWS_S3_API void aws_s3_default_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool);
152124

153125
AWS_EXTERN_C_END
154126

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ struct aws_s3_meta_request {
170170
/* Customer specified callbacks. */
171171
aws_s3_meta_request_headers_callback_fn *headers_callback;
172172
aws_s3_meta_request_receive_body_callback_fn *body_callback;
173+
aws_s3_meta_request_receive_body_callback_ex_fn *body_callback_ex;
173174
aws_s3_meta_request_finish_fn *finish_callback;
174175
aws_s3_meta_request_shutdown_fn *shutdown_callback;
175176
aws_s3_meta_request_progress_fn *progress_callback;
@@ -226,8 +227,11 @@ struct aws_s3_meta_request {
226227
/* To track aws_s3_requests with cancellable HTTP streams */
227228
struct aws_linked_list cancellable_http_streams_list;
228229

230+
/* To track aws_future_s3_buffers that might need to be cleaned up on cancel */
231+
struct aws_linked_list pending_buffer_futures;
232+
229233
/* Data for async-writes. */
230-
struct {
234+
struct aws_s3_async_write_data {
231235
/* Whether a part request can be sent (we have 1 part's worth of data, or EOF) */
232236
bool ready_to_send;
233237

@@ -237,7 +241,8 @@ struct aws_s3_meta_request {
237241
/* Holds buffered data we can't immediately send.
238242
* The length will always be less than part-size */
239243
struct aws_byte_buf buffered_data;
240-
struct aws_s3_buffer_pool_ticket *buffered_data_ticket;
244+
struct aws_s3_buffer_ticket *buffered_data_ticket;
245+
struct aws_future_s3_buffer_ticket *buffered_ticket_future;
241246

242247
/* Waker callback.
243248
* Stored if a poll_write() call returns result.is_pending
@@ -401,6 +406,9 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
401406
/* Cancel the requests with cancellable HTTP stream for the meta request */
402407
void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);
403408

409+
/* Cancel the pending buffer futures for the meta request */
410+
void aws_s3_meta_request_cancel_pending_buffer_futures_synced(struct aws_s3_meta_request *meta_request, int error_code);
411+
404412
/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
405413
* as reading from the stream could cause user code to call back into aws-c-s3.
406414
* This will fill the buffer to capacity, unless end of stream is reached.

include/aws/s3/private/s3_request.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include <aws/common/thread.h>
1313
#include <aws/s3/s3.h>
1414

15-
#include <aws/s3/private/s3_buffer_pool.h>
1615
#include <aws/s3/private/s3_checksums.h>
1716

1817
struct aws_http_message;
@@ -21,9 +20,8 @@ struct aws_s3_meta_request;
2120

2221
enum aws_s3_request_flags {
2322
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS = 0x00000001,
24-
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY = 0x00000002,
25-
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000004,
26-
AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY = 0x00000008,
23+
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000002,
24+
AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL = 0x00000004,
2725
};
2826

2927
/**
@@ -116,10 +114,16 @@ struct aws_s3_request {
116114
/* Linked list node used for tracking the request is active from HTTP level. */
117115
struct aws_linked_list_node cancellable_http_streams_list_node;
118116

117+
/* Linked list node used for tracking buffer acquire futures. */
118+
struct aws_linked_list_node pending_buffer_future_list_node;
119+
119120
/* The meta request lock must be held to access the data */
120121
struct {
121122
/* The underlying http stream, only valid when the request is active from HTTP level */
122123
struct aws_http_stream *cancellable_http_stream;
124+
125+
/* Buffer future. */
126+
struct aws_future_s3_buffer_ticket *buffer_future;
123127
} synced_data;
124128

125129
/* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
@@ -135,7 +139,7 @@ struct aws_s3_request {
135139
* retried.*/
136140
struct aws_byte_buf request_body;
137141

138-
struct aws_s3_buffer_pool_ticket *ticket;
142+
struct aws_s3_buffer_ticket *ticket;
139143

140144
/* Beginning range of this part. */
141145
/* TODO currently only used by auto_range_get, could be hooked up to auto_range_put as well. */
@@ -225,11 +229,8 @@ struct aws_s3_request {
225229
/* When true, response headers from the request will be stored in the request's response_headers variable. */
226230
uint32_t record_response_headers : 1;
227231

228-
/* When true, the response body buffer will be allocated in the size of a part. */
229-
uint32_t has_part_size_response_body : 1;
230-
231-
/* When true, the request body buffer will be allocated in the size of a part. */
232-
uint32_t has_part_size_request_body : 1;
232+
/* Indicates whether buffer should be allocated for the request from the pool. */
233+
uint32_t should_allocate_buffer_from_pool : 1;
233234

234235
/* When true, this request is being tracked by the client for limiting the amount of in-flight-requests/stats. */
235236
uint32_t tracked_by_client : 1;

include/aws/s3/s3.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ enum aws_s3_errors {
4949
AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS,
5050
AWS_ERROR_S3_RECV_FILE_NOT_FOUND,
5151
AWS_ERROR_S3_REQUEST_TIMEOUT,
52+
AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED,
5253

5354
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
5455
};

include/aws/s3/s3_buffer_pool.h

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
#ifndef AWS_S3_BUFFER_POOL_H
2+
#define AWS_S3_BUFFER_POOL_H
3+
4+
#include <aws/common/ref_count.h>
5+
#include <aws/io/future.h>
6+
#include <aws/s3/s3.h>
7+
8+
/**
9+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
10+
* SPDX-License-Identifier: Apache-2.0.
11+
*/
12+
13+
/**
14+
* Generic memory pool interface.
15+
* Allows consumers of aws-c-s3 to override how buffer allocation for part buffers is done.
16+
* Refer to docs/memory_aware_request_execution.md for details on how default implementation works.
17+
* WARNING: this is currently experimental feature and does not provide API stability guarantees so should be used with
18+
* caution. At highlevel the flow is as follows:
19+
* - crt scheduler queues up requests to be prepared
20+
* - requests being prepared will try to reserve mem (i.e. obtain a ticket) and wait until they get it before proceeding
21+
* - once mem is reserved requests will proceed with the pipeline
22+
* - request will acquire buffer from the ticket when needed
23+
* - ticket is released when request is done with the buffer
24+
* Note: in some cases pipeline can stall if new buffer cannot be allocated (ex. async writes flow).
25+
* In this case reserve request will indicate that not granting the ticket can block and buffer pool should try to
26+
* allocate ticket right away (or wait and call waker when mem is allocated for the case of async writes).
27+
*/
28+
29+
AWS_PUSH_SANE_WARNING_LEVEL
30+
AWS_EXTERN_C_BEGIN
31+
struct aws_s3_buffer_ticket;
32+
33+
/**
34+
* aws_future<aws_s3_buffer_ticket*>
35+
* Buffer ticket future used for reservations.
36+
*/
37+
AWS_FUTURE_T_POINTER_WITH_RELEASE_DECLARATION(aws_future_s3_buffer_ticket, struct aws_s3_buffer_ticket, AWS_S3_API)
38+
39+
/**
40+
* Meta information about ticket reservation request.
41+
*/
42+
struct aws_s3_buffer_pool_reserve_meta {
43+
/* client reserving the ticket. accounts for buffer pool being shared between clients. */
44+
struct aws_s3_client *client;
45+
46+
/* meta request ticket is being reserved for. */
47+
struct aws_s3_meta_request *meta_request;
48+
49+
/* size of the buffer to reserve. */
50+
size_t size;
51+
52+
/* whether not granting reservation can result in request pipeline being blocked.
53+
* Note: blocking is currently a terminal condition and that cannot be recovered from,
54+
* i.e. meta request will be stuck and not make any process.
55+
* As such buffer pool should either grant or error out reservation in sync.
56+
* This scenario currently only occurs in the async_write flows. */
57+
bool can_block;
58+
};
59+
60+
struct aws_s3_buffer_ticket;
61+
62+
struct aws_s3_buffer_ticket_vtable {
63+
/**
64+
* Get buffer associated with the ticket.
65+
* Note: can be called multiple times and the same buffer should be returned. In some cases ticket might not be
66+
* claimed at all.
67+
*/
68+
struct aws_byte_buf (*claim)(struct aws_s3_buffer_ticket *ticket);
69+
70+
/* Implement below for custom ref count behavior. Alternatively set those to null and init the ref count. */
71+
struct aws_s3_buffer_ticket *(*acquire)(struct aws_s3_buffer_ticket *ticket);
72+
struct aws_s3_buffer_ticket *(*release)(struct aws_s3_buffer_ticket *ticket);
73+
};
74+
75+
/**
76+
* Polymorphic ticket.
77+
*/
78+
struct aws_s3_buffer_ticket {
79+
struct aws_s3_buffer_ticket_vtable *vtable;
80+
struct aws_ref_count ref_count;
81+
void *impl;
82+
};
83+
84+
AWS_S3_API struct aws_byte_buf aws_s3_buffer_ticket_claim(struct aws_s3_buffer_ticket *ticket);
85+
86+
AWS_S3_API struct aws_s3_buffer_ticket *aws_s3_buffer_ticket_acquire(struct aws_s3_buffer_ticket *ticket);
87+
AWS_S3_API struct aws_s3_buffer_ticket *aws_s3_buffer_ticket_release(struct aws_s3_buffer_ticket *ticket);
88+
89+
struct aws_s3_buffer_pool;
90+
91+
struct aws_s3_buffer_pool_vtable {
92+
/* Reserve a ticket. Returns a future that is granted whenever reservation can be made. */
93+
struct aws_future_s3_buffer_ticket *(
94+
*reserve)(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_pool_reserve_meta meta);
95+
96+
/**
97+
* Trim the pool. This is mostly a suggestion, which pool can decide to ignore. Triggered by CRT when
98+
* client has been idle for some time.
99+
**/
100+
void (*trim)(struct aws_s3_buffer_pool *pool);
101+
102+
/* Implement below for custom ref count behavior. Alternatively set those to null and init the ref count. */
103+
struct aws_s3_buffer_pool *(*acquire)(struct aws_s3_buffer_pool *pool);
104+
struct aws_s3_buffer_pool *(*release)(struct aws_s3_buffer_pool *pool);
105+
};
106+
107+
/**
108+
* Polymorphic buffer pool.
109+
*/
110+
struct aws_s3_buffer_pool {
111+
struct aws_s3_buffer_pool_vtable *vtable;
112+
struct aws_ref_count ref_count;
113+
void *impl;
114+
};
115+
116+
AWS_S3_API struct aws_future_s3_buffer_ticket *aws_s3_buffer_pool_reserve(
117+
struct aws_s3_buffer_pool *buffer_pool,
118+
struct aws_s3_buffer_pool_reserve_meta meta);
119+
AWS_S3_API void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool);
120+
121+
AWS_S3_API struct aws_s3_buffer_pool *aws_s3_buffer_pool_acquire(struct aws_s3_buffer_pool *buffer_pool);
122+
AWS_S3_API struct aws_s3_buffer_pool *aws_s3_buffer_pool_release(struct aws_s3_buffer_pool *buffer_pool);
123+
124+
/**
125+
* Buffer pool configuration options.
126+
*/
127+
struct aws_s3_buffer_pool_config {
128+
struct aws_s3_client *client; /* Client creating the pool. */
129+
size_t part_size; /* Default part size of the client. */
130+
size_t max_part_size; /* Max part size configured on the client. */
131+
size_t memory_limit; /* Memory limit set on the client. */
132+
};
133+
134+
/**
135+
* Factory to construct the pool for the given config. Passes along buffer related info configured on the client, which
136+
* factory may ignore when considering how to construct pool.
137+
* This implementation should fail if pool cannot be constructed for some reason (ex. if config params cannot be met),
138+
* by logging failure reason, returning null and raising aws_error.
139+
*/
140+
typedef struct aws_s3_buffer_pool *(aws_s3_buffer_pool_factory_fn)(struct aws_allocator *allocator,
141+
struct aws_s3_buffer_pool_config config);
142+
143+
AWS_EXTERN_C_END
144+
AWS_POP_SANE_WARNING_LEVEL
145+
146+
#endif /* AWS_S3_BUFFER_POOL_H */

0 commit comments

Comments
 (0)