Skip to content

Commit 6d9cc79

Browse files
wip async reserve
1 parent 46e335e commit 6d9cc79

14 files changed

Lines changed: 211 additions & 146 deletions

include/aws/s3/private/s3_default_buffer_pool.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
#include <aws/s3/s3.h>
10+
#include <aws/s3/s3_buffer_pool.h>
1011

1112
/*
1213
* S3 buffer pool.
@@ -95,19 +96,9 @@ AWS_S3_API void aws_s3_default_buffer_pool_destroy(struct aws_s3_default_buffer_
9596
* If you MUST acquire a buffer now (waiting to reserve a ticket would risk deadlock),
9697
* use aws_s3_buffer_pool_acquire_forced_buffer() instead.
9798
*/
98-
AWS_S3_API struct aws_s3_buffer_ticket *aws_s3_default_buffer_pool_reserve(
99+
AWS_S3_API struct aws_future_s3_buffer_ticket *aws_s3_default_buffer_pool_reserve(
99100
struct aws_s3_default_buffer_pool *buffer_pool,
100-
size_t size);
101-
102-
/*
103-
* Whether pool has a reservation hold.
104-
*/
105-
AWS_S3_API bool aws_s3_default_buffer_pool_has_reservation_hold(struct aws_s3_default_buffer_pool *buffer_pool);
106-
107-
/*
108-
* Remove reservation hold on pool.
109-
*/
110-
AWS_S3_API void aws_s3_default_buffer_pool_remove_reservation_hold(struct aws_s3_default_buffer_pool *buffer_pool);
101+
struct aws_s3_buffer_pool_reserve_meta meta);
111102

112103
/*
113104
* Trades in the ticket for a buffer.

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ typedef void(aws_s3_meta_request_prepare_request_callback_fn)(
4646
struct aws_s3_prepare_request_payload {
4747
struct aws_allocator *allocator;
4848
struct aws_s3_request *request;
49+
struct aws_event_loop *event_loop;
4950
struct aws_task task;
51+
struct aws_future_s3_buffer_ticket *async_buffer_reserve;
5052
/* async step: wait for vtable->prepare_request() call to complete */
5153
struct aws_future_void *asyncstep_prepare_request;
5254
/* callback to invoke when all request preparation work is complete */

include/aws/s3/private/s3_request.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ struct aws_s3_meta_request;
2020

2121
enum aws_s3_request_flags {
2222
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS = 0x00000001,
23-
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY = 0x00000002,
24-
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000004,
25-
AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY = 0x00000008,
23+
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000002,
24+
AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL = 0x00000004,
2625
};
2726

2827
/**
@@ -224,11 +223,8 @@ struct aws_s3_request {
224223
/* When true, response headers from the request will be stored in the request's response_headers variable. */
225224
uint32_t record_response_headers : 1;
226225

227-
/* When true, the response body buffer will be allocated in the size of a part. */
228-
uint32_t has_part_size_response_body : 1;
229-
230-
/* When true, the request body buffer will be allocated in the size of a part. */
231-
uint32_t has_part_size_request_body : 1;
226+
/* Indicates whether buffer should be allocated for the request from the pool. */
227+
uint32_t should_allocate_buffer_from_pool : 1;
232228

233229
/* When true, this request is being tracked by the client for limiting the amount of in-flight-requests/stats. */
234230
uint32_t tracked_by_client : 1;

include/aws/s3/s3.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ enum aws_s3_errors {
5050
AWS_ERROR_S3_RECV_FILE_NOT_FOUND,
5151
AWS_ERROR_S3_REQUEST_TIMEOUT,
5252
AWS_ERROR_S3_TOKEN_EXPIRED,
53+
AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED,
5354

5455
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
5556
};

include/aws/s3/s3_buffer_pool.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#ifndef AWS_S3_BUFFER_POOL_H
2+
#define AWS_S3_BUFFER_POOL_H
3+
4+
#include <aws/io/future.h>
5+
#include <aws/s3/s3.h>
6+
7+
/**
8+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
9+
* SPDX-License-Identifier: Apache-2.0.
10+
*/
11+
AWS_PUSH_SANE_WARNING_LEVEL
12+
AWS_EXTERN_C_BEGIN
13+
struct aws_s3_buffer_ticket;
14+
15+
/**
16+
* aws_future<aws_s3_buffer_ticket*>
17+
*/
18+
AWS_FUTURE_T_POINTER_WITH_RELEASE_DECLARATION(aws_future_s3_buffer_ticket, struct aws_s3_buffer_ticket, AWS_S3_API)
19+
20+
struct aws_s3_buffer_pool_reserve_meta {
21+
struct aws_s3_client *client;
22+
struct aws_s3_meta_request *meta_request;
23+
size_t size;
24+
bool can_block;
25+
};
26+
27+
struct aws_s3_buffer_pool {
28+
void (*destroy)(struct aws_s3_buffer_pool *pool);
29+
30+
struct aws_future_s3_buffer_ticket *(*reserve)(struct aws_s3_buffer_pool *pool,
31+
struct aws_s3_buffer_pool_reserve_meta meta);
32+
struct aws_byte_buf (*claim)(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_ticket *ticket);
33+
void (*release)(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_ticket *ticket);
34+
35+
void (*trim)(struct aws_s3_buffer_pool *pool);
36+
37+
struct aws_allocator *allocator;
38+
void *user_data;
39+
};
40+
41+
typedef struct aws_s3_buffer_pool *(aws_s3_buffer_pool_factory_fn)(struct aws_allocator *allocator,
42+
uint64_t part_size,
43+
uint64_t mem_limit);
44+
45+
AWS_EXTERN_C_END
46+
AWS_POP_SANE_WARNING_LEVEL
47+
48+
#endif /* AWS_S3_BUFFER_POOL_H */

