Skip to content

Commit b8155a3

Browse files
move reserve earlier
1 parent b41d6d0 commit b8155a3

File tree

3 files changed

+115
-78
lines changed

3 files changed

+115
-78
lines changed

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ typedef void(aws_s3_meta_request_prepare_request_callback_fn)(
4646
struct aws_s3_prepare_request_payload {
4747
struct aws_allocator *allocator;
4848
struct aws_s3_request *request;
49-
struct aws_event_loop *event_loop;
5049
struct aws_task task;
51-
struct aws_future_s3_buffer_ticket *async_buffer_reserve;
5250
/* async step: wait for vtable->prepare_request() call to complete */
5351
struct aws_future_void *asyncstep_prepare_request;
5452
/* callback to invoke when all request preparation work is complete */

source/s3_client.c

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,109 @@ static bool s_s3_client_should_update_meta_request(
17911791
return true;
17921792
}
17931793

1794+
struct aws_s3_reserve_memory_payload {
1795+
struct aws_allocator *allocator;
1796+
struct aws_s3_request *request;
1797+
struct aws_future_s3_buffer_ticket *buffer_future;
1798+
aws_s3_meta_request_prepare_request_callback_fn *callback;
1799+
void *user_data;
1800+
};
1801+
1802+
static void s_s3_prepare_acquire_mem_callback_and_destroy(
1803+
struct aws_s3_reserve_memory_payload *payload,
1804+
int error_code) {
1805+
AWS_PRECONDITION(payload);
1806+
AWS_PRECONDITION(payload->request);
1807+
1808+
struct aws_s3_meta_request *meta_request = payload->request->meta_request;
1809+
AWS_PRECONDITION(meta_request);
1810+
1811+
++payload->request->num_times_prepared;
1812+
1813+
if (error_code) {
1814+
AWS_LOGF_ERROR(
1815+
AWS_LS_S3_META_REQUEST,
1816+
"id=%p Could not prepare request %p due to error %d (%s).",
1817+
(void *)meta_request,
1818+
(void *)payload->request,
1819+
error_code,
1820+
aws_error_str(error_code));
1821+
1822+
/* BEGIN CRITICAL SECTION */
1823+
aws_s3_meta_request_lock_synced_data(meta_request);
1824+
aws_s3_meta_request_set_fail_synced(meta_request, payload->request, error_code);
1825+
aws_s3_meta_request_unlock_synced_data(meta_request);
1826+
/* END CRITICAL SECTION */
1827+
}
1828+
1829+
if (payload->callback != NULL) {
1830+
payload->callback(meta_request, payload->request, error_code, payload->user_data);
1831+
}
1832+
1833+
aws_future_s3_buffer_ticket_release(payload->buffer_future);
1834+
aws_mem_release(payload->allocator, payload);
1835+
}
1836+
1837+
static void s_on_pool_buffer_reserved(void *user_data) {
1838+
struct aws_s3_reserve_memory_payload *payload = user_data;
1839+
AWS_PRECONDITION(payload);
1840+
1841+
struct aws_s3_request *request = payload->request;
1842+
AWS_PRECONDITION(request);
1843+
1844+
struct aws_s3_meta_request *meta_request = request->meta_request;
1845+
AWS_PRECONDITION(meta_request);
1846+
1847+
struct aws_future_s3_buffer_ticket *future_ticket = payload->buffer_future;
1848+
1849+
int error_code = aws_future_s3_buffer_ticket_get_error(future_ticket);
1850+
if (error_code != AWS_ERROR_SUCCESS) {
1851+
AWS_LOGF_ERROR(
1852+
AWS_LS_S3_META_REQUEST,
1853+
"id=%p Could not allocate buffer for request with tag %d for the meta request.",
1854+
(void *)meta_request,
1855+
request->request_tag);
1856+
1857+
s_s3_prepare_acquire_mem_callback_and_destroy(payload, AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED);
1858+
return;
1859+
}
1860+
1861+
request->ticket = aws_future_s3_buffer_ticket_get_result_by_move(future_ticket);
1862+
1863+
aws_future_s3_buffer_ticket_release(payload->buffer_future);
1864+
aws_mem_release(payload->allocator, payload);
1865+
1866+
aws_s3_meta_request_prepare_request(request->meta_request,
1867+
request, payload->callback, payload->user_data);
1868+
return;
1869+
}
1870+
1871+
void s_acquire_mem_and_prepare_request(struct aws_s3_client *client,
1872+
struct aws_s3_request *request,
1873+
aws_s3_meta_request_prepare_request_callback_fn *callback,
1874+
void *user_data) {
1875+
1876+
if (request->ticket == NULL && request->should_allocate_buffer_from_pool) {
1877+
struct aws_allocator *allocator = request->allocator;
1878+
struct aws_s3_meta_request *meta_request = request->meta_request;
1879+
struct aws_s3_buffer_pool_reserve_meta meta = {
1880+
.client = client, .meta_request = meta_request, .size = meta_request->part_size};
1881+
1882+
struct aws_s3_reserve_memory_payload *payload = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_reserve_memory_payload));
1883+
1884+
payload->allocator = allocator;
1885+
payload->request = request;
1886+
payload->callback = callback;
1887+
payload->user_data = user_data;
1888+
payload->buffer_future = aws_s3_buffer_pool_reserve(request->meta_request->client->buffer_pool, meta);
1889+
1890+
aws_future_s3_buffer_ticket_register_callback(payload->buffer_future, s_on_pool_buffer_reserved, payload);
1891+
return;
1892+
}
1893+
1894+
aws_s3_meta_request_prepare_request(request->meta_request, request, callback, user_data);
1895+
}
1896+
17941897
void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
17951898
AWS_PRECONDITION(client);
17961899

