@@ -1976,9 +1976,14 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
19761976 struct aws_array_list * event_delivery_array = & meta_request -> io_threaded_data .event_delivery_array ;
19771977 AWS_FATAL_ASSERT (aws_array_list_length (event_delivery_array ) == 0 );
19781978
1979+ struct aws_array_list incomplete_deliver_events_array ;
1980+ aws_array_list_init_dynamic (
1981+ & incomplete_deliver_events_array , meta_request -> allocator , 1 , sizeof (struct aws_s3_meta_request_event ));
1982+
19791983 /* If an error occurs, don't fire callbacks anymore. */
19801984 int error_code = AWS_ERROR_SUCCESS ;
19811985 uint32_t num_parts_delivered = 0 ;
1986+ uint64_t bytes_allowed_to_deliver = 0 ;
19821987
19831988 /* BEGIN CRITICAL SECTION */
19841989 {
@@ -1991,6 +1996,9 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
19911996 error_code = AWS_ERROR_S3_CANCELED ;
19921997 }
19931998
1999+ bytes_allowed_to_deliver = meta_request -> synced_data .read_window_running_total -
2000+ meta_request -> io_threaded_data .num_bytes_delivery_completed ;
2001+
19942002 aws_s3_meta_request_unlock_synced_data (meta_request );
19952003 }
19962004 /* END CRITICAL SECTION */
@@ -2003,15 +2011,42 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20032011
20042012 case AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY : {
20052013 struct aws_s3_request * request = event .u .response_body .completed_request ;
2014+ uint64_t bytes_delivered_for_request = event .u .response_body .bytes_delivered ;
20062015 AWS_ASSERT (meta_request == request -> meta_request );
2016+ bool delivery_incomplete = false;
20072017 struct aws_byte_cursor response_body = aws_byte_cursor_from_buf (& request -> send_data .response_body );
2018+ if (response_body .len == 0 ) {
2019+ /* Nothing to delivery, finish this delivery event and break out. */
2020+ aws_atomic_fetch_sub (& client -> stats .num_requests_streaming_response , 1 );
2021+
2022+ ++ num_parts_delivered ;
2023+ request -> send_data .metrics =
2024+ s_s3_request_finish_up_and_release_metrics (request -> send_data .metrics , meta_request );
2025+
2026+ aws_s3_request_release (request );
2027+ break ;
2028+ }
2029+
2030+ if (meta_request -> body_callback ) {
2031+ /* If customer set the body callback, make sure we are not delivery them more than asked via the
2032+ * callback. */
2033+ aws_byte_cursor_advance (& response_body , bytes_delivered_for_request );
2034+ if (response_body .len > bytes_allowed_to_deliver ) {
2035+ response_body .len = bytes_allowed_to_deliver ;
2036+ delivery_incomplete = true;
2037+ }
2038+ } else {
2039+ /* We should not have any incomplete delivery in this case. */
2040+ AWS_FATAL_ASSERT (bytes_delivered_for_request == 0 );
2041+ }
2042+ uint64_t delivery_range_start = request -> part_range_start + bytes_delivered_for_request ;
20082043
20092044 AWS_ASSERT (request -> part_number >= 1 );
20102045 if (request -> part_number == 1 ) {
2011- meta_request -> io_threaded_data .next_deliver_range_start = request -> part_range_start ;
2046+ meta_request -> io_threaded_data .next_deliver_range_start = delivery_range_start ;
20122047 }
20132048 /* Make sure the response body is delivered in the sequential order */
2014- AWS_FATAL_ASSERT (request -> part_range_start == meta_request -> io_threaded_data .next_deliver_range_start );
2049+ AWS_FATAL_ASSERT (delivery_range_start == meta_request -> io_threaded_data .next_deliver_range_start );
20152050 meta_request -> io_threaded_data .next_deliver_range_start += response_body .len ;
20162051
20172052 if (error_code == AWS_ERROR_SUCCESS && response_body .len > 0 ) {
@@ -2055,7 +2090,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20552090 meta_request ,
20562091 & response_body ,
20572092 (struct aws_s3_meta_request_receive_body_extra_info ){
2058- .range_start = request -> part_range_start , .ticket = request -> ticket },
2093+ .range_start = delivery_range_start , .ticket = request -> ticket },
20592094 meta_request -> user_data )) {
20602095 error_code = aws_last_error_or_unknown ();
20612096 AWS_LOGF_ERROR (
@@ -2067,7 +2102,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20672102 } else if (
20682103 meta_request -> body_callback != NULL &&
20692104 meta_request -> body_callback (
2070- meta_request , & response_body , request -> part_range_start , meta_request -> user_data )) {
2105+ meta_request , & response_body , delivery_range_start , meta_request -> user_data )) {
20712106
20722107 error_code = aws_last_error_or_unknown ();
20732108 AWS_LOGF_ERROR (
@@ -2087,13 +2122,25 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20872122 }
20882123 }
20892124 }
2090- aws_atomic_fetch_sub (& client -> stats .num_requests_streaming_response , 1 );
2125+ event .u .response_body .bytes_delivered += response_body .len ;
2126+ meta_request -> io_threaded_data .num_bytes_delivery_completed += response_body .len ;
2127+
2128+ if (!delivery_incomplete || error_code != AWS_ERROR_SUCCESS ) {
2129+ /* We completed the delivery for this request. */
2130+ aws_atomic_fetch_sub (& client -> stats .num_requests_streaming_response , 1 );
20912131
2092- ++ num_parts_delivered ;
2093- request -> send_data .metrics =
2094- s_s3_request_finish_up_and_release_metrics (request -> send_data .metrics , meta_request );
2132+ ++ num_parts_delivered ;
2133+ request -> send_data .metrics =
2134+ s_s3_request_finish_up_and_release_metrics (request -> send_data .metrics , meta_request );
20952135
2096- aws_s3_request_release (request );
2136+ aws_s3_request_release (request );
2137+ } else {
2138+ /* We didn't complete the delivery for this request and no error happened */
2139+ /* Push to the front of the queue and wait for the next tick to deliver the rest of the bytes. */
2140+ /* Note: we push to the front of the array since when we move those incomplete events back to the
2141+ * synced_queue, we need to make sure it still has the same order. */
2142+ aws_array_list_push_front (& incomplete_deliver_events_array , & event );
2143+ }
20972144 } break ;
20982145
20992146 case AWS_S3_META_REQUEST_EVENT_PROGRESS : {
@@ -2149,6 +2196,13 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
21492196 if (error_code != AWS_ERROR_SUCCESS ) {
21502197 aws_s3_meta_request_set_fail_synced (meta_request , NULL , error_code );
21512198 }
2199+ /* Push the iocompileted events back to the queue */
2200+ for (size_t i = 0 ; i < aws_array_list_length (& incomplete_deliver_events_array ); ++ i ) {
2201+ struct aws_s3_meta_request_event event ;
2202+ aws_array_list_get_at (& incomplete_deliver_events_array , & event , i );
2203+ /* Push the incomplete one to the front of the queue. */
2204+ aws_array_list_push_front (& meta_request -> synced_data .event_delivery_array , & event );
2205+ }
21522206
21532207 meta_request -> synced_data .num_parts_delivery_completed += num_parts_delivered ;
21542208 meta_request -> synced_data .event_delivery_active = false;
0 commit comments