include/aws/s3/s3_client.h

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <aws/common/ref_count.h>
1111
#include <aws/io/retry_strategy.h>
1212
#include <aws/s3/s3.h>
13+
#include <aws/s3/s3_buffer_pool.h>
1314

1415
AWS_PUSH_SANE_WARNING_LEVEL
1516

@@ -373,25 +374,6 @@ typedef struct aws_s3express_credentials_provider *(
373374
void *shutdown_user_data,
374375
void *factory_user_data);
375376

376-
struct aws_s3_buffer_ticket;
377-
378-
struct aws_s3_buffer_pool {
379-
void (*destroy)(struct aws_s3_buffer_pool *pool);
380-
381-
struct aws_s3_buffer_ticket *(*reserve)(struct aws_s3_buffer_pool *pool, size_t size);
382-
struct aws_byte_buf (*claim)(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_ticket *ticket);
383-
void (*release)(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_ticket *ticket);
384-
385-
void (*trim)(struct aws_s3_buffer_pool *pool);
386-
387-
struct aws_allocator *allocator;
388-
void *user_data;
389-
};
390-
391-
typedef struct aws_s3_buffer_pool *(aws_s3_buffer_pool_factory_fn)(struct aws_allocator *allocator,
392-
uint64_t part_size,
393-
uint64_t mem_limit);
394-
395377
/* Keepalive properties are TCP only.
396378
* If interval or timeout are zero, then default values are used.
397379
*/

source/s3.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ static struct aws_error_info s_errors[] = {
5353
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_RECV_FILE_NOT_FOUND, "The receive file doesn't exist, cannot create as configuration required."),
5454
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIMEOUT, "RequestTimeout error received from S3."),
5555
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_TOKEN_EXPIRED, "Token expired - needs a refresh."),
56+
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED, "Could not reserve memory for object."),
5657
};
5758
/* clang-format on */
5859

source/s3_auto_ranged_get.c

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ static bool s_s3_auto_ranged_get_update(
225225
auto_ranged_get->synced_data.num_parts_requested > 0) {
226226
goto has_work_remaining;
227227
}
228-
struct aws_s3_buffer_ticket *ticket = NULL;
229228
switch (s_s3_get_request_type_for_discovering_object_size(meta_request)) {
230229
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_HEAD_OBJECT:
231230
AWS_LOGF_INFO(
@@ -246,20 +245,13 @@ static bool s_s3_auto_ranged_get_update(
246245
"id=%p: Doing a 'GET_OBJECT_WITH_PART_NUMBER_1' to discover the size of the object and get "
247246
"the first part",
248247
(void *)meta_request);
249-
ticket = meta_request->client->buffer_pool->reserve(
250-
meta_request->client->buffer_pool, meta_request->part_size);
251-
252-
if (ticket == NULL) {
253-
goto has_work_remaining;
254-
}
255248

256249
request = aws_s3_request_new(
257250
meta_request,
258251
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_OBJECT_WITH_PART_NUMBER_1,
259252
AWS_S3_REQUEST_TYPE_GET_OBJECT,
260253
1 /*part_number*/,
261-
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);
262-
request->ticket = ticket;
254+
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL);
263255
++auto_ranged_get->synced_data.num_parts_requested;
264256

265257
break;
@@ -294,28 +286,12 @@ static bool s_s3_auto_ranged_get_update(
294286
"id=%p: Doing a ranged get to discover the size of the object and get the first part",
295287
(void *)meta_request);
296288

297-
if (first_part_size >= s_min_size_response_for_pooling) {
298-
/* Note: explicitly reserving the whole part size
299-
* even if expect to receive less data. Pool will
300-
* reserve the whole part size for it anyways, so no
301-
* reason getting a smaller chunk. */
302-
ticket = meta_request->client->buffer_pool->reserve(
303-
meta_request->client->buffer_pool, (size_t)meta_request->part_size);
304-
305-
if (ticket == NULL) {
306-
goto has_work_remaining;
307-
}
308-
} else {
309-
ticket = NULL;
310-
}
311-
312289
request = aws_s3_request_new(
313290
meta_request,
314291
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_OBJECT_WITH_RANGE,
315292
AWS_S3_REQUEST_TYPE_GET_OBJECT,
316293
1 /*part_number*/,
317-
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);
318-
request->ticket = ticket;
294+
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL);
319295
request->part_range_start = part_range_start;
320296
request->part_range_end = part_range_start + first_part_size - 1; /* range-end is inclusive */
321297
++auto_ranged_get->synced_data.num_parts_requested;
@@ -361,21 +337,12 @@ static bool s_s3_auto_ranged_get_update(
361337
auto_ranged_get->synced_data.read_window_warning_issued = 0;
362338
}
363339

