Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- 'main'

env:
BUILDER_VERSION: v0.9.72
BUILDER_VERSION: v0.9.90
BUILDER_SOURCE: releases
BUILDER_HOST: https://d19elf31gohf1l.cloudfront.net
PACKAGE_NAME: aws-c-http
Expand Down Expand Up @@ -135,6 +135,7 @@ jobs:

# Test downstream repos.
# This should not be required because we can run into a chicken and egg problem if there is a change that needs some fix in a downstream repo.
# bump to ubuntu-22 since ubuntu 18 failed to install the mock server deps with python3.6
downstream:
runs-on: ubuntu-24.04 # latest
steps:
Expand All @@ -146,7 +147,7 @@ jobs:
- name: Build ${{ env.PACKAGE_NAME }}
run: |
aws s3 cp s3://aws-crt-test-stuff/ci/${{ env.BUILDER_VERSION }}/linux-container-ci.sh ./linux-container-ci.sh && chmod a+x ./linux-container-ci.sh
./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-${{ env.LINUX_BASE_IMAGE }} build downstream -p ${{ env.PACKAGE_NAME }}
./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-ubuntu-22-x64 build downstream -p ${{ env.PACKAGE_NAME }}

windows:
runs-on: windows-2025 # latest
Expand Down Expand Up @@ -203,7 +204,7 @@ jobs:
python .\aws-c-http\build\deps\aws-c-common\scripts\appverifier_ctest.py --build_directory .\aws-c-http\build\aws-c-http

macos:
runs-on: macos-14
runs-on: macos-15
strategy:
fail-fast: false
matrix:
Expand All @@ -220,7 +221,7 @@ jobs:
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }}

macos-x64:
runs-on: macos-14-large
runs-on: macos-15-large
steps:
- uses: aws-actions/configure-aws-credentials@v4
with:
Expand All @@ -247,7 +248,7 @@ jobs:
python3 builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON --config Debug

localhost-test-macos:
runs-on: macos-14
runs-on: macos-15
strategy:
fail-fast: false
matrix:
Expand Down
12 changes: 10 additions & 2 deletions include/aws/http/http2_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,17 @@ struct aws_http2_stream_manager_options {
size_t max_concurrent_streams_per_connection;
/**
* Required.
* The max number of connections will be open at same time. If all the connections are full, manager will wait until
* available to vender more streams */
* The max number of connections that will be open at the same time. If all the connections are full, the manager
* will wait until a connection is available to vend more streams.
*/
size_t max_connections;
/**
* Optional.
* The max number of concurrent streams that can be active across all connections at the same time.
* 0 means no limit (default). When this limit is reached, the stream manager will wait for
* existing streams to complete before creating new ones, even if connections have available capacity.
*/
size_t max_concurrent_streams;
};

struct aws_http2_stream_manager_acquire_stream_options {
Expand Down
40 changes: 32 additions & 8 deletions include/aws/http/private/http2_stream_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ struct aws_h2_sm_connection {
} thread_data;

enum aws_h2_sm_connection_state_type state;

/**
* Node for tracking in the all_held_connections list,
* NOTE: lock required to alter the state of the list.
*/
struct aws_linked_list_node node;
};

