Skip to content
70 changes: 56 additions & 14 deletions source/s3_default_buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct aws_s3_default_buffer_pool {
/* A reservation request that could not be satisfied immediately; queued on the
 * pool's pending_reserves list until enough memory is released back to the pool. */
struct s3_pending_reserve {
    struct aws_linked_list_node node;                  /* intrusive node for the pool's pending_reserves list */
    struct aws_future_s3_buffer_ticket *ticket_future; /* future completed with a ticket, or errored on cancel */
    struct aws_s3_default_buffer_ticket *ticket;       /* ticket reserved under the pool mutex; wrapped and delivered outside the lock */
    struct aws_s3_buffer_pool_reserve_meta meta;       /* parameters (e.g. size) of the requested reservation */
};

Expand Down Expand Up @@ -292,12 +293,13 @@ void aws_s3_default_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool_w

aws_array_list_clean_up(&buffer_pool->blocks);

for (struct aws_linked_list_node *node = aws_linked_list_begin(&buffer_pool->pending_reserves);
node != aws_linked_list_end(&buffer_pool->pending_reserves);
node = aws_linked_list_next(node)) {
while (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
struct aws_linked_list_node *node = aws_linked_list_front(&buffer_pool->pending_reserves);
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
AWS_FATAL_ASSERT(aws_future_s3_buffer_ticket_is_done(pending->ticket_future));
aws_future_s3_buffer_ticket_release(pending->ticket_future);
aws_linked_list_remove(node);
aws_linked_list_pop_front(&buffer_pool->pending_reserves);
aws_mem_release(buffer_pool->base_allocator, pending);
}

aws_mutex_clean_up(&buffer_pool->mutex);
Expand Down Expand Up @@ -389,25 +391,65 @@ static void s_aws_ticket_wrapper_destroy(void *data) {
aws_mem_release(buffer_pool->base_allocator, ticket);
aws_mem_release(buffer_pool->base_allocator, ticket_wrapper);

struct aws_future_s3_buffer_ticket *pending_ticket_future = NULL;
struct aws_linked_list pending_reserves_to_remove;
aws_linked_list_init(&pending_reserves_to_remove);

if (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
struct aws_linked_list_node *node = aws_linked_list_front(&buffer_pool->pending_reserves);
struct aws_linked_list pending_reserves_to_complete;
aws_linked_list_init(&pending_reserves_to_complete);

/* Capture all the pending reserves that are done (currently can only happen when request is canceled, which cancels
* pending futures) */
struct aws_linked_list_node *node = aws_linked_list_begin(&buffer_pool->pending_reserves);
while (node != aws_linked_list_end(&buffer_pool->pending_reserves)) {
struct s3_pending_reserve *pending_reserve = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
struct aws_linked_list_node *current_node = node;
node = aws_linked_list_next(node);
if (aws_future_s3_buffer_ticket_is_done(pending_reserve->ticket_future)) {
AWS_FATAL_ASSERT(aws_future_s3_buffer_ticket_get_error(pending_reserve->ticket_future) != AWS_OP_SUCCESS);
aws_linked_list_remove(current_node);
aws_linked_list_push_back(&pending_reserves_to_remove, current_node);
}
}

/* Capture all the pending reserves that can be completed. They will actually be completed once outside the mutex.
*/
while (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
node = aws_linked_list_front(&buffer_pool->pending_reserves);
struct s3_pending_reserve *pending_reserve = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);

struct aws_s3_default_buffer_ticket *new_ticket = s_try_reserve(pool, pending_reserve->meta);
pending_reserve->ticket = s_try_reserve(pool, pending_reserve->meta);

if (new_ticket != NULL) {
struct aws_s3_buffer_ticket *new_ticket_wrapper = s_wrap_default_ticket(new_ticket);
pending_ticket_future = pending_reserve->ticket_future;
aws_future_s3_buffer_ticket_set_result_by_move(pending_ticket_future, &new_ticket_wrapper);
if (pending_reserve->ticket != NULL) {
aws_linked_list_pop_front(&buffer_pool->pending_reserves);
aws_mem_release(buffer_pool->base_allocator, pending_reserve);
aws_linked_list_push_back(&pending_reserves_to_complete, node);
} else {
break;
}
}

aws_mutex_unlock(&buffer_pool->mutex);
aws_future_s3_buffer_ticket_release(pending_ticket_future);

/* release completed pending nodes outside of lock to avoid any deadlocks */
while (!aws_linked_list_empty(&pending_reserves_to_remove)) {
node = aws_linked_list_front(&pending_reserves_to_remove);
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
aws_future_s3_buffer_ticket_release(pending->ticket_future);
aws_linked_list_pop_front(&pending_reserves_to_remove);
aws_mem_release(buffer_pool->base_allocator, pending);
}

/* fill the next pending future */
while (!aws_linked_list_empty(&pending_reserves_to_complete)) {
node = aws_linked_list_front(&pending_reserves_to_complete);
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);

struct aws_s3_buffer_ticket *new_ticket_wrapper = s_wrap_default_ticket(pending->ticket);
aws_future_s3_buffer_ticket_set_result_by_move(pending->ticket_future, &new_ticket_wrapper);

aws_future_s3_buffer_ticket_release(pending->ticket_future);
aws_linked_list_pop_front(&pending_reserves_to_complete);
aws_mem_release(buffer_pool->base_allocator, pending);
}
}

struct aws_s3_default_buffer_ticket *s_try_reserve(
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ add_test_case(test_s3_buffer_pool_forced_buffer)
add_test_case(test_s3_buffer_pool_forced_buffer_after_limit_hit)
add_test_case(test_s3_buffer_pool_forced_buffer_wont_stop_reservations)
add_test_case(test_s3_buffer_pool_reserve_over_limit_instant_release)
add_test_case(test_s3_buffer_pool_reserve_over_limit_cancel)
add_test_case(test_s3_buffer_pool_reserve_over_limit_multi)

add_net_test_case(client_update_upload_part_timeout)
add_net_test_case(client_meta_request_override_part_size)
Expand Down
128 changes: 128 additions & 0 deletions tests/s3_default_buffer_pool_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,74 @@ static int s_test_s3_buffer_pool_reserve_over_limit(struct aws_allocator *alloca
};
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit, s_test_s3_buffer_pool_reserve_over_limit)

/* Test that releasing one big ticket can complete several pending reserves at once. */
static int s_test_s3_buffer_pool_reserve_over_limit_multi(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    /* One 32MB reservation plus 108 8MB reservations fills the pool close to its
     * 1GB limit, so further reserves are queued as pending. */
    enum { NUM_TICKETS = 109 };

    struct aws_s3_buffer_pool *buffer_pool = aws_s3_default_buffer_pool_new(
        allocator, (struct aws_s3_buffer_pool_config){.part_size = MB_TO_BYTES(8), .memory_limit = GB_TO_BYTES(1)});

    struct aws_s3_buffer_ticket *tickets[NUM_TICKETS];
    struct aws_future_s3_buffer_ticket *ticket_futures[NUM_TICKETS];

    /* First reservation is the big one; releasing it later should free enough
     * room to complete more than one pending 8MB reserve. */
    ticket_futures[0] = aws_s3_default_buffer_pool_reserve(
        buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(32)});
    ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[0]));
    ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[0]), AWS_OP_SUCCESS);
    tickets[0] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[0]);
    struct aws_byte_buf buf0 = aws_s3_buffer_ticket_claim(tickets[0]);
    ASSERT_NOT_NULL(buf0.buffer);

    /* Fill the rest of the pool with regular part-sized reservations. */
    for (size_t i = 1; i < NUM_TICKETS; ++i) {
        ticket_futures[i] = aws_s3_default_buffer_pool_reserve(
            buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
        ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[i]));
        ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[i]), AWS_OP_SUCCESS);
        tickets[i] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[i]);
        struct aws_byte_buf buf = aws_s3_buffer_ticket_claim(tickets[i]);
        ASSERT_NOT_NULL(buf.buffer);
    }

    /* Two more reserves must go pending since the pool is at its limit. */
    struct aws_future_s3_buffer_ticket *over_future1 = aws_s3_default_buffer_pool_reserve(
        buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});

    ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future1));
    struct s_reserve_state state1 = {.future = over_future1};
    aws_future_s3_buffer_ticket_register_callback(over_future1, s_on_pool_buffer_reserved, &state1);

    struct aws_future_s3_buffer_ticket *over_future2 = aws_s3_default_buffer_pool_reserve(
        buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});

    ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future2));
    struct s_reserve_state state2 = {.future = over_future2};
    aws_future_s3_buffer_ticket_register_callback(over_future2, s_on_pool_buffer_reserved, &state2);

    /* Release big ticket: 32MB freed should satisfy BOTH pending 8MB reserves. */
    aws_s3_buffer_ticket_release(tickets[0]);
    aws_future_s3_buffer_ticket_release(ticket_futures[0]);

    ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(over_future1));
    ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(over_future2));
    ASSERT_NOT_NULL(state1.ticket);
    ASSERT_NOT_NULL(state2.ticket);

    for (size_t i = 1; i < NUM_TICKETS; ++i) {
        aws_s3_buffer_ticket_release(tickets[i]);
        aws_future_s3_buffer_ticket_release(ticket_futures[i]);
    }

    aws_s3_buffer_ticket_release(state1.ticket);
    aws_s3_buffer_ticket_release(state2.ticket);
    aws_future_s3_buffer_ticket_release(over_future1);
    aws_future_s3_buffer_ticket_release(over_future2);

    aws_s3_default_buffer_pool_destroy(buffer_pool);

    return 0;
}
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit_multi, s_test_s3_buffer_pool_reserve_over_limit_multi)
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit_multi, s_test_s3_buffer_pool_reserve_over_limit_multi)

