@@ -166,31 +166,6 @@ static uint32_t s_get_ideal_connection_number_from_throughput(double throughput_
166166 return (uint32_t )ideal_connection_count_double ;
167167}
168168
169- /**
170- * Get the system file descriptor limit.
171- * Returns 50% of the OS limit to reserve capacity for non-S3 operations.
172- * Falls back to 500 if getrlimit fails.
173- */
174- static uint32_t s_get_system_fd_limit (void ) {
175- #ifdef _WIN32
176- /* Windows doesn't have getrlimit, return a reasonable default */
177- return 100 ;
178- #else
179- struct rlimit rl ;
180- if (getrlimit (RLIMIT_NOFILE , & rl ) == 0 ) {
181- /* Reserve 50% for non-S3 operations */
182- uint64_t usable = (uint64_t )(rl .rlim_cur ) / 2 ;
183- /* Cap at uint32_t max to avoid overflow */
184- if (usable > UINT32_MAX ) {
185- usable = UINT32_MAX ;
186- }
187- return (uint32_t )usable ;
188- }
189- /* Fallback if getrlimit fails */
190- return 0 ;
191- #endif
192- }
193-
194169/* Returns the max number of connections allowed.
195170 *
196171 * When meta request is NULL, this will return the overall allowed number of connections based on the clinet
@@ -2129,68 +2104,81 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
21292104 uint32_t num_requests_in_flight = (uint32_t )aws_atomic_load_int (& client -> stats .num_requests_in_flight );
21302105 uint32_t num_requests_streaming = (uint32_t )aws_atomic_load_int (& client -> stats .num_requests_streaming_request_body );
21312106
2132- /**
2133- * Iterate through the meta requests to update meta requests and get new requests that can then be prepared
2134- * (reading from any streams, signing, etc.) for sending.
2135- */
2136- while (!aws_linked_list_empty (& client -> threaded_data .meta_requests )) {
2137-
2138- struct aws_linked_list_node * meta_request_node = aws_linked_list_begin (& client -> threaded_data .meta_requests );
2139- struct aws_s3_meta_request * meta_request =
2140- AWS_CONTAINER_OF (meta_request_node , struct aws_s3_meta_request , client_process_work_threaded_data );
2141-
2142- if (!s_s3_client_should_update_meta_request (
2143- client ,
2144- meta_request ,
2145- num_requests_in_flight ,
2146- num_requests_streaming ,
2147- max_requests_in_flight ,
2148- max_requests_prepare )) {
2149-
2150- /* Move the meta request to be processed from next loop. */
2151- aws_linked_list_remove (& meta_request -> client_process_work_threaded_data .node );
2152- aws_linked_list_push_back (
2153- & meta_requests_work_remaining , & meta_request -> client_process_work_threaded_data .node );
2154- continue ;
2155- }
2107+ const uint32_t pass_flags [] = {
2108+ AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE ,
2109+ 0 ,
2110+ };
21562111
2157- struct aws_s3_request * request = NULL ;
2112+ const uint32_t num_passes = AWS_ARRAY_SIZE ( pass_flags ) ;
21582113
2159- /* Try to grab the next request from the meta request. */
2160- bool work_remaining = aws_s3_meta_request_update (meta_request , 0 , & request );
2114+ for (uint32_t pass_index = client -> enable_dynamic_connection_scaling ; pass_index < num_passes ; ++ pass_index ) {
21612115
2162- if (work_remaining ) {
2163- /* If there is work remaining, but we didn't get a request back, take the meta request out of the
2164- * list so that we don't use it again during this function, with the intention of putting it back in
2165- * the list before this function ends. */
2166- if (request == NULL ) {
2116+ /**
2117+ * Iterate through the meta requests to update meta requests and get new requests that can then be prepared
2118+ * (reading from any streams, signing, etc.) for sending.
2119+ */
2120+ while (!aws_linked_list_empty (& client -> threaded_data .meta_requests )) {
2121+
2122+ struct aws_linked_list_node * meta_request_node =
2123+ aws_linked_list_begin (& client -> threaded_data .meta_requests );
2124+ struct aws_s3_meta_request * meta_request =
2125+ AWS_CONTAINER_OF (meta_request_node , struct aws_s3_meta_request , client_process_work_threaded_data );
2126+
2127+ if (!s_s3_client_should_update_meta_request (
2128+ client ,
2129+ meta_request ,
2130+ num_requests_in_flight ,
2131+ num_requests_streaming ,
2132+ max_requests_in_flight ,
2133+ max_requests_prepare )) {
2134+
2135+ /* Move the meta request to be processed from next loop. */
21672136 aws_linked_list_remove (& meta_request -> client_process_work_threaded_data .node );
21682137 aws_linked_list_push_back (
21692138 & meta_requests_work_remaining , & meta_request -> client_process_work_threaded_data .node );
2170- } else {
2171- request -> tracked_by_client = true;
2139+ continue ;
2140+ }
21722141
2173- ++ client -> threaded_data .num_requests_being_prepared ;
2174- ++ meta_request -> client_process_work_threaded_data .num_request_being_prepared ;
2142+ struct aws_s3_request * request = NULL ;
21752143
2176- num_requests_in_flight = (uint32_t )aws_atomic_fetch_add (& client -> stats .num_requests_in_flight , 1 ) + 1 ;
2144+ /* Try to grab the next request from the meta request. */
2145+ bool work_remaining = aws_s3_meta_request_update (meta_request , 0 , & request );
21772146
2178- /**
2179- * When upload with streaming, the prepare stage will not read into buffer.
2180- * But it should prevent more requests to be preapred so that the request will not staying in the
2181- * queue to wait for the connection available. Prevents the credentials to be expired during waiting
2182- * for too long.
2183- */
2184- if (meta_request -> fio_opts .should_stream ) {
2185- /* If the request is a streaming request, update the states to track the request that is
2186- * streaming request body. */
2187- aws_atomic_fetch_add (& client -> stats .num_requests_streaming_request_body , 1 );
2188- }
2147+ if (work_remaining ) {
2148+ /* If there is work remaining, but we didn't get a request back, take the meta request out of the
2149+ * list so that we don't use it again during this function, with the intention of putting it back in
2150+ * the list before this function ends. */
2151+ if (request == NULL ) {
2152+ aws_linked_list_remove (& meta_request -> client_process_work_threaded_data .node );
2153+ aws_linked_list_push_back (
2154+ & meta_requests_work_remaining , & meta_request -> client_process_work_threaded_data .node );
2155+ } else {
2156+ request -> tracked_by_client = true;
2157+
2158+ ++ client -> threaded_data .num_requests_being_prepared ;
2159+ ++ meta_request -> client_process_work_threaded_data .num_request_being_prepared ;
2160+
2161+ num_requests_in_flight =
2162+ (uint32_t )aws_atomic_fetch_add (& client -> stats .num_requests_in_flight , 1 ) + 1 ;
2163+
2164+ /**
2165+ * When upload with streaming, the prepare stage will not read into buffer.
2166+ * But it should prevent more requests to be preapred so that the request will not staying in the
2167+ * queue to wait for the connection available. Prevents the credentials to be expired during waiting
2168+ * for too long.
2169+ */
2170+ if (meta_request -> fio_opts .should_stream ) {
2171+ /* If the request is a streaming request, update the states to track the request that is
2172+ * streaming request body. */
2173+ aws_atomic_fetch_add (& client -> stats .num_requests_streaming_request_body , 1 );
2174+ }
21892175
2190- s_acquire_mem_and_prepare_request (client , request , s_s3_client_prepare_callback_queue_request , client );
2176+ s_acquire_mem_and_prepare_request (
2177+ client , request , s_s3_client_prepare_callback_queue_request , client );
2178+ }
2179+ } else {
2180+ s_s3_client_remove_meta_request_threaded (client , meta_request );
21912181 }
2192- } else {
2193- s_s3_client_remove_meta_request_threaded (client , meta_request );
21942182 }
21952183 }
21962184
@@ -2265,7 +2253,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
22652253
22662254 struct aws_s3_request * request = aws_s3_client_dequeue_request_threaded (client );
22672255 struct aws_s3_meta_request * meta_request = request -> meta_request ;
2268-
2256+ const uint32_t max_active_connections = aws_s3_client_get_max_active_connections ( client , meta_request );
22692257 /* As the request removed from the queue. Decrement the preparing track */
22702258 -- meta_request -> client_process_work_threaded_data .num_request_being_prepared ;
22712259 if (request -> is_noop ) {
@@ -2287,7 +2275,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
22872275
22882276 s_s3_client_meta_request_finished_request (client , meta_request , request , AWS_ERROR_S3_CANCELED );
22892277 request = aws_s3_request_release (request );
2290- } else {
2278+ } else if ( client -> enable_dynamic_connection_scaling ) {
22912279 /* Calculate allowed connections for this meta request based on weight ratio */
22922280 bool should_allocate_connection = false;
22932281 uint32_t current_connections = (uint32_t )aws_atomic_load_int (& meta_request -> num_requests_network );
@@ -2322,6 +2310,14 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
23222310 /* Increment the count as we put it back to the queue. */
23232311 ++ meta_request -> client_process_work_threaded_data .num_request_being_prepared ;
23242312 }
2313+ } else if ((uint32_t )aws_atomic_load_int (& meta_request -> num_requests_network ) < max_active_connections ) {
2314+ /* Make sure it's above the max request level limitation. */
2315+ s_s3_client_create_connection_for_request (client , request );
2316+ } else {
2317+ /* Push the request into the left-over list to be used in a future call of this function. */
2318+ aws_linked_list_push_back (& left_over_requests , & request -> node );
2319+ /* Increment the count as we put it back to the queue. */
2320+ ++ meta_request -> client_process_work_threaded_data .num_request_being_prepared ;
23252321 }
23262322 client_max_active_connections = aws_s3_client_get_max_active_connections (client , NULL );
23272323 num_requests_network_io = s_s3_client_get_num_requests_network_io (client , AWS_S3_META_REQUEST_TYPE_MAX );
0 commit comments