/* Live from the user request to acquire a stream to the stream completed. */
Expand Down Expand Up @@ -114,6 +120,14 @@ struct aws_http2_stream_manager {
*/
size_t max_concurrent_streams_per_connection;

/**
* Optional. 0 means no limit (default).
* The max number of concurrent streams that can be active across all connections at the same time.
* When this limit is reached, the stream manager will wait for existing streams to complete
* before creating new ones, even if connections have available capacity.
*/
size_t max_concurrent_streams;

/**
* Task to invoke pending acquisition callbacks asynchronously if stream manager is shutting.
*/
Expand All @@ -129,26 +143,36 @@ struct aws_http2_stream_manager {
enum aws_h2_sm_state_type state;

/**
* A set of all connections that meet all requirement to use. Note: there will be connections not in this set,
* but hold by the stream manager, which can be tracked by the streams created on it. Set of `struct
* aws_h2_sm_connection *`
* A set of all connections that meet all requirement to use. Doesn't own the connection.
*
* Note: there will be connections not in this set, but hold by the stream manager, which can be tracked by the
* all_held_connections. Set of `struct aws_h2_sm_connection *`
*/
struct aws_random_access_set ideal_available_set;
/**
* A set of all available connections that exceed the soft limits set by users. Note: there will be connections
* not in this set, but hold by the stream manager, which can be tracked by the streams created. Set of `struct
* aws_h2_sm_connection *`
* A set of all available connections that exceed the soft limits set by users. Doesn't own the connection.
*
* Note: there will be connections not in this set, but hold by the stream manager, which can be tracked by the
* all_held_connections. Set of `struct aws_h2_sm_connection *`
*/
struct aws_random_access_set nonideal_available_set;
/* We don't mantain set for connections that is full or "dead" (Cannot make any new streams). We have streams
* opening from the connection tracking them */
/* We don't mantain set for connections that is full or "dead" (Cannot make any new streams). We have
* all_held_connections tracking them */

/**
* The set of all incomplete stream acquisition requests (haven't decide what connection to make the request
* to), list of `struct aws_h2_sm_pending_stream_acquisition*`
*/
struct aws_linked_list pending_stream_acquisitions;

/**
* List of all aws_h2_sm_connection that holding the HTTP connection from connection manager.
* This list tracks all aws_h2_sm_connection from getting the connection from connection manager until release
* it back.
* list of `struct aws_h2_sm_connection*`
*/
struct aws_linked_list all_held_connections;

/**
* The number of connections acquired from connection manager and not released yet.
*/
Expand Down
125 changes: 112 additions & 13 deletions source/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ static void s_sm_log_stats_synced(struct aws_http2_stream_manager *stream_manage
stream_manager->synced_data.holding_connections_count);
}

/* Check against the max concurrent streams limit, return true if more streams are allowed. */
static bool s_sm_allows_more_concurrent_streams(struct aws_http2_stream_manager *stream_manager) {
if (stream_manager->max_concurrent_streams > 0) {
size_t total_active_streams =
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] +
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS];
if (total_active_streams >= stream_manager->max_concurrent_streams) {
/* total active streams are equal or more than the setting */
return false;
}
}
return true;
}