static void s_on_pool_buffer_reserved_instant_release(void *user_data) {
struct s_reserve_state *state = user_data;

Expand Down Expand Up @@ -324,6 +392,66 @@ AWS_TEST_CASE(
test_s3_buffer_pool_reserve_over_limit_instant_release,
s_test_s3_buffer_pool_reserve_over_limit_instant_release)

/* Future-completion callback: records the reserved ticket on success;
 * leaves state->ticket untouched when the future completed with an error. */
static void s_on_pool_buffer_reserved_cancel(void *user_data) {
    struct s_reserve_state *state = user_data;

    if (aws_future_s3_buffer_ticket_get_error(state->future) != AWS_OP_SUCCESS) {
        return; /* errored (e.g. canceled) future carries no ticket */
    }

    state->ticket = aws_future_s3_buffer_ticket_get_result_by_move(state->future);
}

/* Make sure that cancelling pending futures does not break the pool:
 * an errored-out pending future must never be filled with a ticket once
 * memory becomes available, and pool destruction must still succeed. */
static int s_test_s3_buffer_pool_reserve_over_limit_cancel(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    /* 112 * 8MB reservations exhaust the 1GB pool, so the next reserve goes pending. */
    enum { NUM_TICKETS = 112 };

    struct aws_s3_buffer_pool *buffer_pool = aws_s3_default_buffer_pool_new(
        allocator, (struct aws_s3_buffer_pool_config){.part_size = MB_TO_BYTES(8), .memory_limit = GB_TO_BYTES(1)});

    struct aws_s3_buffer_ticket *tickets[NUM_TICKETS];
    struct aws_future_s3_buffer_ticket *ticket_futures[NUM_TICKETS];
    for (size_t i = 0; i < NUM_TICKETS; ++i) {
        ticket_futures[i] = aws_s3_default_buffer_pool_reserve(
            buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
        ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[i]));
        ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[i]), AWS_OP_SUCCESS);
        tickets[i] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[i]);
        struct aws_byte_buf buf = aws_s3_buffer_ticket_claim(tickets[i]);
        ASSERT_NOT_NULL(buf.buffer);
    }

    struct aws_future_s3_buffer_ticket *over_future = aws_s3_default_buffer_pool_reserve(
        buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});

    ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future));

    struct s_reserve_state state = {.future = over_future};

    aws_future_s3_buffer_ticket_register_callback(over_future, s_on_pool_buffer_reserved_cancel, &state);

    /* simulate request cancel by erroring out the future */
    aws_future_s3_buffer_ticket_set_error(over_future, AWS_ERROR_S3_CANCELED);

    /* Releasing every held ticket frees memory; the pool must skip the
     * already-errored pending future instead of trying to complete it. */
    for (size_t i = 0; i < NUM_TICKETS; ++i) {
        aws_s3_buffer_ticket_release(tickets[i]);
        aws_future_s3_buffer_ticket_release(ticket_futures[i]);
    }

    /* make sure that errored future was never filled */
    ASSERT_NULL(state.ticket);

    aws_future_s3_buffer_ticket_release(over_future);

    aws_s3_default_buffer_pool_destroy(buffer_pool);

    return 0;
}
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit_cancel, s_test_s3_buffer_pool_reserve_over_limit_cancel)

static int s_test_s3_buffer_pool_too_small(struct aws_allocator *allocator, void *ctx) {
(void)allocator;
(void)ctx;
Expand Down