Skip to content

Commit e9d1bde

Browse files
authored
[fix]S3express backpressure (#612)
1 parent 3a2d31a commit e9d1bde

File tree

6 files changed

+105
-19
lines changed

6 files changed

+105
-19
lines changed

include/aws/s3/s3_client.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,8 +601,11 @@ struct aws_s3_client_config {
601601
* If true, each meta request has a flow-control window that shrinks as
602602
* response body data is downloaded (headers do not affect the window).
603603
* `initial_read_window` determines the starting size of each meta request's window.
604-
* You will stop downloading data whenever the flow-control window reaches 0
605-
* You must call aws_s3_meta_request_increment_read_window() to keep data flowing.
604+
*
605+
* - You will stop receiving response body data whenever the flow-control window reaches 0
606+
* - If the window size remaining is smaller than the part download, client will buffer the part until
607+
* the window opens up to deliver the full part.
608+
* - You must call aws_s3_meta_request_increment_read_window() to keep data flowing.
606609
*
607610
* WARNING: This feature is experimental.
608611
* Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
@@ -1220,7 +1223,7 @@ struct aws_future_void *aws_s3_meta_request_write(
12201223
* no backpressure is being applied and data is being downloaded as fast as possible.
12211224
*
12221225
* WARNING: This feature is experimental.
1223-
* Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
1226+
* Currently, backpressure is applied to GetObject requests,
12241227
* - If you set body_callback, no more data will be delivered once the window reaches 0.
12251228
* - If you set body_callback_ex, you may still receive some data after the window reaches 0. TODO: fix it.
12261229
*/

source/s3_meta_request.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,6 +1985,26 @@ static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics
19851985
return NULL;
19861986
}
19871987

1988+
static bool s_should_apply_backpressure(struct aws_s3_request *request) {
1989+
struct aws_s3_meta_request *meta_request = request->meta_request;
1990+
if (!meta_request->client->enable_read_backpressure) {
1991+
/* Backpressure is disabled. */
1992+
return false;
1993+
}
1994+
if (!meta_request->body_callback) {
1995+
/* No callback to deliver the body, don't apply backpressure */
1996+
return false;
1997+
}
1998+
/* Apply backpressure only for GetObject requests */
1999+
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_GET_OBJECT) {
2000+
return true;
2001+
}
2002+
if (aws_string_eq_c_str(request->operation_name, "GetObject")) {
2003+
return true;
2004+
}
2005+
return false;
2006+
}
2007+
19882008
/* Deliver events in event_delivery_array.
19892009
* This task runs on the meta-request's io_event_loop thread. */
19902010
static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
@@ -2059,9 +2079,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20592079
break;
20602080
}
20612081

