Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ struct aws_s3_request {
* retried.*/
struct aws_byte_buf request_body;

/**
* Ticket to acquire the buffer.
*/
struct aws_s3_buffer_ticket *ticket;

/* Beginning range of this part. */
Expand Down
3 changes: 3 additions & 0 deletions include/aws/s3/s3_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
* Note: in some cases pipeline can stall if new buffer cannot be allocated (ex. async writes flow).
* In this case reserve request will indicate that not granting the ticket can block and buffer pool should try to
* allocate ticket right away (or wait and call waker when mem is allocated for the case of async writes).
* Note for custom pool implementations: Scheduler keeps track of all outstanding futures and will error them out when
* request is paused or cancelled. Its still fine for memory pool implementation to deliver ticket (it will just be
* released by future right away with no side effects) or just ignore the future if its already in error state.
*/

AWS_PUSH_SANE_WARNING_LEVEL
Expand Down
8 changes: 7 additions & 1 deletion source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,9 @@ static void s_s3_prepare_acquire_mem_callback_and_destroy(

/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_linked_list_remove(&payload->request->pending_buffer_future_list_node);
payload->request->synced_data.buffer_future =
aws_future_s3_buffer_ticket_release(payload->request->synced_data.buffer_future);
aws_s3_meta_request_set_fail_synced(meta_request, payload->request, error_code);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
Expand Down Expand Up @@ -1909,7 +1912,10 @@ void s_acquire_mem_and_prepare_request(
}
/* END CRITICAL SECTION */

aws_future_s3_buffer_ticket_register_callback(payload->buffer_future, s_on_pool_buffer_reserved, payload);
/* Note: run callback async on event loop. this is done to avoid any deadlocks between cancelling
which requires a lock on meta request and buffer acquire callback which also requires a lock. */
aws_future_s3_buffer_ticket_register_event_loop_callback(
payload->buffer_future, meta_request->io_event_loop, s_on_pool_buffer_reserved, payload);
return;
}

Expand Down
3 changes: 2 additions & 1 deletion source/s3_default_buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ void s_default_pool_trim(struct aws_s3_buffer_pool *pool) {

static struct aws_s3_buffer_pool_vtable s_default_pool_vtable = {
.reserve = s_default_pool_reserve,
.trim = s_default_pool_trim};
.trim = s_default_pool_trim,
};

struct aws_s3_buffer_pool *aws_s3_default_buffer_pool_new(
struct aws_allocator *allocator,
Expand Down
9 changes: 4 additions & 5 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1832,13 +1832,12 @@ void aws_s3_meta_request_cancel_pending_buffer_futures_synced(
struct aws_s3_meta_request *meta_request,
int error_code) {
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
while (!aws_linked_list_empty(&meta_request->synced_data.pending_buffer_futures)) {

struct aws_linked_list_node *request_node =
aws_linked_list_pop_front(&meta_request->synced_data.pending_buffer_futures);
for (struct aws_linked_list_node *node = aws_linked_list_begin(&meta_request->synced_data.pending_buffer_futures);
node != aws_linked_list_end(&meta_request->synced_data.pending_buffer_futures);
node = aws_linked_list_next(node)) {

struct aws_s3_request *request =
AWS_CONTAINER_OF(request_node, struct aws_s3_request, pending_buffer_future_list_node);
struct aws_s3_request *request = AWS_CONTAINER_OF(node, struct aws_s3_request, pending_buffer_future_list_node);

aws_future_s3_buffer_ticket_set_error(request->synced_data.buffer_future, error_code);
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ add_test_case(test_s3_buffer_pool_trim)
add_test_case(test_s3_buffer_pool_reserve_over_limit)
add_test_case(test_s3_buffer_pool_too_small)
add_net_test_case(test_s3_put_object_buffer_pool_trim)
add_net_test_case(test_s3_put_object_buffer_acquire_error)
add_test_case(test_s3_buffer_pool_forced_buffer)
add_test_case(test_s3_buffer_pool_forced_buffer_after_limit_hit)
add_test_case(test_s3_buffer_pool_forced_buffer_wont_stop_reservations)
Expand Down
85 changes: 85 additions & 0 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -3350,6 +3350,91 @@ static int s_test_s3_put_object_sse_c_aes256_multipart_with_checksum(struct aws_
return 0;
}

struct aws_future_s3_buffer_ticket *s_failing_pool_reserve(
struct aws_s3_buffer_pool *pool,
struct aws_s3_buffer_pool_reserve_meta meta) {
(void)meta;

struct aws_future_s3_buffer_ticket *future = aws_future_s3_buffer_ticket_new((struct aws_allocator *)pool->impl);
aws_future_s3_buffer_ticket_set_error(future, AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED);
return future;
}

void s_failing_pool_trim(struct aws_s3_buffer_pool *pool) {
(void)pool;
}

static struct aws_s3_buffer_pool_vtable s_failing_pool_vtable = {
.reserve = s_failing_pool_reserve,
.trim = s_failing_pool_trim,
};

void s_failing_pool_destroy(struct aws_s3_buffer_pool *buffer_pool_wrapper) {
aws_mem_release((struct aws_allocator *)buffer_pool_wrapper->impl, buffer_pool_wrapper);
}

struct aws_s3_buffer_pool *s_always_error_buffer_pool_fn(
struct aws_allocator *allocator,
struct aws_s3_buffer_pool_config config) {
(void)config;
struct aws_s3_buffer_pool *pool = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_buffer_pool));
pool->impl = allocator;
pool->vtable = &s_failing_pool_vtable;
aws_ref_count_init(&pool->ref_count, pool, (aws_simple_completion_callback *)s_failing_pool_destroy);

return pool;
}

AWS_TEST_CASE(test_s3_put_object_buffer_acquire_error, s_test_s3_put_object_buffer_acquire_error)
static int s_test_s3_put_object_buffer_acquire_error(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_s3_client_config client_config = {
.part_size = 8 * 1024 * 1024, .buffer_pool_factory_fn = s_always_error_buffer_pool_fn};

ASSERT_SUCCESS(aws_s3_tester_bind_client(
&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));

struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

ASSERT_TRUE(client != NULL);

struct aws_byte_buf path_buf;
AWS_ZERO_STRUCT(path_buf);

ASSERT_SUCCESS(aws_s3_tester_upload_file_path_init(
tester.allocator, &path_buf, aws_byte_cursor_from_c_str("/prefix/round_trip/test_acquire_buffer_error.txt")));

struct aws_byte_cursor object_path = aws_byte_cursor_from_buf(&path_buf);

struct aws_s3_tester_meta_request_options put_options = {
.allocator = allocator,
.meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
.client = client,
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
.put_options =
{
.object_size_mb = 10,
.object_path_override = object_path,
},
};

struct aws_s3_meta_request_test_results meta_request_test_results;
aws_s3_meta_request_test_results_init(&meta_request_test_results, allocator);
ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &meta_request_test_results));
ASSERT_INT_EQUALS(AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED, meta_request_test_results.finished_error_code);
client = aws_s3_client_release(client);

aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);
aws_byte_buf_clean_up(&path_buf);
aws_s3_tester_clean_up(&tester);

return 0;
}

static int s_test_s3_put_object_content_md5_helper(
struct aws_allocator *allocator,
bool multipart_upload,
Expand Down
Loading