-
Notifications
You must be signed in to change notification settings - Fork 50
introduce max concurrent streams for stream manager #553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
71342a8
cfdf8bd
e0163e4
f2630ff
b6deece
eab3565
ae542f4
878c282
6da4fa3
d8fbec4
092c182
1ac273b
8a5c303
847aff1
9a350ce
0fc6168
daea2fd
a6d309e
5ab0c96
fd03531
4a9d506
3ef299b
4ab0d80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,6 +93,20 @@ static void s_sm_log_stats_synced(struct aws_http2_stream_manager *stream_manage | |
| stream_manager->synced_data.holding_connections_count); | ||
| } | ||
|
|
||
| /* Check against the max concurrent streams limit, return true if more streams are allowed. */ | ||
| static bool s_sm_allows_more_concurrent_streams(struct aws_http2_stream_manager *stream_manager) { | ||
| if (stream_manager->max_concurrent_streams > 0) { | ||
| size_t total_active_streams = | ||
| stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] + | ||
| stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS]; | ||
| if (total_active_streams >= stream_manager->max_concurrent_streams) { | ||
| /* total active streams are equal or more than the setting */ | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| /* The count acquire and release all needs to be invoked helding the lock */ | ||
| static void s_sm_count_increase_synced( | ||
| struct aws_http2_stream_manager *stream_manager, | ||
|
|
@@ -151,6 +165,18 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced( | |
| struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) { | ||
|
|
||
| AWS_ASSERT(pending_stream_acquisition->sm_connection == NULL); | ||
|
|
||
| /* Check if we've reached the max_concurrent_streams limit */ | ||
| if (!s_sm_allows_more_concurrent_streams(stream_manager)) { | ||
| /* We've reached the limit, cannot assign a connection yet */ | ||
| STREAM_MANAGER_LOGF( | ||
| DEBUG, | ||
| stream_manager, | ||
| "acquisition:%p waiting - max_concurrent_streams limit reached (%" PRIu64 ")", | ||
| (void *)pending_stream_acquisition, | ||
| (uint64_t)stream_manager->max_concurrent_streams); | ||
| return; | ||
| } | ||
| int errored = 0; | ||
| if (aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set)) { | ||
| /** | ||
|
|
@@ -160,7 +186,10 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced( | |
| s_get_best_sm_connection_from_set(&stream_manager->synced_data.ideal_available_set); | ||
| AWS_ASSERT(chosen_connection); | ||
| pending_stream_acquisition->sm_connection = chosen_connection; | ||
| chosen_connection->num_streams_assigned++; | ||
| if (chosen_connection->num_streams_assigned++ == 0) { | ||
|
TingDaoK marked this conversation as resolved.
Outdated
|
||
| /* If bump from 0, acquire the refcount for streams */ | ||
| aws_ref_count_acquire(&chosen_connection->ref_count); | ||
| } | ||
|
|
||
| STREAM_MANAGER_LOGF( | ||
| DEBUG, | ||
|
|
@@ -212,7 +241,10 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced( | |
| s_get_best_sm_connection_from_set(&stream_manager->synced_data.nonideal_available_set); | ||
| AWS_ASSERT(chosen_connection); | ||
| pending_stream_acquisition->sm_connection = chosen_connection; | ||
| chosen_connection->num_streams_assigned++; | ||
| if (chosen_connection->num_streams_assigned++ == 0) { | ||
| /* If bump from 0, acquire the refcount for streams */ | ||
| aws_ref_count_acquire(&chosen_connection->ref_count); | ||
| } | ||
|
|
||
| STREAM_MANAGER_LOGF( | ||
| DEBUG, | ||
|
|
@@ -299,6 +331,14 @@ static void s_finish_pending_stream_acquisitions_task(struct aws_task *task, voi | |
| /* helper function for building the transaction: how many new connections we should request */ | ||
| static void s_check_new_connections_needed_synced(struct aws_http2_stream_management_transaction *work) { | ||
| struct aws_http2_stream_manager *stream_manager = work->stream_manager; | ||
|
|
||
| /* avoid acquiring for connection when no more streams are allowed */ | ||
| if (!s_sm_allows_more_concurrent_streams(stream_manager)) { | ||
| /* We've reached the limit, avoid creating new connection in case of having connection not being tracked by | ||
| * streams. */ | ||
| return; | ||
| } | ||
|
|
||
| /* The ideal new connection we need to fit all the pending stream acquisitions */ | ||
| size_t ideal_new_connection_count = | ||
| stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] / | ||
|
|
@@ -342,6 +382,17 @@ static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2 | |
| struct aws_http2_stream_manager *stream_manager = work->stream_manager; | ||
| if (stream_manager->synced_data.state == AWS_H2SMST_READY) { | ||
|
|
||
| /* Steps 0: Check if we've reached the max_concurrent_streams limit */ | ||
| if (!s_sm_allows_more_concurrent_streams(stream_manager)) { | ||
| /* We've reached the limit, skip building the transactions */ | ||
| STREAM_MANAGER_LOGF( | ||
| DEBUG, | ||
| stream_manager, | ||
| "stream manager waiting - max_concurrent_streams limit reached (%" PRIu64 ")", | ||
| (uint64_t)stream_manager->max_concurrent_streams); | ||
| return; | ||
| } | ||
|
|
||
| /* Steps 1: Pending acquisitions of stream */ | ||
| while (!aws_linked_list_empty(&stream_manager->synced_data.pending_stream_acquisitions)) { | ||
| struct aws_linked_list_node *node = | ||
|
|
@@ -495,6 +546,7 @@ static void s_connection_ping_task(struct aws_channel_task *task, void *arg, enu | |
|
|
||
| static void s_sm_connection_destroy(void *user_data) { | ||
| struct aws_h2_sm_connection *sm_connection = user_data; | ||
|
|
||
| aws_mem_release(sm_connection->allocator, sm_connection); | ||
| } | ||
|
|
||
|
|
@@ -515,6 +567,7 @@ static struct aws_h2_sm_connection *s_sm_connection_new( | |
| sm_connection->stream_manager = stream_manager; | ||
| sm_connection->state = AWS_H2SMCST_IDEAL; | ||
| aws_ref_count_init(&sm_connection->ref_count, sm_connection, s_sm_connection_destroy); | ||
|
|
||
| if (stream_manager->connection_ping_period_ns) { | ||
| struct aws_channel *channel = aws_http_connection_get_channel(connection); | ||
| uint64_t schedule_time = 0; | ||
|
|
@@ -531,15 +584,20 @@ static struct aws_h2_sm_connection *s_sm_connection_new( | |
|
|
||
| static void s_sm_connection_release_connection(struct aws_h2_sm_connection *sm_connection) { | ||
| AWS_ASSERT(sm_connection->num_streams_assigned == 0); | ||
| if (sm_connection->connection) { | ||
| /* Should only be invoked from the connection thread. */ | ||
| AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection))); | ||
| int error = aws_http_connection_manager_release_connection( | ||
| sm_connection->stream_manager->connection_manager, sm_connection->connection); | ||
| AWS_ASSERT(!error); | ||
| (void)error; | ||
| sm_connection->connection = NULL; | ||
| } | ||
| AWS_ASSERT(sm_connection->connection != NULL); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is that a hard guarantee that connection will always be non-null?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, that's the only place releasing the connection back to the connection manager and setting this to be null, other than the start destroy.
|
||
| /* Should only be invoked from the connection thread. */ | ||
| AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection))); | ||
|
|
||
| s_lock_synced_data(sm_connection->stream_manager); | ||
| /* Remove this connection from the connection tracking list within the lock as altering the list in synced_data. */ | ||
| aws_linked_list_remove(&sm_connection->node); | ||
| s_unlock_synced_data(sm_connection->stream_manager); | ||
|
|
||
| int error = aws_http_connection_manager_release_connection( | ||
| sm_connection->stream_manager->connection_manager, sm_connection->connection); | ||
| AWS_ASSERT(!error); | ||
| (void)error; | ||
| sm_connection->connection = NULL; | ||
| aws_ref_count_release(&sm_connection->ref_count); | ||
| } | ||
|
|
||
|
|
@@ -612,6 +670,8 @@ static void s_sm_on_connection_acquired(struct aws_http_connection *connection, | |
| should_release_connection = true; | ||
| } else { | ||
| struct aws_h2_sm_connection *sm_connection = s_sm_connection_new(stream_manager, connection); | ||
| /* Add this connection to the all_held_connections tracking list */ | ||
| aws_linked_list_push_back(&stream_manager->synced_data.all_held_connections, &sm_connection->node); | ||
| bool added = false; | ||
| re_error |= | ||
| aws_random_access_set_add(&stream_manager->synced_data.ideal_available_set, sm_connection, &added); | ||
|
|
@@ -768,7 +828,10 @@ static void s_sm_connection_on_scheduled_stream_finishes( | |
| { /* BEGIN CRITICAL SECTION */ | ||
| s_lock_synced_data(stream_manager); | ||
| s_sm_count_decrease_synced(stream_manager, AWS_SMCT_OPEN_STREAM, 1); | ||
| --sm_connection->num_streams_assigned; | ||
| if (--sm_connection->num_streams_assigned == 0) { | ||
| /* Release the refcount for streams */ | ||
| aws_ref_count_release(&sm_connection->ref_count); | ||
| } | ||
| if (!connection_available) { | ||
| /* It might be removed already, but, it's fine */ | ||
| aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection); | ||
|
|
@@ -777,8 +840,8 @@ static void s_sm_connection_on_scheduled_stream_finishes( | |
| s_update_sm_connection_set_on_stream_finishes_synced(sm_connection, stream_manager); | ||
| } | ||
| s_aws_http2_stream_manager_build_transaction_synced(&work); | ||
| /* After we build transaction, if the sm_connection still have zero assigned stream, we can kill the | ||
| * sm_connection */ | ||
| /* After we build transaction, if the sm_connection still have zero assigned stream, we can release the | ||
| * sm_connection back to the pool */ | ||
| if (sm_connection->num_streams_assigned == 0) { | ||
| /* It might be removed already, but, it's fine */ | ||
| aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection); | ||
|
|
@@ -1019,12 +1082,42 @@ static void s_stream_manager_start_destroy(void *user_data) { | |
| struct aws_http2_stream_manager *stream_manager = user_data; | ||
| STREAM_MANAGER_LOG(TRACE, stream_manager, "Stream Manager reaches the condition to destroy, start to destroy"); | ||
| /* If there is no outstanding streams, the connections set should be empty. */ | ||
| AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set) == 0); | ||
| AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.nonideal_available_set) == 0); | ||
| AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING] == 0); | ||
| AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] == 0); | ||
| AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS] == 0); | ||
| AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] == 0); | ||
|
|
||
| /* Iterate through all connections and clean up any that remain */ | ||
| struct aws_linked_list_node *node = aws_linked_list_begin(&stream_manager->synced_data.all_held_connections); | ||
|
|
||
| /* No lock needed when stream manager started destroying process. Since there should have no other threads still | ||
| * working. */ | ||
| const struct aws_linked_list_node *end_node = | ||
| aws_linked_list_end(&stream_manager->synced_data.all_held_connections); | ||
| while (node != end_node) { | ||
| struct aws_h2_sm_connection *sm_connection = AWS_CONTAINER_OF(node, struct aws_h2_sm_connection, node); | ||
| /* Move to next node before potentially destroying current connection */ | ||
| node = aws_linked_list_next(node); | ||
|
|
||
| /* Verify this connection has no outstanding streams */ | ||
| AWS_FATAL_ASSERT(sm_connection->num_streams_assigned == 0); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this really fatal? can there be any potential case where this occurs in some corner case?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary to be fatal, but we should only start to destroy the stream manager when there is no active streams, which is checked before starting the destroy process |
||
| /* Release the connection back to connection manager */ | ||
| if (sm_connection->connection) { | ||
| aws_http_connection_manager_release_connection( | ||
| stream_manager->connection_manager, sm_connection->connection); | ||
| sm_connection->connection = NULL; | ||
| --stream_manager->synced_data.holding_connections_count; | ||
| } | ||
|
|
||
| /* Remove this connection from the connection tracking list */ | ||
| aws_linked_list_remove(&sm_connection->node); | ||
| aws_ref_count_release(&sm_connection->ref_count); | ||
| } | ||
|
|
||
| /* All connections should be cleaned up now */ | ||
| AWS_ASSERT(aws_linked_list_empty(&stream_manager->synced_data.all_held_connections)); | ||
| AWS_ASSERT(stream_manager->synced_data.holding_connections_count == 0); | ||
|
|
||
| AWS_ASSERT(stream_manager->connection_manager); | ||
| struct aws_http_connection_manager *cm = stream_manager->connection_manager; | ||
| stream_manager->connection_manager = NULL; | ||
|
|
@@ -1069,6 +1162,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_new( | |
| aws_mem_calloc(allocator, 1, sizeof(struct aws_http2_stream_manager)); | ||
| stream_manager->allocator = allocator; | ||
| aws_linked_list_init(&stream_manager->synced_data.pending_stream_acquisitions); | ||
| aws_linked_list_init(&stream_manager->synced_data.all_held_connections); | ||
|
|
||
| if (aws_mutex_init(&stream_manager->synced_data.lock)) { | ||
| goto on_error; | ||
|
|
@@ -1156,6 +1250,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_new( | |
| stream_manager->max_concurrent_streams_per_connection = | ||
| options->max_concurrent_streams_per_connection ? options->max_concurrent_streams_per_connection : UINT32_MAX; | ||
| stream_manager->max_connections = options->max_connections; | ||
| stream_manager->max_concurrent_streams = options->max_concurrent_streams; /* 0 means no limit */ | ||
| stream_manager->close_connection_on_server_error = options->close_connection_on_server_error; | ||
|
|
||
| return stream_manager; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.