2062-
if (meta_request->body_callback && meta_request->client->enable_read_backpressure) {
2063-
/* If customer set the body callback, make sure we are not delivery them more than asked via the
2064-
* callback. */
2082+
if (s_should_apply_backpressure(request)) {
2083+
/* Apply backpressure for the request; only deliver the bytes that are allowed to be delivered. */
20652084
aws_byte_cursor_advance(&response_body, bytes_delivered_for_request);
20662085
if (response_body.len > (size_t)bytes_allowed_to_deliver) {
20672086
response_body.len = (size_t)bytes_allowed_to_deliver;

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ add_net_test_case(test_s3_get_object_backpressure_initial_size_zero)
8989
add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath)
9090
add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath)
9191
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
92+
add_net_test_case(test_s3_get_object_backpressure_small_increments_s3express)
93+
add_net_test_case(test_s3_get_object_backpressure_big_increments_s3express)
94+
add_net_test_case(test_s3_get_object_backpressure_initial_size_s3express)
9295
add_net_test_case(test_s3_get_object_part)
9396
add_net_test_case(test_s3_no_signing)
9497
add_net_test_case(test_s3_signing_override)

tests/s3_data_plane_tests.c

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,14 +1803,16 @@ static int s_test_s3_get_object_backpressure_helper(
18031803
size_t part_size,
18041804
size_t window_initial_size,
18051805
uint64_t window_increment_size,
1806-
bool file_on_disk) {
1806+
bool file_on_disk,
1807+
bool s3express) {
18071808

18081809
struct aws_s3_tester tester;
18091810
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
18101811

18111812
struct aws_s3_client_config client_config = {
18121813
.part_size = part_size,
18131814
.enable_read_backpressure = true,
1815+
.enable_s3express = s3express,
18141816
.initial_read_window = window_initial_size,
18151817
};
18161818

@@ -1822,15 +1824,25 @@ static int s_test_s3_get_object_backpressure_helper(
18221824

18231825
struct aws_string *host_name =
18241826
aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
1825-
1827+
struct aws_byte_cursor host_cursor = aws_byte_cursor_from_string(host_name);
1828+
if (s3express) {
1829+
host_cursor = g_test_s3express_bucket_usw2_az1_endpoint;
1830+
}
18261831
/* Put together a simple S3 Get Object request. */
1827-
struct aws_http_message *message = aws_s3_test_get_object_request_new(
1828-
allocator, aws_byte_cursor_from_string(host_name), g_pre_existing_object_1MB);
1832+
struct aws_http_message *message =
1833+
aws_s3_test_get_object_request_new(allocator, host_cursor, g_pre_existing_object_1MB);
18291834

1835+
struct aws_signing_config_aws s3express_signing_config = {
1836+
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
1837+
.service = g_s3express_service_name,
1838+
};
18301839
struct aws_s3_meta_request_options options = {
18311840
.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
18321841
.message = message,
18331842
};
1843+
if (s3express) {
1844+
options.signing_config = &s3express_signing_config;
1845+
}
18341846
struct aws_string *filepath_str = NULL;
18351847
if (file_on_disk) {
18361848
filepath_str = aws_s3_tester_create_file(allocator, g_pre_existing_object_1MB, NULL);
@@ -1895,7 +1907,7 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat
18951907
size_t window_initial_size = 1024;
18961908
uint64_t window_increment_size = part_size / 4;
18971909
return s_test_s3_get_object_backpressure_helper(
1898-
allocator, part_size, window_initial_size, window_increment_size, false);
1910+
allocator, part_size, window_initial_size, window_increment_size, false, false);
18991911
}
19001912

19011913
AWS_TEST_CASE(test_s3_get_object_backpressure_big_increments, s_test_s3_get_object_backpressure_big_increments)
@@ -1908,7 +1920,7 @@ static int s_test_s3_get_object_backpressure_big_increments(struct aws_allocator
19081920
size_t window_initial_size = 1024;
19091921
uint64_t window_increment_size = part_size * 3;
19101922
return s_test_s3_get_object_backpressure_helper(
1911-
allocator, part_size, window_initial_size, window_increment_size, false);
1923+
allocator, part_size, window_initial_size, window_increment_size, false, false);
19121924
}
19131925

19141926
AWS_TEST_CASE(test_s3_get_object_backpressure_initial_size_zero, s_test_s3_get_object_backpressure_initial_size_zero)
@@ -1920,7 +1932,7 @@ static int s_test_s3_get_object_backpressure_initial_size_zero(struct aws_alloca
19201932
size_t window_initial_size = 0;
19211933
uint64_t window_increment_size = part_size / 2;
19221934
return s_test_s3_get_object_backpressure_helper(
1923-
allocator, part_size, window_initial_size, window_increment_size, false);
1935+
allocator, part_size, window_initial_size, window_increment_size, false, false);
19241936
}
19251937

19261938
AWS_TEST_CASE(
@@ -1937,7 +1949,7 @@ static int s_test_s3_get_object_backpressure_small_increments_recv_filepath(
19371949
size_t window_initial_size = 1024;
19381950
uint64_t window_increment_size = part_size / 2;
19391951
return s_test_s3_get_object_backpressure_helper(
1940-
allocator, part_size, window_initial_size, window_increment_size, true);
1952+
allocator, part_size, window_initial_size, window_increment_size, true, false);
19411953
}
19421954

19431955
AWS_TEST_CASE(
@@ -1952,7 +1964,7 @@ static int s_test_s3_get_object_backpressure_big_increments_recv_filepath(struct
19521964
size_t window_initial_size = 1024;
19531965
uint64_t window_increment_size = part_size * 3;
19541966
return s_test_s3_get_object_backpressure_helper(
1955-
allocator, part_size, window_initial_size, window_increment_size, true);
1967+
allocator, part_size, window_initial_size, window_increment_size, true, false);
19561968
}
19571969

