Skip to content

Commit 71342a8

Browse files
committed
introduce max number of streams
1 parent 0d8e1a9 commit 71342a8

5 files changed

Lines changed: 176 additions & 1 deletion

File tree

include/aws/http/http2_stream_manager.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,16 @@ struct aws_http2_stream_manager_options {
155155
/**
156156
* Required.
157157
* The max number of connections will be open at same time. If all the connections are full, manager will wait until
158-
* available to vender more streams */
158+
* available to vender more streams
159+
*/
159160
size_t max_connections;
161+
/**
162+
* Optional.
163+
* The max total number of streams that can be active across all connections at the same time.
164+
* 0 means no limit (default). When this limit is reached, the stream manager will wait for
165+
* existing streams to complete before creating new ones, even if connections have available capacity.
166+
*/
167+
size_t max_total_streams;
160168
};
161169

162170
struct aws_http2_stream_manager_acquire_stream_options {

include/aws/http/private/http2_stream_manager_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ struct aws_http2_stream_manager {
114114
*/
115115
size_t max_concurrent_streams_per_connection;
116116

117+
/**
118+
* Optional. 0 means no limit (default).
119+
* The max total number of streams that can be active across all connections at the same time.
120+
* When this limit is reached, the stream manager will wait for existing streams to complete
121+
* before creating new ones, even if connections have available capacity.
122+
*/
123+
size_t max_total_streams;
124+
117125
/**
118126
* Task to invoke pending acquisition callbacks asynchronously if stream manager is shutting.
119127
*/

source/http2_stream_manager.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,25 @@ static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
151151
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {
152152

153153
AWS_ASSERT(pending_stream_acquisition->sm_connection == NULL);
154+
155+
/* Check if we've reached the max_total_streams limit */
156+
if (stream_manager->max_total_streams > 0) {
157+
size_t total_active_streams =
158+
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_OPEN_STREAM] +
159+
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_MAKE_REQUESTS];
160+
if (total_active_streams >= stream_manager->max_total_streams) {
161+
/* We've reached the limit, cannot assign a connection yet */
162+
STREAM_MANAGER_LOGF(
163+
DEBUG,
164+
stream_manager,
165+
"acquisition:%p waiting - max_total_streams limit reached (%" PRIu64 "/%" PRIu64 ")",
166+
(void *)pending_stream_acquisition,
167+
(uint64_t)total_active_streams,
168+
(uint64_t)stream_manager->max_total_streams);
169+
return;
170+
}
171+
}
172+
154173
int errored = 0;
155174
if (aws_random_access_set_get_size(&stream_manager->synced_data.ideal_available_set)) {
156175
/**
@@ -1156,6 +1175,7 @@ struct aws_http2_stream_manager *aws_http2_stream_manager_new(
11561175
stream_manager->max_concurrent_streams_per_connection =
11571176
options->max_concurrent_streams_per_connection ? options->max_concurrent_streams_per_connection : UINT32_MAX;
11581177
stream_manager->max_connections = options->max_connections;
1178+
stream_manager->max_total_streams = options->max_total_streams; /* 0 means no limit */
11591179
stream_manager->close_connection_on_server_error = options->close_connection_on_server_error;
11601180

11611181
return stream_manager;

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,9 @@ add_net_test_case(h2_sm_mock_goaway)
694694
add_net_test_case(h2_sm_connection_ping)
695695
add_net_test_case(h2_sm_with_flow_control_err)
696696
add_net_test_case(h2_sm_with_initial_settings)
697+
add_net_test_case(h2_sm_mock_max_total_streams)
698+
add_net_test_case(h2_sm_mock_max_total_streams_multiple_connections)
699+
add_net_test_case(h2_sm_mock_max_total_streams_zero_means_no_limit)
697700

698701
# Tests against real world server
699702
add_net_test_case(h2_sm_acquire_stream)

tests/test_stream_manager.c

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,142 @@ TEST_CASE(h2_sm_with_initial_settings) {
12931293
return s_tester_clean_up();
12941294
}
12951295

1296+
/* Test that max_total_streams limits the total number of active streams */
1297+
TEST_CASE(h2_sm_mock_max_total_streams) {
1298+
(void)ctx;
1299+
size_t max_total_streams = 5;
1300+
struct sm_tester_options options = {
1301+
.max_connections = 3,
1302+
.max_concurrent_streams_per_connection = 10,
1303+
.alloc = allocator,
1304+
};
1305+
ASSERT_SUCCESS(s_tester_init(&options));
1306+
1307+
/* Set max_total_streams on the stream manager */
1308+
s_tester.stream_manager->max_total_streams = max_total_streams;
1309+
1310+
s_override_cm_connect_function(s_aws_http_connection_manager_create_connection_sync_mock);
1311+
1312+
/* Try to acquire 10 streams, but only 5 should be created initially */
1313+
int num_to_acquire = 10;
1314+
ASSERT_SUCCESS(s_sm_stream_acquiring(num_to_acquire));
1315+
1316+
/* Wait for connection to be made */
1317+
ASSERT_SUCCESS(s_wait_on_fake_connection_count(1));
1318+
s_drain_all_fake_connection_testing_channel();
1319+
1320+
/* Should have acquired only max_total_streams streams */
1321+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(max_total_streams));
1322+
ASSERT_UINT_EQUALS(max_total_streams, aws_array_list_length(&s_tester.streams));
1323+
1324+
/* Complete 3 streams */
1325+
struct sm_fake_connection *fake_connection = s_get_fake_connection(0);
1326+
s_fake_connection_complete_streams(fake_connection, 3, false);
1327+
s_drain_all_fake_connection_testing_channel();
1328+
1329+
/* Now 3 more streams should be created (total of 8 acquired) */
1330+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(8));
1331+
ASSERT_UINT_EQUALS(8, aws_array_list_length(&s_tester.streams));
1332+
1333+
/* Complete 2 more streams */
1334+
s_fake_connection_complete_streams(fake_connection, 2, false);
1335+
s_drain_all_fake_connection_testing_channel();
1336+
1337+
/* The remaining 2 streams should now be created (total of 10) */
1338+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(10));
1339+
ASSERT_UINT_EQUALS(10, aws_array_list_length(&s_tester.streams));
1340+
1341+
/* Complete all remaining streams */
1342+
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
1343+
1344+
return s_tester_clean_up();
1345+
}
1346+
1347+
/* Test that max_total_streams works with multiple connections */
1348+
TEST_CASE(h2_sm_mock_max_total_streams_multiple_connections) {
1349+
(void)ctx;
1350+
size_t max_total_streams = 8;
1351+
struct sm_tester_options options = {
1352+
.max_connections = 4,
1353+
.max_concurrent_streams_per_connection = 3,
1354+
.alloc = allocator,
1355+
};
1356+
ASSERT_SUCCESS(s_tester_init(&options));
1357+
1358+
/* Set max_total_streams */
1359+
s_tester.stream_manager->max_total_streams = max_total_streams;
1360+
1361+
s_override_cm_connect_function(s_aws_http_connection_manager_create_connection_sync_mock);
1362+
1363+
/* Try to acquire 15 streams */
1364+
int num_to_acquire = 15;
1365+
ASSERT_SUCCESS(s_sm_stream_acquiring(num_to_acquire));
1366+
1367+
/* all 4 connections should be created for the pending streams, but only 8 streams will be created in total */
1368+
ASSERT_SUCCESS(s_wait_on_fake_connection_count(4));
1369+
s_drain_all_fake_connection_testing_channel();
1370+
1371+
/* Should have acquired only max_total_streams streams */
1372+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(max_total_streams));
1373+
ASSERT_UINT_EQUALS(max_total_streams, aws_array_list_length(&s_tester.streams));
1374+
/* 4 connections total */
1375+
ASSERT_UINT_EQUALS(4, aws_array_list_length(&s_tester.fake_connections));
1376+
1377+
/* Complete 4 streams from first 2 connections, since they must have at least 2 streams. */
1378+
for (size_t i = 0; i < 2; ++i) {
1379+
struct sm_fake_connection *fake_connection = s_get_fake_connection(i);
1380+
s_fake_connection_complete_streams(fake_connection, 2, false);
1381+
}
1382+
s_drain_all_fake_connection_testing_channel();
1383+
1384+
/* 4 more streams should be created (total of 12) */
1385+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(12));
1386+
ASSERT_UINT_EQUALS(12, aws_array_list_length(&s_tester.streams));
1387+
1388+
/* Complete remaining streams */
1389+
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
1390+
s_drain_all_fake_connection_testing_channel();
1391+
1392+
/* All 15 streams should eventually be acquired */
1393+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(15));
1394+
ASSERT_UINT_EQUALS(15, aws_array_list_length(&s_tester.streams));
1395+
1396+
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
1397+
1398+
return s_tester_clean_up();
1399+
}
1400+
1401+
/* Test that max_total_streams = 0 means no limit (default behavior) */
1402+
TEST_CASE(h2_sm_mock_max_total_streams_zero_means_no_limit) {
1403+
(void)ctx;
1404+
struct sm_tester_options options = {
1405+
.max_connections = 2,
1406+
.max_concurrent_streams_per_connection = 5,
1407+
.alloc = allocator,
1408+
};
1409+
ASSERT_SUCCESS(s_tester_init(&options));
1410+
1411+
/* max_total_streams defaults to 0 (no limit) */
1412+
ASSERT_UINT_EQUALS(0, s_tester.stream_manager->max_total_streams);
1413+
1414+
s_override_cm_connect_function(s_aws_http_connection_manager_create_connection_sync_mock);
1415+
1416+
/* Acquire 10 streams - all should be created immediately */
1417+
int num_to_acquire = 10;
1418+
ASSERT_SUCCESS(s_sm_stream_acquiring(num_to_acquire));
1419+
1420+
ASSERT_SUCCESS(s_wait_on_fake_connection_count(2)); /* 2 connections needed for 10 streams */
1421+
s_drain_all_fake_connection_testing_channel();
1422+
1423+
/* All 10 streams should be acquired without waiting */
1424+
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(num_to_acquire));
1425+
ASSERT_UINT_EQUALS(num_to_acquire, aws_array_list_length(&s_tester.streams));
1426+
1427+
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
1428+
1429+
return s_tester_clean_up();
1430+
}
1431+
12961432
/*******************************************************************************
12971433
* Net test, that makes real HTTP/2 connection and requests
12981434
******************************************************************************/

0 commit comments

Comments
 (0)