Skip to content

Commit a109fa6

Browse files
authored
Delivery exact bytes for read window (#600)
1 parent 372ffea commit a109fa6

File tree

4 files changed

+104
-25
lines changed

4 files changed

+104
-25
lines changed

include/aws/s3/private/s3_meta_request_impl.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ struct aws_s3_meta_request_event {
6666
/* data for AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY */
6767
struct {
6868
struct aws_s3_request *completed_request;
69+
size_t bytes_delivered;
6970
} response_body;
7071

7172
/* data for AWS_S3_META_REQUEST_EVENT_PROGRESS */
@@ -224,12 +225,15 @@ struct aws_s3_meta_request {
224225

225226
/* Task for delivering events on the meta-request's io_event_loop thread.
226227
* We do this to ensure a meta-request's callbacks are fired sequentially and non-overlapping.
227-
* If `event_delivery_array` has items in it, then this task is scheduled.
228+
* If `event_delivery_task_scheduled` is true, then this task is scheduled.
228229
* If `event_delivery_active` is true, then this task is actively running.
229230
* Delivery is not 100% complete until `event_delivery_array` is empty AND `event_delivery_active` is false
230231
* (use aws_s3_meta_request_are_events_out_for_delivery_synced() to check) */
231232
struct aws_task event_delivery_task;
232233

234+
/* Whether or not event delivery is currently scheduled. */
235+
uint32_t event_delivery_task_scheduled : 1;
236+
233237
/* Array of `struct aws_s3_meta_request_event` to deliver when the `event_delivery_task` runs. */
234238
struct aws_array_list event_delivery_array;
235239

@@ -292,6 +296,10 @@ struct aws_s3_meta_request {
292296

293297
/* The range start for the next response body delivery */
294298
uint64_t next_deliver_range_start;
299+
300+
/* Total number of bytes that have been attempted to be delivered. (Will equal the sum of succeeded and
301+
* failed.)*/
302+
uint64_t num_bytes_delivery_completed;
295303
} io_threaded_data;
296304

297305
const bool should_compute_content_md5;

include/aws/s3/s3_client.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,8 @@ struct aws_s3_client_config {
606606
*
607607
* WARNING: This feature is experimental.
608608
* Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
609-
* and you may still receive some data after the window reaches 0.
609+
* - If you set body_callback, no more data will be delivered once the window reaches 0.
610+
* - If you set body_callback_ex, you may still receive some data after the window reaches 0. TODO: fix it.
610611
*/
611612
bool enable_read_backpressure;
612613

@@ -1220,7 +1221,8 @@ struct aws_future_void *aws_s3_meta_request_write(
12201221
*
12211222
* WARNING: This feature is experimental.
12221223
* Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
1223-
* and you may still receive some data after the window reaches 0.
1224+
* - If you set body_callback, no more data will be delivered once the window reaches 0.
1225+
* - If you set body_callback_ex, you may still receive some data after the window reaches 0. TODO: fix it.
12241226
*/
12251227
AWS_S3_API
12261228
void aws_s3_meta_request_increment_read_window(struct aws_s3_meta_request *meta_request, uint64_t bytes);

source/s3_meta_request.c

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ void aws_s3_meta_request_increment_read_window(struct aws_s3_meta_request *meta_
412412
/* Response will never approach UINT64_MAX, so do a saturating sum instead of worrying about overflow */
413413
meta_request->synced_data.read_window_running_total =
414414
aws_add_u64_saturating(bytes, meta_request->synced_data.read_window_running_total);
415+
/* Try to schedule the delivery task again. */
416+
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, NULL);
415417

416418
aws_s3_meta_request_unlock_synced_data(meta_request);
417419
/* END CRITICAL SECTION */
@@ -1882,12 +1884,13 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
18821884
const struct aws_s3_meta_request_event *event) {
18831885

18841886
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
1887+
if (event) {
1888+
aws_array_list_push_back(&meta_request->synced_data.event_delivery_array, event);
1889+
}
18851890

1886-
aws_array_list_push_back(&meta_request->synced_data.event_delivery_array, event);
1887-
1888-
/* If the array was empty before, schedule task to deliver all events in the array.
1889-
* If the array already had things in it, then the task is already scheduled and will run soon. */
1890-
if (aws_array_list_length(&meta_request->synced_data.event_delivery_array) == 1) {
1891+
/* If the event delivery task is not scheduled before, and there are more to be delivered. */
1892+
if (!meta_request->synced_data.event_delivery_task_scheduled &&
1893+
aws_array_list_length(&meta_request->synced_data.event_delivery_array) > 0) {
18911894
aws_s3_meta_request_acquire(meta_request);
18921895

18931896
aws_task_init(
@@ -1896,6 +1899,7 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
18961899
meta_request,
18971900
"s3_meta_request_event_delivery");
18981901
aws_event_loop_schedule_task_now(meta_request->io_event_loop, &meta_request->synced_data.event_delivery_task);
1902+
meta_request->synced_data.event_delivery_task_scheduled = true;
18991903
}
19001904
}
19011905

@@ -1976,24 +1980,37 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
19761980
struct aws_array_list *event_delivery_array = &meta_request->io_threaded_data.event_delivery_array;
19771981
AWS_FATAL_ASSERT(aws_array_list_length(event_delivery_array) == 0);
19781982

1983+
struct aws_array_list incomplete_deliver_events_array;
1984+
aws_array_list_init_dynamic(
1985+
&incomplete_deliver_events_array, meta_request->allocator, 1, sizeof(struct aws_s3_meta_request_event));
1986+
19791987
/* If an error occurs, don't fire callbacks anymore. */
19801988
int error_code = AWS_ERROR_SUCCESS;
19811989
uint32_t num_parts_delivered = 0;
1990+
uint64_t bytes_allowed_to_deliver = 0;
1991+
uint64_t read_window_to_increment = 0;
19821992

19831993
/* BEGIN CRITICAL SECTION */
19841994
{
19851995
aws_s3_meta_request_lock_synced_data(meta_request);
19861996

19871997
aws_array_list_swap_contents(event_delivery_array, &meta_request->synced_data.event_delivery_array);
19881998
meta_request->synced_data.event_delivery_active = true;
1999+
meta_request->synced_data.event_delivery_task_scheduled = false;
19892000

19902001
if (aws_s3_meta_request_has_finish_result_synced(meta_request)) {
19912002
error_code = AWS_ERROR_S3_CANCELED;
19922003
}
19932004

2005+
bytes_allowed_to_deliver = meta_request->synced_data.read_window_running_total -
2006+
meta_request->io_threaded_data.num_bytes_delivery_completed;
2007+
19942008
aws_s3_meta_request_unlock_synced_data(meta_request);
19952009
}
19962010
/* END CRITICAL SECTION */
2011+
if (bytes_allowed_to_deliver > SIZE_MAX) {
2012+
bytes_allowed_to_deliver = SIZE_MAX;
2013+
}
19972014

19982015
/* Deliver all events */
19992016
for (size_t event_i = 0; event_i < aws_array_list_length(event_delivery_array); ++event_i) {
@@ -2003,15 +2020,44 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20032020

20042021
case AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY: {
20052022
struct aws_s3_request *request = event.u.response_body.completed_request;
2023+
size_t bytes_delivered_for_request = event.u.response_body.bytes_delivered;
20062024
AWS_ASSERT(meta_request == request->meta_request);
2025+
bool delivery_incomplete = false;
20072026
struct aws_byte_cursor response_body = aws_byte_cursor_from_buf(&request->send_data.response_body);
2027+
if (response_body.len == 0) {
2028+
/* Nothing to delivery, finish this delivery event and break out. */
2029+
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
2030+
2031+
++num_parts_delivered;
2032+
request->send_data.metrics =
2033+
s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);
2034+
2035+
aws_s3_request_release(request);
2036+
break;
2037+
}
2038+
2039+
if (meta_request->body_callback && meta_request->client->enable_read_backpressure) {
2040+
/* If customer set the body callback, make sure we are not delivery them more than asked via the
2041+
* callback. */
2042+
aws_byte_cursor_advance(&response_body, bytes_delivered_for_request);
2043+
if (response_body.len > (size_t)bytes_allowed_to_deliver) {
2044+
response_body.len = (size_t)bytes_allowed_to_deliver;
2045+
delivery_incomplete = true;
2046+
}
2047+
/* Update the remaining bytes we allow to delivery. */
2048+
bytes_allowed_to_deliver -= response_body.len;
2049+
} else {
2050+
/* We should not have any incomplete delivery in this case. */
2051+
AWS_FATAL_ASSERT(bytes_delivered_for_request == 0);
2052+
}
2053+
uint64_t delivery_range_start = request->part_range_start + bytes_delivered_for_request;
20082054

20092055
AWS_ASSERT(request->part_number >= 1);
20102056
if (request->part_number == 1) {
2011-
meta_request->io_threaded_data.next_deliver_range_start = request->part_range_start;
2057+
meta_request->io_threaded_data.next_deliver_range_start = delivery_range_start;
20122058
}
20132059
/* Make sure the response body is delivered in the sequential order */
2014-
AWS_FATAL_ASSERT(request->part_range_start == meta_request->io_threaded_data.next_deliver_range_start);
2060+
AWS_FATAL_ASSERT(delivery_range_start == meta_request->io_threaded_data.next_deliver_range_start);
20152061
meta_request->io_threaded_data.next_deliver_range_start += response_body.len;
20162062

20172063
if (error_code == AWS_ERROR_SUCCESS && response_body.len > 0) {
@@ -2047,15 +2093,15 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20472093
aws_error_name(error_code));
20482094
}
20492095
if (meta_request->client->enable_read_backpressure) {
2050-
aws_s3_meta_request_increment_read_window(meta_request, response_body.len);
2096+
read_window_to_increment += response_body.len;
20512097
}
20522098
} else if (
20532099
meta_request->body_callback_ex != NULL &&
20542100
meta_request->body_callback_ex(
20552101
meta_request,
20562102
&response_body,
20572103
(struct aws_s3_meta_request_receive_body_extra_info){
2058-
.range_start = request->part_range_start, .ticket = request->ticket},
2104+
.range_start = delivery_range_start, .ticket = request->ticket},
20592105
meta_request->user_data)) {
20602106
error_code = aws_last_error_or_unknown();
20612107
AWS_LOGF_ERROR(
@@ -2067,7 +2113,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20672113
} else if (
20682114
meta_request->body_callback != NULL &&
20692115
meta_request->body_callback(
2070-
meta_request, &response_body, request->part_range_start, meta_request->user_data)) {
2116+
meta_request, &response_body, delivery_range_start, meta_request->user_data)) {
20712117

20722118
error_code = aws_last_error_or_unknown();
20732119
AWS_LOGF_ERROR(
@@ -2087,13 +2133,25 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20872133
}
20882134
}
20892135
}
2090-
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
2136+
event.u.response_body.bytes_delivered += response_body.len;
2137+
meta_request->io_threaded_data.num_bytes_delivery_completed += response_body.len;
20912138

2092-
++num_parts_delivered;
2093-
request->send_data.metrics =
2094-
s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);
2139+
if (!delivery_incomplete || error_code != AWS_ERROR_SUCCESS) {
2140+
/* We completed the delivery for this request. */
2141+
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
20952142

2096-
aws_s3_request_release(request);
2143+
++num_parts_delivered;
2144+
request->send_data.metrics =
2145+
s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);
2146+
2147+
aws_s3_request_release(request);
2148+
} else {
2149+
/* We didn't complete the delivery for this request and no error happened */
2150+
/* Push to the front of the queue and wait for the next tick to deliver the rest of the bytes. */
2151+
/* Note: we push to the front of the array since when we move those incomplete events back to the
2152+
* synced_queue, we need to make sure it still has the same order. */
2153+
aws_array_list_push_front(&incomplete_deliver_events_array, &event);
2154+
}
20972155
} break;
20982156