@@ -1835,9 +1938,6 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
18351938
struct aws_s3_request *request = NULL;
18361939

18371940
/* Try to grab the next request from the meta request. */
1838-
/* TODO: should we bail out if request fails to update due to mem or
1839-
* continue going and hopping that following reqs can fit into mem?
1840-
* check if avail space is at least part size? */
18411941
bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);
18421942

18431943
if (work_remaining) {
@@ -1856,8 +1956,8 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
18561956
num_requests_in_flight =
18571957
(uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;
18581958

1859-
aws_s3_meta_request_prepare_request(
1860-
meta_request, request, s_s3_client_prepare_callback_queue_request, client);
1959+
s_acquire_mem_and_prepare_request(
1960+
client, request, s_s3_client_prepare_callback_queue_request, client);
18611961
}
18621962
} else {
18631963
s_s3_client_remove_meta_request_threaded(client, meta_request);

source/s3_meta_request.c

Lines changed: 10 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,6 @@ static void s_s3_prepare_request_payload_callback_and_destroy(
634634
payload->callback(meta_request, payload->request, error_code, payload->user_data);
635635
}
636636

637-
aws_future_s3_buffer_ticket_release(payload->async_buffer_reserve);
638637
aws_future_void_release(payload->asyncstep_prepare_request);
639638
aws_mem_release(payload->allocator, payload);
640639
}
@@ -697,67 +696,12 @@ void aws_s3_meta_request_schedule_prepare_request_default_impl(
697696
/* To support reading in parallel, schedule task on any I/O thread in the streaming elg.
698697
* Otherwise, we wouldn't get any parallelism. */
699698
struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg);
700-
payload->event_loop = loop;
701699
aws_event_loop_schedule_task_now(loop, &payload->task);
702700
} else {
703-
payload->event_loop = meta_request->io_event_loop;
704701
aws_event_loop_schedule_task_now(meta_request->io_event_loop, &payload->task);
705702
}
706703
}
707704

