Skip to content

Commit 4025561

Browse files
authored
Merge branch 'main' into revamp-checksum
2 parents 15bf597 + 70aacd2 commit 4025561

File tree

3 files changed

+186
-14
lines changed

3 files changed

+186
-14
lines changed

source/s3_default_buffer_pool.c

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ struct aws_s3_default_buffer_pool {
109109
struct s3_pending_reserve {
110110
struct aws_linked_list_node node;
111111
struct aws_future_s3_buffer_ticket *ticket_future;
112+
struct aws_s3_default_buffer_ticket *ticket;
112113
struct aws_s3_buffer_pool_reserve_meta meta;
113114
};
114115

@@ -292,12 +293,13 @@ void aws_s3_default_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool_w
292293

293294
aws_array_list_clean_up(&buffer_pool->blocks);
294295

295-
for (struct aws_linked_list_node *node = aws_linked_list_begin(&buffer_pool->pending_reserves);
296-
node != aws_linked_list_end(&buffer_pool->pending_reserves);
297-
node = aws_linked_list_next(node)) {
296+
while (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
297+
struct aws_linked_list_node *node = aws_linked_list_front(&buffer_pool->pending_reserves);
298298
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
299+
AWS_FATAL_ASSERT(aws_future_s3_buffer_ticket_is_done(pending->ticket_future));
299300
aws_future_s3_buffer_ticket_release(pending->ticket_future);
300-
aws_linked_list_remove(node);
301+
aws_linked_list_pop_front(&buffer_pool->pending_reserves);
302+
aws_mem_release(buffer_pool->base_allocator, pending);
301303
}
302304

303305
aws_mutex_clean_up(&buffer_pool->mutex);
@@ -389,25 +391,65 @@ static void s_aws_ticket_wrapper_destroy(void *data) {
389391
aws_mem_release(buffer_pool->base_allocator, ticket);
390392
aws_mem_release(buffer_pool->base_allocator, ticket_wrapper);
391393

392-
struct aws_future_s3_buffer_ticket *pending_ticket_future = NULL;
394+
struct aws_linked_list pending_reserves_to_remove;
395+
aws_linked_list_init(&pending_reserves_to_remove);
393396

394-
if (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
395-
struct aws_linked_list_node *node = aws_linked_list_front(&buffer_pool->pending_reserves);
397+
struct aws_linked_list pending_reserves_to_complete;
398+
aws_linked_list_init(&pending_reserves_to_complete);
399+
400+
/* Capture all the pending reserves that are done (currently can only happen when request is canceled, which cancels
401+
* pending futures) */
402+
struct aws_linked_list_node *node = aws_linked_list_begin(&buffer_pool->pending_reserves);
403+
while (node != aws_linked_list_end(&buffer_pool->pending_reserves)) {
404+
struct s3_pending_reserve *pending_reserve = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
405+
struct aws_linked_list_node *current_node = node;
406+
node = aws_linked_list_next(node);
407+
if (aws_future_s3_buffer_ticket_is_done(pending_reserve->ticket_future)) {
408+
AWS_FATAL_ASSERT(aws_future_s3_buffer_ticket_get_error(pending_reserve->ticket_future) != AWS_OP_SUCCESS);
409+
aws_linked_list_remove(current_node);
410+
aws_linked_list_push_back(&pending_reserves_to_remove, current_node);
411+
}
412+
}
413+
414+
/* Capture all the pending reserves that can be completed. They will actually be completed once outside the mutex.
415+
*/
416+
while (!aws_linked_list_empty(&buffer_pool->pending_reserves)) {
417+
node = aws_linked_list_front(&buffer_pool->pending_reserves);
396418
struct s3_pending_reserve *pending_reserve = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
397419

398-
struct aws_s3_default_buffer_ticket *new_ticket = s_try_reserve(pool, pending_reserve->meta);
420+
pending_reserve->ticket = s_try_reserve(pool, pending_reserve->meta);
399421

400-
if (new_ticket != NULL) {
401-
struct aws_s3_buffer_ticket *new_ticket_wrapper = s_wrap_default_ticket(new_ticket);
402-
pending_ticket_future = pending_reserve->ticket_future;
403-
aws_future_s3_buffer_ticket_set_result_by_move(pending_ticket_future, &new_ticket_wrapper);
422+
if (pending_reserve->ticket != NULL) {
404423
aws_linked_list_pop_front(&buffer_pool->pending_reserves);
405-
aws_mem_release(buffer_pool->base_allocator, pending_reserve);
424+
aws_linked_list_push_back(&pending_reserves_to_complete, node);
425+
} else {
426+
break;
406427
}
407428
}
408429

409430
aws_mutex_unlock(&buffer_pool->mutex);
410-
aws_future_s3_buffer_ticket_release(pending_ticket_future);
431+
432+
/* release completed pending nodes outside of lock to avoid any deadlocks */
433+
while (!aws_linked_list_empty(&pending_reserves_to_remove)) {
434+
node = aws_linked_list_front(&pending_reserves_to_remove);
435+
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
436+
aws_future_s3_buffer_ticket_release(pending->ticket_future);
437+
aws_linked_list_pop_front(&pending_reserves_to_remove);
438+
aws_mem_release(buffer_pool->base_allocator, pending);
439+
}
440+
441+
/* fill the next pending future */
442+
while (!aws_linked_list_empty(&pending_reserves_to_complete)) {
443+
node = aws_linked_list_front(&pending_reserves_to_complete);
444+
struct s3_pending_reserve *pending = AWS_CONTAINER_OF(node, struct s3_pending_reserve, node);
445+
446+
struct aws_s3_buffer_ticket *new_ticket_wrapper = s_wrap_default_ticket(pending->ticket);
447+
aws_future_s3_buffer_ticket_set_result_by_move(pending->ticket_future, &new_ticket_wrapper);
448+
449+
aws_future_s3_buffer_ticket_release(pending->ticket_future);
450+
aws_linked_list_pop_front(&pending_reserves_to_complete);
451+
aws_mem_release(buffer_pool->base_allocator, pending);
452+
}
411453
}
412454

413455
struct aws_s3_default_buffer_ticket *s_try_reserve(

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,8 @@ add_test_case(test_s3_buffer_pool_forced_buffer)
404404
add_test_case(test_s3_buffer_pool_forced_buffer_after_limit_hit)
405405
add_test_case(test_s3_buffer_pool_forced_buffer_wont_stop_reservations)
406406
add_test_case(test_s3_buffer_pool_reserve_over_limit_instant_release)
407+
add_test_case(test_s3_buffer_pool_reserve_over_limit_cancel)
408+
add_test_case(test_s3_buffer_pool_reserve_over_limit_multi)
407409

408410
add_net_test_case(client_update_upload_part_timeout)
409411
add_net_test_case(client_meta_request_override_part_size)

tests/s3_default_buffer_pool_tests.c

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,74 @@ static int s_test_s3_buffer_pool_reserve_over_limit(struct aws_allocator *alloca
268268
};
269269
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit, s_test_s3_buffer_pool_reserve_over_limit)
270270

271+
/* test that one big ticket release can complete several pending reserves*/
272+
static int s_test_s3_buffer_pool_reserve_over_limit_multi(struct aws_allocator *allocator, void *ctx) {
273+
(void)allocator;
274+
(void)ctx;
275+
276+
struct aws_s3_buffer_pool *buffer_pool = aws_s3_default_buffer_pool_new(
277+
allocator, (struct aws_s3_buffer_pool_config){.part_size = MB_TO_BYTES(8), .memory_limit = GB_TO_BYTES(1)});
278+
279+
struct aws_s3_buffer_ticket *tickets[109];
280+
struct aws_future_s3_buffer_ticket *ticket_futures[109];
281+
282+
ticket_futures[0] = aws_s3_default_buffer_pool_reserve(
283+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(32)});
284+
ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[0]));
285+
ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[0]), AWS_OP_SUCCESS);
286+
tickets[0] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[0]);
287+
struct aws_byte_buf buf0 = aws_s3_buffer_ticket_claim(tickets[0]);
288+
ASSERT_NOT_NULL(buf0.buffer);
289+
290+
for (size_t i = 1; i < 109; ++i) {
291+
ticket_futures[i] = aws_s3_default_buffer_pool_reserve(
292+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
293+
ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[i]));
294+
ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[i]), AWS_OP_SUCCESS);
295+
tickets[i] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[i]);
296+
struct aws_byte_buf buf = aws_s3_buffer_ticket_claim(tickets[i]);
297+
ASSERT_NOT_NULL(buf.buffer);
298+
}
299+
300+
struct aws_future_s3_buffer_ticket *over_future1 = aws_s3_default_buffer_pool_reserve(
301+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
302+
303+
ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future1));
304+
struct s_reserve_state state1 = {.future = over_future1};
305+
aws_future_s3_buffer_ticket_register_callback(over_future1, s_on_pool_buffer_reserved, &state1);
306+
307+
struct aws_future_s3_buffer_ticket *over_future2 = aws_s3_default_buffer_pool_reserve(
308+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
309+
310+
ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future2));
311+
struct s_reserve_state state2 = {.future = over_future2};
312+
aws_future_s3_buffer_ticket_register_callback(over_future2, s_on_pool_buffer_reserved, &state2);
313+
314+
/* Release big ticket */
315+
aws_s3_buffer_ticket_release(tickets[0]);
316+
aws_future_s3_buffer_ticket_release(ticket_futures[0]);
317+
318+
ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(over_future1));
319+
ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(over_future2));
320+
ASSERT_NOT_NULL(state1.ticket);
321+
ASSERT_NOT_NULL(state2.ticket);
322+
323+
for (size_t i = 1; i < 109; ++i) {
324+
aws_s3_buffer_ticket_release(tickets[i]);
325+
aws_future_s3_buffer_ticket_release(ticket_futures[i]);
326+
}
327+
328+
aws_s3_buffer_ticket_release(state1.ticket);
329+
aws_s3_buffer_ticket_release(state2.ticket);
330+
aws_future_s3_buffer_ticket_release(over_future1);
331+
aws_future_s3_buffer_ticket_release(over_future2);
332+
333+
aws_s3_default_buffer_pool_destroy(buffer_pool);
334+
335+
return 0;
336+
};
337+
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit_multi, s_test_s3_buffer_pool_reserve_over_limit_multi)
338+
271339
static void s_on_pool_buffer_reserved_instant_release(void *user_data) {
272340
struct s_reserve_state *state = user_data;
273341

@@ -324,6 +392,66 @@ AWS_TEST_CASE(
324392
test_s3_buffer_pool_reserve_over_limit_instant_release,
325393
s_test_s3_buffer_pool_reserve_over_limit_instant_release)
326394

395+
static void s_on_pool_buffer_reserved_cancel(void *user_data) {
396+
struct s_reserve_state *state = user_data;
397+
398+
if (aws_future_s3_buffer_ticket_get_error(state->future) == AWS_OP_SUCCESS) {
399+
state->ticket = aws_future_s3_buffer_ticket_get_result_by_move(state->future);
400+
}
401+
}
402+
403+
/* make sure that cancelling pending futures does not break the pool */
404+
static int s_test_s3_buffer_pool_reserve_over_limit_cancel(struct aws_allocator *allocator, void *ctx) {
405+
(void)allocator;
406+
(void)ctx;
407+
408+
struct aws_s3_buffer_pool *buffer_pool = aws_s3_default_buffer_pool_new(
409+
allocator, (struct aws_s3_buffer_pool_config){.part_size = MB_TO_BYTES(8), .memory_limit = GB_TO_BYTES(1)});
410+
411+
struct aws_s3_buffer_ticket *tickets[112];
412+
struct aws_future_s3_buffer_ticket *ticket_futures[112];
413+
for (size_t i = 0; i < 112; ++i) {
414+
ticket_futures[i] = aws_s3_default_buffer_pool_reserve(
415+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
416+
ASSERT_TRUE(aws_future_s3_buffer_ticket_is_done(ticket_futures[i]));
417+
ASSERT_INT_EQUALS(aws_future_s3_buffer_ticket_get_error(ticket_futures[i]), AWS_OP_SUCCESS);
418+
tickets[i] = aws_future_s3_buffer_ticket_get_result_by_move(ticket_futures[i]);
419+
struct aws_byte_buf buf = aws_s3_buffer_ticket_claim(tickets[i]);
420+
ASSERT_NOT_NULL(buf.buffer);
421+
}
422+
423+
struct aws_future_s3_buffer_ticket *over_future = aws_s3_default_buffer_pool_reserve(
424+
buffer_pool, (struct aws_s3_buffer_pool_reserve_meta){.size = MB_TO_BYTES(8)});
425+
426+
ASSERT_FALSE(aws_future_s3_buffer_ticket_is_done(over_future));
427+
428+
struct s_reserve_state state = {.future = over_future};
429+
430+
aws_future_s3_buffer_ticket_register_callback(over_future, s_on_pool_buffer_reserved_cancel, &state);
431+
432+
/* simulate request cancel by erroring out the future */
433+
aws_future_s3_buffer_ticket_set_error(over_future, AWS_ERROR_S3_CANCELED);
434+
435+
for (size_t i = 0; i < 112; ++i) {
436+
437+
aws_s3_buffer_ticket_release(tickets[i]);
438+
439+
aws_future_s3_buffer_ticket_release(ticket_futures[i]);
440+
}
441+
442+
/* make sure that errored future was never filled */
443+
ASSERT_NULL(state.ticket);
444+
445+
aws_s3_buffer_ticket_release(state.ticket);
446+
447+
aws_future_s3_buffer_ticket_release(over_future);
448+
449+
aws_s3_default_buffer_pool_destroy(buffer_pool);
450+
451+
return 0;
452+
};
453+
AWS_TEST_CASE(test_s3_buffer_pool_reserve_over_limit_cancel, s_test_s3_buffer_pool_reserve_over_limit_cancel)
454+
327455
static int s_test_s3_buffer_pool_too_small(struct aws_allocator *allocator, void *ctx) {
328456
(void)allocator;
329457
(void)ctx;

0 commit comments

Comments
 (0)