2 changes: 1 addition & 1 deletion include/aws/s3/s3_client.h
@@ -1220,7 +1220,7 @@ struct aws_future_void *aws_s3_meta_request_write(
* no backpressure is being applied and data is being downloaded as fast as possible.
*
* WARNING: This feature is experimental.
- * Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
+ * Currently, backpressure is applied to GetObject requests,
* - If you set body_callback, no more data will be delivered once the window reaches 0.
* - If you set body_callback_ex, you may still receive some data after the window reaches 0. TODO: fix it.
*/
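
For context on this doc change, here is a minimal sketch of the consumption pattern the read window implies, assuming the library's existing `aws_s3_meta_request_increment_read_window()` API (the callback name and the consumption step are illustrative):

```c
#include <aws/s3/s3_client.h>

/* Illustrative body callback: hand the delivered bytes to the application,
 * then grow the read window so the client may deliver more. If the window
 * is never incremented, delivery stalls once it reaches 0. */
static int s_on_body(
    struct aws_s3_meta_request *meta_request,
    const struct aws_byte_cursor *body,
    uint64_t range_start,
    void *user_data) {
    (void)range_start;
    (void)user_data;

    /* ... consume body->ptr / body->len here ... */

    /* Re-open the window by the amount just consumed. */
    aws_s3_meta_request_increment_read_window(meta_request, body->len);
    return AWS_OP_SUCCESS;
}
```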
22 changes: 19 additions & 3 deletions source/s3_meta_request.c
@@ -1985,6 +1985,23 @@ static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics
return NULL;
}

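/* True when read backpressure should gate body delivery for this request:
 * the client enabled it, the customer set a body callback, and the request
 * carries GetObject data (either a GET_OBJECT meta request, or a default
 * meta request whose operation name is "GetObject"). */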
static bool s_apply_backpressure(struct aws_s3_request *request) {
struct aws_s3_meta_request *meta_request = request->meta_request;
if (!meta_request->body_callback) {
return false;
}
if (!meta_request->client->enable_read_backpressure) {
return false;
}
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_GET_OBJECT) {
return true;
}
if (aws_string_eq_c_str(request->operation_name, "GetObject")) {
return true;
}
return false;
}

/* Deliver events in event_delivery_array.
* This task runs on the meta-request's io_event_loop thread. */
static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
@@ -2059,9 +2076,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
break;
}

-            if (meta_request->body_callback && meta_request->client->enable_read_backpressure) {
-                /* If customer set the body callback, make sure we are not delivery them more than asked via the
-                 * callback. */
+            if (s_apply_backpressure(request)) {
Review comment (Contributor), suggested change:

-            if (s_apply_backpressure(request)) {
+            if (s_should_apply_backpressure(request)) {
/* Apply backpressure for the request: only deliver the bytes we are allowed to deliver. */
aws_byte_cursor_advance(&response_body, bytes_delivered_for_request);
if (response_body.len > (size_t)bytes_allowed_to_deliver) {
response_body.len = (size_t)bytes_allowed_to_deliver;
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -89,6 +89,9 @@ add_net_test_case(test_s3_get_object_backpressure_initial_size_zero)
add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
add_net_test_case(test_s3_get_object_backpressure_small_increments_s3express)
add_net_test_case(test_s3_get_object_backpressure_big_increments_s3express)
add_net_test_case(test_s3_get_object_backpressure_initial_size_s3express)
add_net_test_case(test_s3_get_object_part)
add_net_test_case(test_s3_no_signing)
add_net_test_case(test_s3_signing_override)
76 changes: 66 additions & 10 deletions tests/s3_data_plane_tests.c
@@ -1803,14 +1803,16 @@ static int s_test_s3_get_object_backpressure_helper(
size_t part_size,
size_t window_initial_size,
uint64_t window_increment_size,
-    bool file_on_disk) {
+    bool file_on_disk,
+    bool s3express) {

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_s3_client_config client_config = {
.part_size = part_size,
.enable_read_backpressure = true,
.enable_s3express = s3express,
.initial_read_window = window_initial_size,
};

@@ -1822,15 +1824,25 @@

struct aws_string *host_name =
aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);

struct aws_byte_cursor host_cursor = aws_byte_cursor_from_string(host_name);
if (s3express) {
host_cursor = g_test_s3express_bucket_usw2_az1_endpoint;
}
/* Put together a simple S3 Get Object request. */
-    struct aws_http_message *message = aws_s3_test_get_object_request_new(
-        allocator, aws_byte_cursor_from_string(host_name), g_pre_existing_object_1MB);
+    struct aws_http_message *message =
+        aws_s3_test_get_object_request_new(allocator, host_cursor, g_pre_existing_object_1MB);

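    /* S3 Express (directory bucket) requests are signed with the SigV4-S3Express algorithm. */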
struct aws_signing_config_aws s3express_signing_config = {
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
.service = g_s3express_service_name,
};
struct aws_s3_meta_request_options options = {
.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
.message = message,
};
if (s3express) {
options.signing_config = &s3express_signing_config;
}
struct aws_string *filepath_str = NULL;
if (file_on_disk) {
filepath_str = aws_s3_tester_create_file(allocator, g_pre_existing_object_1MB, NULL);
@@ -1895,7 +1907,7 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size / 4;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
}

AWS_TEST_CASE(test_s3_get_object_backpressure_big_increments, s_test_s3_get_object_backpressure_big_increments)
@@ -1908,7 +1920,7 @@ static int s_test_s3_get_object_backpressure_big_increments(struct aws_allocator
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size * 3;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
}

AWS_TEST_CASE(test_s3_get_object_backpressure_initial_size_zero, s_test_s3_get_object_backpressure_initial_size_zero)
Expand All @@ -1920,7 +1932,7 @@ static int s_test_s3_get_object_backpressure_initial_size_zero(struct aws_alloca
size_t window_initial_size = 0;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
}

AWS_TEST_CASE(
@@ -1937,7 +1949,7 @@ static int s_test_s3_get_object_backpressure_small_increments_recv_filepath(
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
}

AWS_TEST_CASE(
@@ -1952,7 +1964,7 @@ static int s_test_s3_get_object_backpressure_big_increments_recv_filepath(struct
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size * 3;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
}

AWS_TEST_CASE(
@@ -1968,7 +1980,51 @@ static int s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath(
size_t window_initial_size = 0;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
}

AWS_TEST_CASE(
test_s3_get_object_backpressure_small_increments_s3express,
s_test_s3_get_object_backpressure_small_increments_s3express)
static int s_test_s3_get_object_backpressure_small_increments_s3express(struct aws_allocator *allocator, void *ctx) {
/* Test increments smaller than part-size with S3 Express.
* Only 1 part at a time should be in flight */
(void)ctx;
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
size_t part_size = file_size / 4;
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size / 4;
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false, true);
}

AWS_TEST_CASE(
test_s3_get_object_backpressure_big_increments_s3express,
s_test_s3_get_object_backpressure_big_increments_s3express)
static int s_test_s3_get_object_backpressure_big_increments_s3express(struct aws_allocator *allocator, void *ctx) {
/* Test increments larger than part-size with S3 Express.
* Multiple parts should be in flight at a time */
(void)ctx;
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
size_t part_size = file_size / 8;
size_t window_initial_size = 1024;
uint64_t window_increment_size = part_size * 3;
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false, true);
}

AWS_TEST_CASE(
test_s3_get_object_backpressure_initial_size_s3express,
s_test_s3_get_object_backpressure_initial_size_s3express)
static int s_test_s3_get_object_backpressure_initial_size_s3express(struct aws_allocator *allocator, void *ctx) {
/* Test with initial window size of zero with S3 Express */
(void)ctx;
size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
size_t part_size = file_size / 4;
size_t window_initial_size = 0;
uint64_t window_increment_size = part_size / 2;
return s_test_s3_get_object_backpressure_helper(
allocator, part_size, window_initial_size, window_increment_size, false, true);
}

AWS_TEST_CASE(test_s3_get_object_part, s_test_s3_get_object_part)
2 changes: 2 additions & 0 deletions tests/test_helper/README.md
@@ -48,13 +48,15 @@ python3 test_helper.py clean
+ Add the lifecycle rule to automatically clean up `upload/` and incomplete multipart uploads after one day.
+ Upload files:
- `pre-existing-10MB` 10MB file.
- `pre-existing-1MB` 1MB file.
- with `--large_objects` enabled
- `pre-existing-2GB`

* Create directory bucket `<BUCKET_NAME>--use1-az4--x-s3` in us-east-1
+ Add the lifecycle rule to automatically clean up `upload/` and incomplete multipart uploads after one day.
+ Upload files:
- `pre-existing-10MB` 10MB file.
- `pre-existing-1MB` 1MB file.
- with `--large_objects` enabled
- `pre-existing-2GB`

9 changes: 6 additions & 3 deletions tests/test_helper/test_helper.py
@@ -189,6 +189,8 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio

put_pre_existing_objects(
10*MB, 'pre-existing-10MB', bucket=bucket_name, client=client)
+    put_pre_existing_objects(
+        1*MB, 'pre-existing-1MB', bucket=bucket_name, client=client)

if args.large_objects:
put_pre_existing_objects(
@@ -201,8 +203,6 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio
10*MB, 'pre-existing-10MB-aes256', sse='aes256', bucket=bucket_name)
put_pre_existing_objects(
10*MB, 'pre-existing-10MB-kms', sse='kms', bucket=bucket_name)
-    put_pre_existing_objects(
-        1*MB, 'pre-existing-1MB', bucket=bucket_name)
put_pre_existing_objects(
1*MB, 'pre-existing-1MB-@', bucket=bucket_name)
put_pre_existing_objects(
@@ -244,11 +244,14 @@ def cleanup(bucket_name, availability_zone=None, client=s3_client):

print(f"s3://{bucket_name}/* - Listing objects...")
try:
-        objects = client.list_objects_v2(Bucket=bucket_name)["Contents"]
+        response = client.list_objects_v2(Bucket=bucket_name)
+        # 'Contents' key only exists when there are objects in the bucket
+        objects = response.get("Contents", [])
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket':
print(f"s3://{bucket_name} - Did not exist. Moving on...")
return
raise
objects = list(map(lambda x: {"Key": x["Key"]}, objects))
if objects:
print(f"s3://{bucket_name}/* - Deleting {len(objects)} objects...")