708-
static void s_kick_off_prepare_request(struct aws_s3_prepare_request_payload *payload) {
709-
struct aws_s3_request *request = payload->request;
710-
AWS_PRECONDITION(request);
711-
712-
struct aws_s3_meta_request *meta_request = request->meta_request;
713-
AWS_PRECONDITION(meta_request);
714-
715-
const struct aws_s3_meta_request_vtable *vtable = meta_request->vtable;
716-
AWS_PRECONDITION(vtable);
717-
718-
if (!request->always_send && aws_s3_meta_request_has_finish_result(meta_request)) {
719-
s_s3_prepare_request_payload_callback_and_destroy(payload, AWS_ERROR_S3_CANCELED);
720-
return;
721-
}
722-
723-
/* Kick off the async vtable->prepare_request()
724-
* Each subclass has its own implementation of this. */
725-
payload->asyncstep_prepare_request = vtable->prepare_request(request);
726-
aws_future_void_register_callback(
727-
payload->asyncstep_prepare_request, s_s3_meta_request_on_request_prepared, payload);
728-
return;
729-
}
730-
731-
static void s_on_pool_buffer_reserved(void *user_data) {
732-
struct aws_s3_prepare_request_payload *payload = user_data;
733-
AWS_PRECONDITION(payload);
734-
735-
struct aws_s3_request *request = payload->request;
736-
AWS_PRECONDITION(request);
737-
738-
struct aws_s3_meta_request *meta_request = request->meta_request;
739-
AWS_PRECONDITION(meta_request);
740-
741-
struct aws_future_s3_buffer_ticket *future_ticket = payload->async_buffer_reserve;
742-
743-
int error_code = aws_future_s3_buffer_ticket_get_error(future_ticket);
744-
if (error_code != AWS_ERROR_SUCCESS) {
745-
AWS_LOGF_ERROR(
746-
AWS_LS_S3_META_REQUEST,
747-
"id=%p Could not allocate buffer for request with tag %d for the meta request.",
748-
(void *)meta_request,
749-
request->request_tag);
750-
751-
s_s3_prepare_request_payload_callback_and_destroy(payload, AWS_ERROR_S3_BUFFER_ALLOCATION_FAILED);
752-
return;
753-
}
754-
755-
request->ticket = aws_future_s3_buffer_ticket_get_result_by_move(future_ticket);
756-
757-
s_kick_off_prepare_request(payload);
758-
return;
759-
}
760-
761705
static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
762706
(void)task;
763707
(void)task_status;
@@ -771,27 +715,22 @@ static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void *
771715
struct aws_s3_meta_request *meta_request = request->meta_request;
772716
AWS_PRECONDITION(meta_request);
773717

718+
const struct aws_s3_meta_request_vtable *vtable = meta_request->vtable;
719+
AWS_PRECONDITION(vtable);
720+
774721
/* Client owns this event loop group. A cancel should not be possible. */
775722
AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY);
776723

777-
/**
778-
* TODO: is this always safe?
779-
* can we get into situation where we are waiting on mem on a req, which cannot be allocated until something else
780-
* finishes? i.e. you allocated mem for parts 2..n for get, but they cannot be delivered until part 1 finishes, but
781-
* part 1 is waiting on mem allocation?
782-
*/
783-
if (request->ticket == NULL && request->should_allocate_buffer_from_pool) {
784-
struct aws_s3_buffer_pool_reserve_meta meta = {
785-
.client = meta_request->client, .meta_request = meta_request, .size = meta_request->part_size};
786-
787-
payload->async_buffer_reserve = aws_s3_buffer_pool_reserve(meta_request->client->buffer_pool, meta);
788-
789-
aws_future_s3_buffer_ticket_register_event_loop_callback(
790-
payload->async_buffer_reserve, payload->event_loop, s_on_pool_buffer_reserved, payload);
724+
if (!request->always_send && aws_s3_meta_request_has_finish_result(meta_request)) {
725+
s_s3_prepare_request_payload_callback_and_destroy(payload, AWS_ERROR_S3_CANCELED);
791726
return;
792727
}
793728

794-
s_kick_off_prepare_request(payload);
729+
/* Kick off the async vtable->prepare_request()
730+
* Each subclass has its own implementation of this. */
731+
payload->asyncstep_prepare_request = vtable->prepare_request(request);
732+
aws_future_void_register_callback(
733+
payload->asyncstep_prepare_request, s_s3_meta_request_on_request_prepared, payload);
795734
return;
796735
}
797736

0 commit comments

Comments
 (0)