19581970
AWS_TEST_CASE(
@@ -1968,7 +1980,51 @@ static int s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath(
19681980
size_t window_initial_size = 0;
19691981
uint64_t window_increment_size = part_size / 2;
19701982
return s_test_s3_get_object_backpressure_helper(
1971-
allocator, part_size, window_initial_size, window_increment_size, true);
1983+
allocator, part_size, window_initial_size, window_increment_size, true, false);
1984+
}
1985+
1986+
AWS_TEST_CASE(
1987+
test_s3_get_object_backpressure_small_increments_s3express,
1988+
s_test_s3_get_object_backpressure_small_increments_s3express)
1989+
static int s_test_s3_get_object_backpressure_small_increments_s3express(struct aws_allocator *allocator, void *ctx) {
1990+
/* Test increments smaller than part-size with S3 Express.
1991+
* Only 1 part at a time should be in flight */
1992+
(void)ctx;
1993+
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
1994+
size_t part_size = file_size / 4;
1995+
size_t window_initial_size = 1024;
1996+
uint64_t window_increment_size = part_size / 4;
1997+
return s_test_s3_get_object_backpressure_helper(
1998+
allocator, part_size, window_initial_size, window_increment_size, false, true);
1999+
}
2000+
2001+
AWS_TEST_CASE(
2002+
test_s3_get_object_backpressure_big_increments_s3express,
2003+
s_test_s3_get_object_backpressure_big_increments_s3express)
2004+
static int s_test_s3_get_object_backpressure_big_increments_s3express(struct aws_allocator *allocator, void *ctx) {
2005+
/* Test increments larger than part-size with S3 Express.
2006+
* Multiple parts should be in flight at a time */
2007+
(void)ctx;
2008+
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
2009+
size_t part_size = file_size / 8;
2010+
size_t window_initial_size = 1024;
2011+
uint64_t window_increment_size = part_size * 3;
2012+
return s_test_s3_get_object_backpressure_helper(
2013+
allocator, part_size, window_initial_size, window_increment_size, false, true);
2014+
}
2015+
2016+
AWS_TEST_CASE(
2017+
test_s3_get_object_backpressure_initial_size_s3express,
2018+
s_test_s3_get_object_backpressure_initial_size_s3express)
2019+
static int s_test_s3_get_object_backpressure_initial_size_s3express(struct aws_allocator *allocator, void *ctx) {
2020+
/* Test with initial window size of zero with S3 Express */
2021+
(void)ctx;
2022+
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
2023+
size_t part_size = file_size / 4;
2024+
size_t window_initial_size = 0;
2025+
uint64_t window_increment_size = part_size / 2;
2026+
return s_test_s3_get_object_backpressure_helper(
2027+
allocator, part_size, window_initial_size, window_increment_size, false, true);
19722028
}
19732029

19742030
AWS_TEST_CASE(test_s3_get_object_part, s_test_s3_get_object_part)

tests/test_helper/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,15 @@ python3 test_helper.py clean
4848
+ Add the lifecycle to automatic clean up the `upload/` and clean up incomplete multipart uploads after one day.
4949
+ Upload files:
5050
- `pre-existing-10MB` 10MB file.
51+
- `pre-existing-1MB` 1MB file.
5152
- with `--large_objects` enabled
5253
- `pre-existing-2GB`
5354

5455
* Create directory bucket `<BUCKET_NAME>--use1-az4--x-s3` in us-east-1
5556
+ Add the lifecycle to automatic clean up the `upload/` and clean up incomplete multipart uploads after one day.
5657
+ Upload files:
5758
- `pre-existing-10MB` 10MB file.
59+
- `pre-existing-1MB` 1MB file.
5860
- with `--large_objects` enabled
5961
- `pre-existing-2GB`
6062

tests/test_helper/test_helper.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio
189189

190190
put_pre_existing_objects(
191191
10*MB, 'pre-existing-10MB', bucket=bucket_name, client=client)
192+
put_pre_existing_objects(
193+
1*MB, 'pre-existing-1MB', bucket=bucket_name, client=client)
192194

193195
if args.large_objects:
194196
put_pre_existing_objects(
@@ -201,8 +203,6 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio
201203
10*MB, 'pre-existing-10MB-aes256', sse='aes256', bucket=bucket_name)
202204
put_pre_existing_objects(
203205
10*MB, 'pre-existing-10MB-kms', sse='kms', bucket=bucket_name)
204-
put_pre_existing_objects(
205-
1*MB, 'pre-existing-1MB', bucket=bucket_name)
206206
put_pre_existing_objects(
207207
1*MB, 'pre-existing-1MB-@', bucket=bucket_name)
208208
put_pre_existing_objects(
@@ -244,11 +244,14 @@ def cleanup(bucket_name, availability_zone=None, client=s3_client):
244244

245245
print(f"s3://{bucket_name}/* - Listing objects...")
246246
try:
247-
objects = client.list_objects_v2(Bucket=bucket_name)["Contents"]
247+
response = client.list_objects_v2(Bucket=bucket_name)
248+
# 'Contents' key only exists when there are objects in the bucket
249+
objects = response.get("Contents", [])
248250
except botocore.exceptions.ClientError as e:
249251
if e.response['Error']['Code'] == 'NoSuchBucket':
250252
print(f"s3://{bucket_name} - Did not exist. Moving on...")
251253
return
254+
raise
252255
objects = list(map(lambda x: {"Key": x["Key"]}, objects))
253256
if objects:
254257
print(f"s3://{bucket_name}/* - Deleting {len(objects)} objects...")

0 commit comments

Comments
 (0)