diff --git a/include/aws/s3/s3_client.h b/include/aws/s3/s3_client.h
index 7604b2e1b..0c151c5e8 100644
--- a/include/aws/s3/s3_client.h
+++ b/include/aws/s3/s3_client.h
@@ -601,8 +601,11 @@ struct aws_s3_client_config {
      * If true, each meta request has a flow-control window that shrinks as
      * response body data is downloaded (headers do not affect the window).
      * `initial_read_window` determines the starting size of each meta request's window.
-     * You will stop downloading data whenever the flow-control window reaches 0
-     * You must call aws_s3_meta_request_increment_read_window() to keep data flowing.
+     *
+     * - You will stop receiving response body data whenever the flow-control window reaches 0.
+     * - If the remaining window is smaller than a downloaded part, the client will buffer that part
+     *   until the window opens up enough to deliver the full part.
+     * - You must call aws_s3_meta_request_increment_read_window() to keep data flowing.
      *
      * WARNING: This feature is experimental.
      * Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
@@ -1220,7 +1223,7 @@ struct aws_future_void *aws_s3_meta_request_write(
  * no backpressure is being applied and data is being downloaded as fast as possible.
  *
  * WARNING: This feature is experimental.
- * Currently, backpressure is only applied to GetObject requests which are split into multiple parts,
+ * Currently, backpressure is applied to GetObject requests,
  * - If you set body_callback, no more data will be delivered once the window reaches 0.
  * - If you set body_callback_ex, you may still receive some data after the window reaches 0. TODO: fix it.
  */
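For readers without the full header handy, here is a minimal sketch of how an application is expected to drive this flow-control window. The callback signature and config fields follow the public aws-c-s3 headers; the concrete sizes and the helper name `s_on_body` are made up for illustration:

```c
#include <aws/s3/s3_client.h>

/* Illustrative body callback: re-open the window as each chunk is consumed so the
 * download keeps flowing. A real application might defer the increment until its
 * own consumer has actually drained the bytes. */
static int s_on_body(
    struct aws_s3_meta_request *meta_request,
    const struct aws_byte_cursor *body,
    uint64_t range_start,
    void *user_data) {

    (void)range_start;
    (void)user_data;

    /* ... hand `body` off to the application here ... */

    /* Grow the window by the number of bytes that were just delivered. */
    aws_s3_meta_request_increment_read_window(meta_request, body->len);
    return AWS_OP_SUCCESS;
}

/* Client config sketch: backpressure enabled, window starts at one part. */
static struct aws_s3_client_config s_client_config = {
    .part_size = 8 * 1024 * 1024,
    .enable_read_backpressure = true,
    .initial_read_window = 8 * 1024 * 1024,
    /* bootstrap, region, signing config, etc. omitted */
};
```

Incrementing by exactly the delivered length keeps the window hovering around `initial_read_window`; incrementing by less throttles the download further.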
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c
index 486975f53..39178cc7b 100644
--- a/source/s3_meta_request.c
+++ b/source/s3_meta_request.c
@@ -1985,6 +1985,26 @@ static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics
     return NULL;
 }
 
+static bool s_should_apply_backpressure(struct aws_s3_request *request) {
+    struct aws_s3_meta_request *meta_request = request->meta_request;
+    if (!meta_request->client->enable_read_backpressure) {
+        /* Backpressure is disabled. */
+        return false;
+    }
+    if (!meta_request->body_callback) {
+        /* No callback to deliver the body, so don't apply backpressure. */
+        return false;
+    }
+    /* Apply backpressure only for GetObject requests. */
+    if (meta_request->type == AWS_S3_META_REQUEST_TYPE_GET_OBJECT) {
+        return true;
+    }
+    if (aws_string_eq_c_str(request->operation_name, "GetObject")) {
+        return true;
+    }
+    return false;
+}
+
 /* Deliver events in event_delivery_array.
  * This task runs on the meta-request's io_event_loop thread. */
 static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
@@ -2059,9 +2079,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
                     break;
                 }
 
-                if (meta_request->body_callback && meta_request->client->enable_read_backpressure) {
-                    /* If customer set the body callback, make sure we are not delivery them more than asked via the
-                     * callback. */
+                if (s_should_apply_backpressure(request)) {
+                    /* Apply backpressure for this request: only deliver the bytes the window allows. */
                     aws_byte_cursor_advance(&response_body, bytes_delivered_for_request);
                     if (response_body.len > (size_t)bytes_allowed_to_deliver) {
                         response_body.len = (size_t)bytes_allowed_to_deliver;
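Worth spelling out what the new predicate changes: the old condition clamped delivery for any meta request with a body callback once backpressure was enabled, while `s_should_apply_backpressure()` scopes the clamp to GetObject work, and the `operation_name` branch keeps it applying to default-type meta requests that perform a GetObject. A hypothetical sketch of such a request (field names follow `aws_s3_meta_request_options`; `message` and `s_on_body` are assumed to exist from the caller and the earlier sketch):

```c
/* Hypothetical: a DEFAULT meta request whose operation is GetObject. With this change
 * it is subject to the same flow-control window as a meta request of type
 * AWS_S3_META_REQUEST_TYPE_GET_OBJECT, because the operation-name check matches. */
struct aws_s3_meta_request_options default_get_options = {
    .type = AWS_S3_META_REQUEST_TYPE_DEFAULT,
    .operation_name = aws_byte_cursor_from_c_str("GetObject"),
    .message = message, /* a plain GetObject HTTP request built by the caller */
    .body_callback = s_on_body,
};
```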
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index c6e729350..876840e81 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -89,6 +89,9 @@ add_net_test_case(test_s3_get_object_backpressure_initial_size_zero)
 add_net_test_case(test_s3_get_object_backpressure_small_increments_recv_filepath)
 add_net_test_case(test_s3_get_object_backpressure_big_increments_recv_filepath)
 add_net_test_case(test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
+add_net_test_case(test_s3_get_object_backpressure_small_increments_s3express)
+add_net_test_case(test_s3_get_object_backpressure_big_increments_s3express)
+add_net_test_case(test_s3_get_object_backpressure_initial_size_s3express)
 add_net_test_case(test_s3_get_object_part)
 add_net_test_case(test_s3_no_signing)
 add_net_test_case(test_s3_signing_override)
diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c
index d5d96e17e..8323647aa 100644
--- a/tests/s3_data_plane_tests.c
+++ b/tests/s3_data_plane_tests.c
@@ -1803,7 +1803,8 @@ static int s_test_s3_get_object_backpressure_helper(
     size_t part_size,
     size_t window_initial_size,
     uint64_t window_increment_size,
-    bool file_on_disk) {
+    bool file_on_disk,
+    bool s3express) {
 
     struct aws_s3_tester tester;
     ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
@@ -1811,6 +1812,7 @@
 
     struct aws_s3_client_config client_config = {
         .part_size = part_size,
         .enable_read_backpressure = true,
+        .enable_s3express = s3express,
         .initial_read_window = window_initial_size,
     };
@@ -1822,15 +1824,25 @@
 
     struct aws_string *host_name =
         aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
-
+    struct aws_byte_cursor host_cursor = aws_byte_cursor_from_string(host_name);
+    if (s3express) {
+        host_cursor = g_test_s3express_bucket_usw2_az1_endpoint;
+    }
     /* Put together a simple S3 Get Object request. */
-    struct aws_http_message *message = aws_s3_test_get_object_request_new(
-        allocator, aws_byte_cursor_from_string(host_name), g_pre_existing_object_1MB);
+    struct aws_http_message *message =
+        aws_s3_test_get_object_request_new(allocator, host_cursor, g_pre_existing_object_1MB);
+    struct aws_signing_config_aws s3express_signing_config = {
+        .algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
+        .service = g_s3express_service_name,
+    };
 
     struct aws_s3_meta_request_options options = {
         .type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
         .message = message,
     };
+    if (s3express) {
+        options.signing_config = &s3express_signing_config;
+    }
 
     struct aws_string *filepath_str = NULL;
     if (file_on_disk) {
         filepath_str = aws_s3_tester_create_file(allocator, g_pre_existing_object_1MB, NULL);
@@ -1895,7 +1907,7 @@ static int s_test_s3_get_object_backpressure_small_increments(struct aws_allocat
     size_t window_initial_size = 1024;
     uint64_t window_increment_size = part_size / 4;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
 }
 
 AWS_TEST_CASE(test_s3_get_object_backpressure_big_increments, s_test_s3_get_object_backpressure_big_increments)
@@ -1908,7 +1920,7 @@ static int s_test_s3_get_object_backpressure_big_increments(struct aws_allocator
     size_t window_initial_size = 1024;
     uint64_t window_increment_size = part_size * 3;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
 }
 
 AWS_TEST_CASE(test_s3_get_object_backpressure_initial_size_zero, s_test_s3_get_object_backpressure_initial_size_zero)
@@ -1920,7 +1932,7 @@ static int s_test_s3_get_object_backpressure_initial_size_zero(struct aws_alloca
     size_t window_initial_size = 0;
     uint64_t window_increment_size = part_size / 2;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, false);
+        allocator, part_size, window_initial_size, window_increment_size, false, false);
 }
 
 AWS_TEST_CASE(
     test_s3_get_object_backpressure_small_increments_recv_filepath,
     s_test_s3_get_object_backpressure_small_increments_recv_filepath)
@@ -1937,7 +1949,7 @@ static int s_test_s3_get_object_backpressure_small_increments_recv_filepath(
     size_t window_initial_size = 1024;
     uint64_t window_increment_size = part_size / 2;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
 }
 
 AWS_TEST_CASE(
     test_s3_get_object_backpressure_big_increments_recv_filepath,
     s_test_s3_get_object_backpressure_big_increments_recv_filepath)
@@ -1952,7 +1964,7 @@ static int s_test_s3_get_object_backpressure_big_increments_recv_filepath(struct
     size_t window_initial_size = 1024;
     uint64_t window_increment_size = part_size * 3;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
 }
 
 AWS_TEST_CASE(
     test_s3_get_object_backpressure_initial_size_zero_recv_filepath,
     s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath)
@@ -1968,7 +1980,51 @@ static int s_test_s3_get_object_backpressure_initial_size_zero_recv_filepath(
     size_t window_initial_size = 0;
     uint64_t window_increment_size = part_size / 2;
     return s_test_s3_get_object_backpressure_helper(
-        allocator, part_size, window_initial_size, window_increment_size, true);
+        allocator, part_size, window_initial_size, window_increment_size, true, false);
+}
+
+AWS_TEST_CASE(
+    test_s3_get_object_backpressure_small_increments_s3express,
+    s_test_s3_get_object_backpressure_small_increments_s3express)
+static int s_test_s3_get_object_backpressure_small_increments_s3express(struct aws_allocator *allocator, void *ctx) {
+    /* Test increments smaller than part-size with S3 Express.
+     * Only 1 part at a time should be in flight */
+    (void)ctx;
+    size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
+    size_t part_size = file_size / 4;
+    size_t window_initial_size = 1024;
+    uint64_t window_increment_size = part_size / 4;
+    return s_test_s3_get_object_backpressure_helper(
+        allocator, part_size, window_initial_size, window_increment_size, false, true);
+}
+
+AWS_TEST_CASE(
+    test_s3_get_object_backpressure_big_increments_s3express,
+    s_test_s3_get_object_backpressure_big_increments_s3express)
+static int s_test_s3_get_object_backpressure_big_increments_s3express(struct aws_allocator *allocator, void *ctx) {
+    /* Test increments larger than part-size with S3 Express.
+     * Multiple parts should be in flight at a time */
+    (void)ctx;
+    size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
+    size_t part_size = file_size / 8;
+    size_t window_initial_size = 1024;
+    uint64_t window_increment_size = part_size * 3;
+    return s_test_s3_get_object_backpressure_helper(
+        allocator, part_size, window_initial_size, window_increment_size, false, true);
+}
+
+AWS_TEST_CASE(
+    test_s3_get_object_backpressure_initial_size_s3express,
+    s_test_s3_get_object_backpressure_initial_size_s3express)
+static int s_test_s3_get_object_backpressure_initial_size_s3express(struct aws_allocator *allocator, void *ctx) {
+    /* Test with initial window size of zero with S3 Express */
+    (void)ctx;
+    size_t file_size = 1 * 1024 * 1024; /* Test downloads 1MB file */
+    size_t part_size = file_size / 4;
+    size_t window_initial_size = 0;
+    uint64_t window_increment_size = part_size / 2;
+    return s_test_s3_get_object_backpressure_helper(
+        allocator, part_size, window_initial_size, window_increment_size, false, true);
 }
 
 AWS_TEST_CASE(test_s3_get_object_part, s_test_s3_get_object_part)
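A quick sanity check on the numbers the new S3 Express tests pick. This only restates the values from the test code above and derives the behavior the comments assert; the standalone program is illustrative, not part of the test suite:

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Values copied from the small-increments S3 Express case above. */
int main(void) {
    uint64_t file_size = 1024 * 1024;   /* 1MB test object */
    uint64_t part_size = file_size / 4; /* 256KB parts */
    uint64_t initial_window = 1024;     /* 1KB starting window */
    uint64_t increment = part_size / 4; /* 64KB per increment */

    /* How many times the window has to be grown before the whole object is
     * delivered; since each increment is smaller than a part, at most one part
     * should be in flight at a time. */
    uint64_t grows = (file_size - initial_window + increment - 1) / increment;
    printf("window must be incremented ~%" PRIu64 " times\n", grows); /* ~16 */

    /* The big-increments case uses 128KB parts with part_size * 3 increments,
     * so each increment opens room for roughly three parts at once. */
    return 0;
}
```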
diff --git a/tests/test_helper/README.md b/tests/test_helper/README.md
index 6e205dcc5..bab63363a 100644
--- a/tests/test_helper/README.md
+++ b/tests/test_helper/README.md
@@ -48,6 +48,7 @@ python3 test_helper.py clean
     + Add the lifecycle to automatic clean up the `upload/` and clean up incomplete multipart uploads after one day.
     + Upload files:
         - `pre-existing-10MB` 10MB file.
+        - `pre-existing-1MB` 1MB file.
         - with `--large_objects` enabled
             - `pre-existing-2GB`
@@ -55,6 +56,7 @@ python3 test_helper.py clean
     + Add the lifecycle to automatic clean up the `upload/` and clean up incomplete multipart uploads after one day.
     + Upload files:
         - `pre-existing-10MB` 10MB file.
+        - `pre-existing-1MB` 1MB file.
         - with `--large_objects` enabled
             - `pre-existing-2GB`
diff --git a/tests/test_helper/test_helper.py b/tests/test_helper/test_helper.py
index f8200fdd1..5baa8eec8 100755
--- a/tests/test_helper/test_helper.py
+++ b/tests/test_helper/test_helper.py
@@ -189,6 +189,8 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio
 
     put_pre_existing_objects(
         10*MB, 'pre-existing-10MB', bucket=bucket_name, client=client)
+    put_pre_existing_objects(
+        1*MB, 'pre-existing-1MB', bucket=bucket_name, client=client)
 
     if args.large_objects:
         put_pre_existing_objects(
@@ -201,8 +203,6 @@ def create_bucket_with_lifecycle(availability_zone=None, client=s3_client, regio
             10*MB, 'pre-existing-10MB-aes256', sse='aes256', bucket=bucket_name)
         put_pre_existing_objects(
             10*MB, 'pre-existing-10MB-kms', sse='kms', bucket=bucket_name)
-        put_pre_existing_objects(
-            1*MB, 'pre-existing-1MB', bucket=bucket_name)
         put_pre_existing_objects(
             1*MB, 'pre-existing-1MB-@', bucket=bucket_name)
         put_pre_existing_objects(
@@ -244,11 +244,14 @@ def cleanup(bucket_name, availability_zone=None, client=s3_client):
 
     print(f"s3://{bucket_name}/* - Listing objects...")
     try:
-        objects = client.list_objects_v2(Bucket=bucket_name)["Contents"]
+        response = client.list_objects_v2(Bucket=bucket_name)
+        # 'Contents' key only exists when there are objects in the bucket
+        objects = response.get("Contents", [])
     except botocore.exceptions.ClientError as e:
         if e.response['Error']['Code'] == 'NoSuchBucket':
             print(f"s3://{bucket_name} - Did not exist. Moving on...")
             return
+        raise
     objects = list(map(lambda x: {"Key": x["Key"]}, objects))
     if objects:
         print(f"s3://{bucket_name}/* - Deleting {len(objects)} objects...")