20992157
case AWS_S3_META_REQUEST_EVENT_PROGRESS: {
@@ -2149,12 +2207,21 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
21492207
if (error_code != AWS_ERROR_SUCCESS) {
21502208
aws_s3_meta_request_set_fail_synced(meta_request, NULL, error_code);
21512209
}
2210+
/* Push the iocompileted events back to the queue */
2211+
for (size_t i = 0; i < aws_array_list_length(&incomplete_deliver_events_array); ++i) {
2212+
struct aws_s3_meta_request_event event;
2213+
aws_array_list_get_at(&incomplete_deliver_events_array, &event, i);
2214+
/* Push the incomplete one to the front of the queue. */
2215+
aws_array_list_push_front(&meta_request->synced_data.event_delivery_array, &event);
2216+
}
21522217

21532218
meta_request->synced_data.num_parts_delivery_completed += num_parts_delivered;
21542219
meta_request->synced_data.event_delivery_active = false;
21552220
aws_s3_meta_request_unlock_synced_data(meta_request);
21562221
}
21572222
/* END CRITICAL SECTION */
2223+
aws_s3_meta_request_increment_read_window(meta_request, read_window_to_increment);
2224+
aws_array_list_clean_up(&incomplete_deliver_events_array);
21582225

21592226
aws_s3_client_schedule_process_work(client);
21602227
aws_s3_meta_request_release(meta_request);