/* The count acquire and release all needs to be invoked helding the lock */
static void s_sm_count_increase_synced(
struct aws_http2_stream_manager *stream_manager,
Expand Down Expand Up @@ -151,6 +165,18 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {

AWS_ASSERT(pending_stream_acquisition->sm_connection == NULL);

/* Check if we've reached the max_concurrent_streams limit */
if (!s_sm_allows_more_concurrent_streams(stream_manager)) {
/* We've reached the limit, cannot assign a connection yet */
STREAM_MANAGER_LOGF(
DEBUG,
stream_manager,
"acquisition:%p waiting - max_concurrent_streams limit reached (%" PRIu64 ")",
(void *)pending_stream_acquisition,
(uint64_t)stream_manager->max_concurrent_streams);
return;
}
int errored = 0;
if (aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set)) {
/**
Expand All @@ -160,6 +186,10 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
s_get_best_sm_connection_from_set(&stream_manager->synced_data.ideal_available_set);
AWS_ASSERT(chosen_connection);
pending_stream_acquisition->sm_connection = chosen_connection;
if (chosen_connection->num_streams_assigned == 0) {
/* If bump from 0, acquire the refcount for streams */
aws_ref_count_acquire(&chosen_connection->ref_count);
}
chosen_connection->num_streams_assigned++;

STREAM_MANAGER_LOGF(
Expand Down Expand Up @@ -212,6 +242,10 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
s_get_best_sm_connection_from_set(&stream_manager->synced_data.nonideal_available_set);
AWS_ASSERT(chosen_connection);
pending_stream_acquisition->sm_connection = chosen_connection;
if (chosen_connection->num_streams_assigned == 0) {
/* If bump from 0, acquire the refcount for streams */
aws_ref_count_acquire(&chosen_connection->ref_count);
}
chosen_connection->num_streams_assigned++;

STREAM_MANAGER_LOGF(
Expand Down Expand Up @@ -299,6 +333,14 @@ static void s_finish_pending_stream_acquisitions_task(struct aws_task *task, voi
/* helper function for building the transaction: how many new connections we should request */
static void s_check_new_connections_needed_synced(struct aws_http2_stream_management_transaction *work) {
struct aws_http2_stream_manager *stream_manager = work->stream_manager;

/* avoid acquiring for connection when no more streams are allowed */
if (!s_sm_allows_more_concurrent_streams(stream_manager)) {
/* We've reached the limit, avoid creating new connection in case of having connection not being tracked by
* streams. */
return;
}

/* The ideal new connection we need to fit all the pending stream acquisitions */
size_t ideal_new_connection_count =
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] /
Expand Down Expand Up @@ -342,6 +384,17 @@ static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2
struct aws_http2_stream_manager *stream_manager = work->stream_manager;
if (stream_manager->synced_data.state == AWS_H2SMST_READY) {

/* Steps 0: Check if we've reached the max_concurrent_streams limit */
if (!s_sm_allows_more_concurrent_streams(stream_manager)) {
/* We've reached the limit, skip building the transactions */
STREAM_MANAGER_LOGF(
DEBUG,
stream_manager,
"stream manager waiting - max_concurrent_streams limit reached (%" PRIu64 ")",
(uint64_t)stream_manager->max_concurrent_streams);
return;
}

/* Steps 1: Pending acquisitions of stream */
while (!aws_linked_list_empty(&stream_manager->synced_data.pending_stream_acquisitions)) {
struct aws_linked_list_node *node =
Expand Down Expand Up @@ -495,6 +548,7 @@ static void s_connection_ping_task(struct aws_channel_task *task, void *arg, enu

static void s_sm_connection_destroy(void *user_data) {
struct aws_h2_sm_connection *sm_connection = user_data;

aws_mem_release(sm_connection->allocator, sm_connection);
}

Expand All @@ -515,6 +569,7 @@ static struct aws_h2_sm_connection *s_sm_connection_new(
sm_connection->stream_manager = stream_manager;
sm_connection->state = AWS_H2SMCST_IDEAL;
aws_ref_count_init(&sm_connection->ref_count, sm_connection, s_sm_connection_destroy);

if (stream_manager->connection_ping_period_ns) {
struct aws_channel *channel = aws_http_connection_get_channel(connection);
uint64_t schedule_time = 0;
Expand All @@ -531,15 +586,20 @@ static struct aws_h2_sm_connection *s_sm_connection_new(

static void s_sm_connection_release_connection(struct aws_h2_sm_connection *sm_connection) {
AWS_ASSERT(sm_connection->num_streams_assigned == 0);
if (sm_connection->connection) {
/* Should only be invoked from the connection thread. */
AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));
int error = aws_http_connection_manager_release_connection(
sm_connection->stream_manager->connection_manager, sm_connection->connection);
AWS_ASSERT(!error);
(void)error;
sm_connection->connection = NULL;
}
AWS_ASSERT(sm_connection->connection != NULL);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that a hard guarantee that connection will always be non-null?

Copy link
Copy Markdown
Contributor Author

@TingDaoK TingDaoK Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's the only place releasing the connection back to the connection manager and setting this to be null, other than the start destroy.

I can create a helper function to make it more clear. It's the helper function to release the connection back to the manager.

/* Should only be invoked from the connection thread. */
AWS_ASSERT(aws_channel_thread_is_callers_thread(aws_http_connection_get_channel(sm_connection->connection)));

s_lock_synced_data(sm_connection->stream_manager);
/* Remove this connection from the connection tracking list within the lock as altering the list in synced_data. */
aws_linked_list_remove(&sm_connection->node);
s_unlock_synced_data(sm_connection->stream_manager);

int error = aws_http_connection_manager_release_connection(
sm_connection->stream_manager->connection_manager, sm_connection->connection);
AWS_ASSERT(!error);
(void)error;
sm_connection->connection = NULL;
aws_ref_count_release(&sm_connection->ref_count);
}

Expand Down Expand Up @@ -612,6 +672,8 @@ static void s_sm_on_connection_acquired(struct aws_http_connection *connection,
should_release_connection = true;
} else {
struct aws_h2_sm_connection *sm_connection = s_sm_connection_new(stream_manager, connection);
/* Add this connection to the all_held_connections tracking list */
aws_linked_list_push_back(&stream_manager->synced_data.all_held_connections, &sm_connection->node);
bool added = false;
re_error |=
aws_random_access_set_add(&stream_manager->synced_data.ideal_available_set, sm_connection, &added);
Expand Down Expand Up @@ -769,6 +831,10 @@ static void s_sm_connection_on_scheduled_stream_finishes(
s_lock_synced_data(stream_manager);
s_sm_count_decrease_synced(stream_manager, AWS_SMCT_OPEN_STREAM, 1);
--sm_connection->num_streams_assigned;
if (sm_connection->num_streams_assigned == 0) {
/* Release the refcount for streams */
aws_ref_count_release(&sm_connection->ref_count);
}
if (!connection_available) {
/* It might be removed already, but, it's fine */
aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection);
Expand All @@ -777,8 +843,8 @@ static void s_sm_connection_on_scheduled_stream_finishes(
s_update_sm_connection_set_on_stream_finishes_synced(sm_connection, stream_manager);
}
s_aws_http2_stream_manager_build_transaction_synced(&work);
/* After we build transaction, if the sm_connection still have zero assigned stream, we can kill the
* sm_connection */
/* After we build transaction, if the sm_connection still have zero assigned stream, we can release the
* sm_connection back to the pool */
if (sm_connection->num_streams_assigned == 0) {
/* It might be removed already, but, it's fine */
aws_random_access_set_remove(&stream_manager->synced_data.ideal_available_set, sm_connection);
Expand Down Expand Up @@ -1019,12 +1085,43 @@ static void s_stream_manager_start_destroy(void *user_data) {
struct aws_http2_stream_manager *stream_manager = user_data;
STREAM_MANAGER_LOG(TRACE, stream_manager, "Stream Manager reaches the condition to destroy, start to destroy");
/* If there is no outstanding streams, the connections set should be empty. */
AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set) == 0);
AWS_ASSERT(aws_random_access_set_get_size(&stream_manager->synced_data.nonideal_available_set) == 0);
AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_CONNECTIONS_ACQUIRING] == 0);
AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] == 0);
AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS] == 0);
AWS_ASSERT(stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION] == 0);

/* Iterate through all connections and clean up any that remain */
struct aws_linked_list_node *node = aws_linked_list_begin(&stream_manager->synced_data.all_held_connections);

/* No lock needed when stream manager started destroying process. Since there should have no other threads still
* working. */
const struct aws_linked_list_node *end_node =
aws_linked_list_end(&stream_manager->synced_data.all_held_connections);
while (node != end_node) {
struct aws_h2_sm_connection *sm_connection = AWS_CONTAINER_OF(node, struct aws_h2_sm_connection, node);
/* Move to next node before potentially destroying current connection */
node = aws_linked_list_next(node);

/* Verify this connection has no outstanding streams, which should be the precondition before stream manager
* start the destroy process. */
AWS_ASSERT(sm_connection->num_streams_assigned == 0);
/* Release the connection back to connection manager */
if (sm_connection->connection) {
aws_http_connection_manager_release_connection(
stream_manager->connection_manager, sm_connection->connection);
sm_connection->connection = NULL;
--stream_manager->synced_data.holding_connections_count;
}

/* Remove this connection from the connection tracking list */
aws_linked_list_remove(&sm_connection->node);
aws_ref_count_release(&sm_connection->ref_count);
}

/* All connections should be cleaned up now */
AWS_ASSERT(aws_linked_list_empty(&stream_manager->synced_data.all_held_connections));
AWS_ASSERT(stream_manager->synced_data.holding_connections_count == 0);

AWS_ASSERT(stream_manager->connection_manager);
struct aws_http_connection_manager *cm = stream_manager->connection_manager;
stream_manager->connection_manager = NULL;
Expand Down Expand Up @@ -1069,6 +1166,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_new(
aws_mem_calloc(allocator, 1, sizeof(struct aws_http2_stream_manager));
stream_manager->allocator = allocator;
aws_linked_list_init(&stream_manager->synced_data.pending_stream_acquisitions);
aws_linked_list_init(&stream_manager->synced_data.all_held_connections);

if (aws_mutex_init(&stream_manager->synced_data.lock)) {
goto on_error;
Expand Down Expand Up @@ -1156,6 +1254,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_new(
stream_manager->max_concurrent_streams_per_connection =
options->max_concurrent_streams_per_connection ? options->max_concurrent_streams_per_connection : UINT32_MAX;
stream_manager->max_connections = options->max_connections;
stream_manager->max_concurrent_streams = options->max_concurrent_streams; /* 0 means no limit */
stream_manager->close_connection_on_server_error = options->close_connection_on_server_error;

return stream_manager;
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,9 @@ add_net_test_case(h2_sm_mock_goaway)
add_net_test_case(h2_sm_connection_ping)
add_net_test_case(h2_sm_with_flow_control_err)
add_net_test_case(h2_sm_with_initial_settings)
add_net_test_case(h2_sm_mock_max_concurrent_streams)
add_net_test_case(h2_sm_mock_max_concurrent_streams_multiple_connections)
add_net_test_case(h2_sm_mock_max_concurrent_streams_zero_means_no_limit)

# Tests against real world server
add_net_test_case(h2_sm_acquire_stream)
Expand Down
Loading
Loading