Skip to content

Commit bd5f363

Browse files
authored
Buffer pool refactor (#584)
1 parent fadd3dc commit bd5f363

22 files changed

+2577
-221
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ jobs:
223223
uses: actions/checkout@v4
224224
- name: Build ${{ env.PACKAGE_NAME }} + consumers
225225
run: |
226+
python3 -m venv .venv
227+
source .venv/bin/activate
226228
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
227229
chmod a+x builder
228230
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DASSERT_LOCK_HELD=ON

include/aws/s3/private/s3_auto_ranged_get.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ struct aws_s3_auto_ranged_get {
2323

2424
/* Estimated object stored part size based on ETag analysis */
2525
uint64_t estimated_object_stored_part_size;
26+
/* Number of parts stored in S3. We derive this from the ETag; if the ETag is not formatted as expected, this will
27+
 * default to 1.
28+
 * Note: For S3Express Append, the object will be treated as a single part, even though it can be stored as
29+
 * multiple parts in S3.
30+
 */
31+
uint64_t num_stored_parts;
2632
/* Part size was set or not from user for this meta request. */
2733
bool part_size_set;
2834
bool force_dynamic_part_size;

include/aws/s3/private/s3_default_buffer_pool.h

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
* SPDX-License-Identifier: Apache-2.0.
77
*/
88

9+
#include <aws/common/hash_table.h>
10+
#include <aws/common/mutex.h>
911
#include <aws/s3/s3.h>
1012
#include <aws/s3/s3_buffer_pool.h>
1113

@@ -59,11 +61,79 @@ struct aws_s3_default_buffer_pool_usage_stats {
5961
/* Secondary memory reserved, but not yet used. Accurate, maps directly to base allocator. */
6062
size_t secondary_reserved;
6163

64+
/* Overall memory allocated for special-sized blocks. */
65+
size_t special_blocks_allocated;
66+
/* Number of special block sizes created. */
67+
size_t special_blocks_num;
68+
/* Memory reserved in special-sized blocks. */
69+
size_t special_blocks_reserved;
70+
/* Memory used in special-sized blocks. */
71+
size_t special_blocks_used;
72+
6273
/* Bytes used in "forced" buffers (created even if they exceed memory limits).
6374
* This is always <= primary_used + secondary_used */
6475
size_t forced_used;
6576
};
6677

78+
/* Structure to track special-sized blocks */
79+
struct s3_special_block_list {
80+
struct aws_allocator *allocator;
81+
size_t buffer_size; /* Size of buffers in this list */
82+
struct aws_array_list blocks; /* Array of uint8_t* pointers to allocated blocks */
83+
};
84+
85+
struct aws_s3_default_buffer_pool {
86+
struct aws_allocator *base_allocator;
87+
struct aws_mutex mutex;
88+
89+
size_t block_size;
90+
size_t chunk_size;
91+
/* size at which allocations should go to secondary */
92+
size_t primary_size_cutoff;
93+
94+
/* NOTE: See aws_s3_buffer_pool_usage_stats for descriptions of most fields */
95+
96+
size_t mem_limit;
97+
98+
size_t primary_allocated;
99+
size_t primary_reserved;
100+
size_t primary_used;
101+
102+
size_t special_blocks_allocated;
103+
size_t special_blocks_reserved;
104+
size_t special_blocks_used;
105+
106+
size_t secondary_reserved;
107+
size_t secondary_used;
108+
109+
size_t forced_used;
110+
111+
struct aws_array_list blocks;
112+
113+
struct aws_linked_list pending_reserves;
114+
115+
/* Special-sized blocks: hash table mapping size -> struct s3_special_block_list * */
116+
/* TODO: discuss the special-list lifetime. Should we just keep it with the memory pool? The concern is that
117+
 * the pool lives as long as the client, which may result in all sorts of special lists staying around. */
118+
struct aws_hash_table special_blocks;
119+
120+
/* TEST ONLY: to force the special blocks alive during trim. */
121+
bool force_keeping_special_blocks;
122+
};
123+
124+
struct s3_pending_reserve {
125+
struct aws_linked_list_node node;
126+
struct aws_future_s3_buffer_ticket *ticket_future;
127+
struct aws_s3_default_buffer_ticket *ticket;
128+
struct aws_s3_buffer_pool_reserve_meta meta;
129+
};
130+
131+
struct s3_buffer_pool_block {
132+
size_t block_size;
133+
uint8_t *block_ptr;
134+
uint16_t alloc_bit_mask;
135+
};
136+
67137
/*
68138
* Create new buffer pool.
69139
* chunk_size - specifies the size of memory that will most commonly be acquired

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ struct aws_s3_meta_request {
157157

158158
/* Part size to use for uploads and downloads. Passed down by the creating client. */
159159
const size_t part_size;
160+
/* Hard limit on max connections set through the meta request option. */
161+
const uint32_t max_active_connections_override;
160162

161163
struct aws_cached_signing_config_aws *cached_signing_config;
162164

@@ -166,6 +168,9 @@ struct aws_s3_meta_request {
166168

167169
struct aws_s3_endpoint *endpoint;
168170

171+
/* Number of requests being sent/received over network for the meta request. */
172+
struct aws_atomic_var num_requests_network;
173+
169174
/* Event loop to schedule IO work related on, ie, reading from streams, streaming parts back to the caller, etc...
170175
* After the meta request is finished, this will be reset along with the client reference.*/
171176
struct aws_event_loop *io_event_loop;
@@ -185,6 +190,10 @@ struct aws_s3_meta_request {
185190

186191
enum aws_s3_meta_request_type type;
187192
struct aws_string *s3express_session_host;
193+
/* Whether the meta request is made to an S3 Express bucket. */
194+
bool is_express;
195+
/* Whether the buffer pool has been optimized for this meta request's specific part size. */
196+
bool buffer_pool_optimized;
188197

189198
struct {
190199
struct aws_mutex lock;
@@ -269,6 +278,9 @@ struct aws_s3_meta_request {
269278
/* True if this meta request is currently in the client's list. */
270279
bool scheduled;
271280

281+
/* Track the number of requests being prepared for this meta request. */
282+
size_t num_request_being_prepared;
283+
272284
} client_process_work_threaded_data;
273285

274286
/* Anything in this structure should only ever be accessed by the meta-request from its io_event_loop thread. */

include/aws/s3/private/s3_util.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ extern const uint64_t g_default_max_part_size;
169169

170170
AWS_S3_API
171171
extern const uint64_t g_s3_optimal_range_size_alignment;
172+
173+
AWS_S3_API
174+
extern const uint32_t g_s3express_connection_limitation;
175+
AWS_S3_API
176+
extern const uint64_t g_s3express_connection_limitation_part_size_threshold;
177+
AWS_S3_API
178+
extern const uint64_t g_s3express_connection_limitation_object_size_threshold;
179+
172180
/**
173181
* Returns AWS_S3_REQUEST_TYPE_UNKNOWN if name doesn't map to an enum value.
174182
*/
@@ -359,13 +367,15 @@ int aws_s3_calculate_client_optimal_range_size(
359367
*
360368
* @param client_optimal_range_size The client-level optimal range size from initialization
361369
* @param estimated_object_stored_part_size Estimated size of object stored parts in S3
370+
* @param is_express Whether the request is an S3 Express request.
362371
* @param out_request_optimal_range_size Output parameter for calculated request-level optimal range size
363372
* @return AWS_OP_SUCCESS on success, AWS_OP_ERR on failure (caller should fall back to client size)
364373
*/
365374
AWS_S3_API
366375
int aws_s3_calculate_request_optimal_range_size(
367376
uint64_t client_optimal_range_size,
368377
uint64_t estimated_object_stored_part_size,
378+
bool is_express,
369379
uint64_t *out_request_optimal_range_size);
370380

371381
/**

include/aws/s3/s3_buffer_pool.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ struct aws_s3_buffer_pool_vtable {
102102
**/
103103
void (*trim)(struct aws_s3_buffer_pool *pool);
104104

105+
/**
106+
* Optimize the buffer pool for allocations of a specific size.
107+
* Creates a separate list of blocks dedicated to this size for better memory efficiency.
108+
*/
109+
int (*add_special_size)(struct aws_s3_buffer_pool *pool, size_t buffer_size);
110+
111+
/**
112+
* Release all special-sized blocks from the buffer pool.
113+
* This frees all memory allocated for the special size optimization.
114+
*/
115+
void (*release_special_size)(struct aws_s3_buffer_pool *pool, size_t buffer_size);
116+
117+
/**
118+
* Align a range size to the buffer pool's allocation strategy.
119+
* Returns the optimal aligned size based on the buffer pool's configuration.
120+
*/
121+
uint64_t (*derive_aligned_buffer_size)(struct aws_s3_buffer_pool *pool, uint64_t size);
122+
105123
/* Implement below for custom ref count behavior. Alternatively set those to null and init the ref count. */
106124
struct aws_s3_buffer_pool *(*acquire)(struct aws_s3_buffer_pool *pool);
107125
struct aws_s3_buffer_pool *(*release)(struct aws_s3_buffer_pool *pool);
@@ -144,6 +162,41 @@ typedef struct aws_s3_buffer_pool *(aws_s3_buffer_pool_factory_fn)(struct aws_al
144162
struct aws_s3_buffer_pool_config config,
145163
void *user_data);
146164

165+
/**
166+
* Optimize the buffer pool for allocations of a specific size.
167+
* Creates a separate list of blocks dedicated to this size for better memory efficiency.
168+
* Allocations of exactly this size will use these special blocks instead of the regular primary/secondary storage.
169+
*
170+
* @param buffer_pool The buffer pool to optimize
171+
* @param buffer_size The size to optimize for (must be > 0)
172+
* @return AWS_OP_SUCCESS on success, AWS_OP_ERR on failure
173+
*/
174+
AWS_S3_API
175+
int aws_s3_buffer_pool_add_special_size(struct aws_s3_buffer_pool *buffer_pool, size_t buffer_size);
176+
177+
/**
178+
* Release the special-sized blocks from the buffer pool.
179+
* Should be called when done with the special-sized allocations.
180+
*
181+
* @param buffer_pool The buffer pool
182+
* @param buffer_size The special size to release blocks for
183+
*/
184+
AWS_S3_API
185+
void aws_s3_buffer_pool_release_special_size(struct aws_s3_buffer_pool *buffer_pool, size_t buffer_size);
186+
187+
/**
188+
* Align a range size to the buffer pool's allocation strategy.
189+
* This function determines the optimal aligned size based on the buffer pool's configuration.
190+
* For sizes within the primary allocation range, it aligns to chunk boundaries.
191+
* For larger sizes that go to secondary storage, it returns the size as-is.
192+
*
193+
* @param buffer_pool The buffer pool to use for alignment (can be NULL, in which case size is returned unchanged)
194+
* @param size The size to align
195+
* @return The aligned size that's optimal for the buffer pool's allocation strategy
196+
*/
197+
AWS_S3_API
198+
uint64_t aws_s3_buffer_pool_derive_aligned_buffer_size(struct aws_s3_buffer_pool *buffer_pool, uint64_t size);
199+
147200
AWS_EXTERN_C_END
148201
AWS_POP_SANE_WARNING_LEVEL
149202

include/aws/s3/s3_client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,10 @@ struct aws_s3_meta_request_options {
10051005
* This will be ignored for other operations.
10061006
*/
10071007
struct aws_byte_cursor copy_source_uri;
1008+
1009+
/* When set, this caps the number of active connections for the meta request. When 0 (recommended), the client
1010+
 * determines it based on client-side settings. */
1011+
uint32_t max_active_connections_override;
10081012
};
10091013

10101014
/* Result details of a meta request.

source/s3_auto_ranged_get.c

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ static void s_s3_auto_ranged_get_request_finished(
787787
error_code = AWS_ERROR_S3_MISSING_ETAG;
788788
goto update_synced_data;
789789
}
790-
/* Extract number of parts from ETag and calculate estimated part size */
790+
/* Extract number of parts stored in S3 from ETag and calculate estimated part size */
791791
uint32_t num_parts = 0;
792792
if (aws_s3_extract_parts_from_etag(etag_header_value, &num_parts) == AWS_OP_SUCCESS && num_parts > 0) {
793793
auto_ranged_get->estimated_object_stored_part_size = object_size / num_parts;
@@ -801,6 +801,7 @@ static void s_s3_auto_ranged_get_request_finished(
801801
num_parts,
802802
auto_ranged_get->estimated_object_stored_part_size);
803803
} else {
804+
num_parts = 1;
804805
/* Failed to parse ETags */
805806
AWS_LOGF_WARN(
806807
AWS_LS_S3_META_REQUEST,
@@ -809,24 +810,49 @@ static void s_s3_auto_ranged_get_request_finished(
809810
auto_ranged_get->estimated_object_stored_part_size = g_default_part_size_fallback;
810811
goto update_synced_data;
811812
}
813+
auto_ranged_get->num_stored_parts = num_parts;
812814
}
813815

814816
/* If we were able to discover the object-range/content length successfully, then any error code that was passed
815817
* into this function is being handled and does not indicate an overall failure.*/
816818
error_code = AWS_ERROR_SUCCESS;
817819
found_object_size = true;
820+
uint32_t max_connections = aws_s3_client_get_max_active_connections(meta_request->client, meta_request);
818821

819822
if (auto_ranged_get->force_dynamic_part_size ||
820823
(!auto_ranged_get->part_size_set && !meta_request->client->part_size_set)) {
821824
/* No part size has been set from user. Now we use the optimal part size based on the throughput and memory
822825
* limit */
823826
uint64_t out_request_optimal_range_size = 0;
827+
824828
if (aws_s3_calculate_request_optimal_range_size(
825829
meta_request->client->optimal_range_size,
826830
auto_ranged_get->estimated_object_stored_part_size,
831+
meta_request->is_express,
827832
&out_request_optimal_range_size) == AWS_OP_SUCCESS) {
833+
/* Apply a buffer pool alignment to the calculated result. */
834+
out_request_optimal_range_size = aws_s3_buffer_pool_derive_aligned_buffer_size(
835+
meta_request->client->buffer_pool, out_request_optimal_range_size);
836+
AWS_LOGF_INFO(
837+
AWS_LS_S3_META_REQUEST,
838+
"id=%p: Override the part size to be optimal. part_size=%" PRIu64 ".",
839+
(void *)meta_request,
840+
out_request_optimal_range_size);
828841
/* Override the part size to be optimal */
829842
*((size_t *)&meta_request->part_size) = (size_t)out_request_optimal_range_size;
843+
uint64_t parts_threshold = aws_mul_u64_saturating(max_connections, 2);
844+
if (auto_ranged_get->num_stored_parts > parts_threshold) {
845+
/* The number of parts is greater than the threshold, so buffers will be reused from the
846+
 * buffer pool often enough to justify adding a special block size to the buffer pool to
847+
 * optimize this case. */
848+
AWS_LOGF_INFO(
849+
AWS_LS_S3_META_REQUEST,
850+
"id=%p: Apply buffer pool optimization for the size=%zu.",
851+
(void *)meta_request,
852+
meta_request->part_size);
853+
aws_s3_buffer_pool_add_special_size(meta_request->client->buffer_pool, meta_request->part_size);
854+
meta_request->buffer_pool_optimized = true;
855+
}
830856
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_HEAD_OBJECT) {
831857
/* Update the first part size as well, if we haven't made the request yet. */
832858
first_part_size = meta_request->part_size;
@@ -847,6 +873,31 @@ static void s_s3_auto_ranged_get_request_finished(
847873
}
848874
}
849875

876+
if (meta_request->is_express &&
877+
meta_request->part_size < g_s3express_connection_limitation_part_size_threshold &&
878+
object_size > g_s3express_connection_limitation_object_size_threshold) {
879+
/**
880+
* TODO: THIS IS A TEMP WORKAROUND, not the long term solution.
881+
* 1. If the Part Size we set is larger than the possible size to hit the limitation, we are safe to
882+
* make as many connections as we want.
883+
* 2. If the object size is less than the threshold, we keep our previous behavior, as it's less likely
884+
* to hit the server side limitation.
885+
*
886+
* Otherwise, we need to make sure the number of concurrent connections is lower than the limitation.
887+
*/
888+
uint32_t max_active_connections_override = aws_min_u32(g_s3express_connection_limitation, max_connections);
889+
if (max_active_connections_override < max_connections) {
890+
/* Override the max active connections to be the limitation. */
891+
*((uint32_t *)&meta_request->max_active_connections_override) =
892+
(uint32_t)max_active_connections_override;
893+
AWS_LOGF_WARN(
894+
AWS_LS_S3_META_REQUEST,
895+
"id=%p: Override the max active connections for the meta request to be the limitation: %d",
896+
(void *)meta_request,
897+
max_active_connections_override);
898+
}
899+
}
900+
850901
if (!empty_file_error && meta_request->headers_callback != NULL) {
851902
/* Modify the header received to fake the header for the whole meta request. */
852903
if (request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_OBJECT_WITH_RANGE ||
@@ -936,8 +987,8 @@ static void s_s3_auto_ranged_get_request_finished(
936987
if (empty_file_error) {
937988
/*
938989
* Try to download the object again using GET_OBJECT_WITH_PART_NUMBER_1. If the file is still
939-
* empty, successful response headers will be provided to users. If not, the newer version of the
940-
* file will be downloaded.
990+
* empty, successful response headers will be provided to users. If not, the newer version of
991+
* the file will be downloaded.
941992
*/
942993
auto_ranged_get->synced_data.num_parts_requested = 0;
943994
auto_ranged_get->synced_data.object_range_known = 0;
@@ -999,7 +1050,8 @@ static void s_s3_auto_ranged_get_request_finished(
9991050
}
10001051
aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
10011052
if (error_code == AWS_ERROR_S3_RESPONSE_CHECKSUM_MISMATCH) {
1002-
/* It's a mismatch of checksum, tell user that we validated the checksum and the algorithm we validated
1053+
/* It's a mismatch of checksum, tell user that we validated the checksum and the algorithm we
1054+
* validated
10031055
*/
10041056
meta_request->synced_data.finish_result.did_validate = true;
10051057
meta_request->synced_data.finish_result.validation_algorithm = request->validation_algorithm;

0 commit comments

Comments
 (0)