tests/s3_data_plane_tests.c

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,6 +1726,7 @@ static int s_apply_backpressure_until_meta_request_finish(
17261726
size_t part_size,
17271727
size_t window_initial_size,
17281728
uint64_t window_increment_size) {
1729+
(void)part_size;
17291730

17301731
/* Remember the last time something happened (we received download data, or incremented read window) */
17311732
uint64_t last_time_something_happened;
@@ -1753,12 +1754,13 @@ static int s_apply_backpressure_until_meta_request_finish(
17531754
size_t received_body_size_delta = aws_atomic_exchange_int(&test_results->received_body_size_delta, 0);
17541755
accumulated_data_size += (uint64_t)received_body_size_delta;
17551756

1756-
/* Check that we haven't received more data than the window allows.
1757-
* TODO: Stop allowing "hacky wiggle room". The current implementation
1758-
* may push more bytes to the user (up to 1 part) than they've asked for. */
1759-
uint64_t hacky_wiggle_room = part_size;
1760-
uint64_t max_data_allowed = accumulated_window_increments + hacky_wiggle_room;
1761-
ASSERT_TRUE(accumulated_data_size <= max_data_allowed, "Received more data than the read window allows");
1757+
/* Check that we haven't received more data than the window allows */
1758+
uint64_t max_data_allowed = accumulated_window_increments;
1759+
ASSERT_TRUE(
1760+
accumulated_data_size <= max_data_allowed,
1761+
"Received more data than the read window allows accumulated_data_size: %zu, max_data_allowed: %zu",
1762+
(size_t)accumulated_data_size,
1763+
(size_t)max_data_allowed);
17621764

17631765
/* If we're done, we're done */
17641766
if (done) {
@@ -1891,7 +1893,7 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat
18911893
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
18921894
size_t part_size = file_size / 4;
18931895
size_t window_initial_size = 1024;
1894-
uint64_t window_increment_size = part_size / 2;
1896+
uint64_t window_increment_size = part_size / 4;
18951897
return s_test_s3_get_object_backpressure_helper(
18961898
allocator, part_size, window_initial_size, window_increment_size, false);
18971899
}

0 commit comments

Comments
 (0)