Skip to content

Commit 2bfe376

Browse files
committed
fix the read window update from the same thread
1 parent a109fa6 commit 2bfe376

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

source/s3_meta_request.c

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,7 +1988,6 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
19881988
int error_code = AWS_ERROR_SUCCESS;
19891989
uint32_t num_parts_delivered = 0;
19901990
uint64_t bytes_allowed_to_deliver = 0;
1991-
uint64_t read_window_to_increment = 0;
19921991

19931992
/* BEGIN CRITICAL SECTION */
19941993
{
@@ -2093,7 +2092,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
20932092
aws_error_name(error_code));
20942093
}
20952094
if (meta_request->client->enable_read_backpressure) {
2096-
read_window_to_increment += response_body.len;
2095+
aws_s3_meta_request_increment_read_window(meta_request, response_body.len);
20972096
}
20982097
} else if (
20992098
meta_request->body_callback_ex != NULL &&
@@ -2207,20 +2206,31 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
22072206
if (error_code != AWS_ERROR_SUCCESS) {
22082207
aws_s3_meta_request_set_fail_synced(meta_request, NULL, error_code);
22092208
}
2210-
/* Push the iocompileted events back to the queue */
2211-
for (size_t i = 0; i < aws_array_list_length(&incomplete_deliver_events_array); ++i) {
2212-
struct aws_s3_meta_request_event event;
2213-
aws_array_list_get_at(&incomplete_deliver_events_array, &event, i);
2214-
/* Push the incomplete one to the front of the queue. */
2215-
aws_array_list_push_front(&meta_request->synced_data.event_delivery_array, &event);
2209+
if (aws_array_list_length(&incomplete_deliver_events_array) > 0) {
2210+
/* Only if we don't have any window to deliver the bytes, we will have incomplete parts. */
2211+
AWS_FATAL_ASSERT(bytes_allowed_to_deliver == 0);
2212+
/* Push the iocompileted events back to the queue */
2213+
for (size_t i = 0; i < aws_array_list_length(&incomplete_deliver_events_array); ++i) {
2214+
struct aws_s3_meta_request_event event;
2215+
aws_array_list_get_at(&incomplete_deliver_events_array, &event, i);
2216+
/* Push the incomplete one to the front of the queue. */
2217+
aws_array_list_push_front(&meta_request->synced_data.event_delivery_array, &event);
2218+
}
2219+
/* As we push to the event delivery array, we check if we need to schedule another delivery by if there
2220+
* is space to make the delivery or not. */
2221+
bytes_allowed_to_deliver = meta_request->synced_data.read_window_running_total -
2222+
meta_request->io_threaded_data.num_bytes_delivery_completed;
2223+
if (bytes_allowed_to_deliver > 0 && error_code == AWS_ERROR_SUCCESS) {
2224+
/* We have more space now, let's try another delivery now. */
2225+
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, NULL);
2226+
}
22162227
}
22172228

22182229
meta_request->synced_data.num_parts_delivery_completed += num_parts_delivered;
22192230
meta_request->synced_data.event_delivery_active = false;
22202231
aws_s3_meta_request_unlock_synced_data(meta_request);
22212232
}
22222233
/* END CRITICAL SECTION */
2223-
aws_s3_meta_request_increment_read_window(meta_request, read_window_to_increment);
22242234
aws_array_list_clean_up(&incomplete_deliver_events_array);
22252235

22262236
aws_s3_client_schedule_process_work(client);

0 commit comments

Comments
 (0)