364-
struct aws_s3_buffer_ticket *ticket = meta_request->client->buffer_pool->reserve(
365-
meta_request->client->buffer_pool, meta_request->part_size);
366-
367-
if (ticket == NULL) {
368-
goto has_work_remaining;
369-
}
370-
371340
request = aws_s3_request_new(
372341
meta_request,
373342
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_OBJECT_WITH_RANGE,
374343
AWS_S3_REQUEST_TYPE_GET_OBJECT,
375344
auto_ranged_get->synced_data.num_parts_requested + 1 /*part_number*/,
376-
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);
377-
378-
request->ticket = ticket;
345+
AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL);
379346

380347
aws_s3_calculate_auto_ranged_get_part_range(
381348
auto_ranged_get->synced_data.object_range_start,
@@ -530,11 +497,11 @@ static struct aws_future_void *s_s3_auto_ranged_get_prepare_request(struct aws_s
530497
/* Success! */
531498
AWS_LOGF_DEBUG(
532499
AWS_LS_S3_META_REQUEST,
533-
"id=%p: Created request %p for part %d part sized %d",
500+
"id=%p: Created request %p for part %d allocated from pool %d",
534501
(void *)meta_request,
535502
(void *)request,
536503
request->part_number,
537-
request->has_part_size_response_body);
504+
request->should_allocate_buffer_from_pool);
538505

539506
success = true;
540507

source/s3_auto_ranged_put.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -567,20 +567,21 @@ static bool s_s3_auto_ranged_put_update(
567567
AWS_FATAL_ASSERT(meta_request->synced_data.async_write.buffered_data_ticket);
568568
ticket = meta_request->synced_data.async_write.buffered_data_ticket;
569569
meta_request->synced_data.async_write.buffered_data_ticket = NULL;
570-
} else {
571-
/* Try to reserve a ticket */
572-
ticket = meta_request->client->buffer_pool->reserve(
573-
meta_request->client->buffer_pool, meta_request->part_size);
574570
}
575571

576572
if (ticket != NULL) {
577573
/* Allocate a request for another part. */
574+
uint32_t flags = AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS;
575+
if (meta_request->synced_data.async_write.ready_to_send) {
576+
flags |= AWS_S3_REQUEST_FLAG_ALLOCATE_BUFFER_FROM_POOL;
577+
}
578+
578579
request = aws_s3_request_new(
579580
meta_request,
580581
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART,
581582
AWS_S3_REQUEST_TYPE_UPLOAD_PART,
582583
0 /*part_number*/,
583-
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY);
584+
flags);
584585

585586
request->part_number = auto_ranged_put->threaded_update_data.next_part_number;
586587

source/s3_client.c

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,11 @@ static void s_default_buffer_pool_destroy(struct aws_s3_buffer_pool *pool) {
232232
aws_mem_release(pool->allocator, pool);
233233
}
234234

235-
struct aws_s3_buffer_ticket *s_default_pool_reserve(struct aws_s3_buffer_pool *pool, size_t size) {
235+
struct aws_future_s3_buffer_ticket *s_default_pool_reserve(struct aws_s3_buffer_pool *pool,
236+
struct aws_s3_buffer_pool_reserve_meta meta) {
236237
struct aws_s3_default_buffer_pool *default_pool = (struct aws_s3_default_buffer_pool *)pool->user_data;
237238

238-
return aws_s3_default_buffer_pool_reserve(default_pool, size);
239+
return aws_s3_default_buffer_pool_reserve(default_pool, meta);
239240
}
240241

241242
struct aws_byte_buf s_default_pool_claim(struct aws_s3_buffer_pool *pool, struct aws_s3_buffer_ticket *ticket) {
@@ -1868,11 +1869,6 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
18681869

18691870
const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
18701871

1871-
/*
1872-
Do we still need it?
1873-
aws_s3_buffer_pool_remove_reservation_hold(client->buffer_pool);
1874-
*/
1875-
18761872
for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {
18771873

18781874
/**

0 commit comments

Comments
 (0)