4444#include <inttypes.h>
4545#include <math.h>
4646
47- #define S_IDEAL_PART_SIZE 8
48- #define S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS 8
49-
5047#ifdef _MSC_VER
5148# pragma warning(disable : 4232) /* function pointer to dll symbol */
5249#endif /* _MSC_VER */
@@ -78,10 +75,12 @@ const uint32_t g_min_num_connections = 10; /* Magic value based on: 10 was old b
7875 * be 2500 Gbps. */
7976const uint32_t g_max_num_connections = 10000 ;
8077
81- /* This is a first pass at a token based implementation, the calculations are approximate and can be improved in the
82- * future. The idea is to scale the number of connections we require up and down based on the different requests we
83- * receive and hence dynamically scale the maximum number of connections we need to open. One token is equivalent to
84- * 1Mbps of throughput. */
78+ /*
79+ * This is a token-based implementation for dynamic scaling of connections.
80+ * The calculations are approximate and can be improved in the future. The idea is to scale the number of connections
81+ * we require up and down based on the different requests we receive and hence dynamically scale the maximum number
82+ * of connections we need to open. One token is equivalent to 1Mbps of throughput.
83+ */
8584
8685/* All throughput values are in MBps and provided by S3 team */
8786
@@ -100,10 +99,12 @@ const double s_s3_p50_request_latency_ms = 0.03;
10099// 4 ms — NOTE(review): value 0.004 looks like seconds despite the "_ms" name suffix; confirm units
101100const double s_s3_express_p50_request_latency_ms = 0.004 ;
102101
103- /* Currently the ideal part size is 8MB and hence the value set.
104- * However, this is subject to change due to newer part sizes and adjustments. */
105-
106- const uint32_t s_s3_minimum_tokens = S_IDEAL_PART_SIZE * 8 * S_S3_CLIENT_MINIMUM_CONCURRENT_REQUESTS ;
102+ /*
103+ * Represents the minimum number of tokens a particular request might use irrespective of payload size or throughput
104+ * achieved. This is required to hard limit the number of connections we open for extremely small request sizes. The
105+ * number below is arbitrary until we come up with more sophisticated math.
106+ */
107+ const uint32_t s_s3_minimum_tokens = 500 ;
107108
108109/**
109110 * Default max part size is 5GiB as the server limit.
@@ -186,8 +187,9 @@ void aws_s3_set_dns_ttl(size_t ttl) {
186187
187188/**
188189 * Determine how many connections are ideal by dividing target-throughput by throughput-per-connection.
189- * TODO: we may consider to alter this, upload to regular s3 can use more connections, and s3 express
190- * can use less connections to reach the target throughput..
190+ * TODO: we have begun altering the calculation behind this. get_ideal_connection_number no longer provides
191+ * the basis for the number of connections we use for a particular meta request. It will soon be removed
192+ * as part of future work towards moving to a dynamic connection allocation implementation.
191193 **/
192194static uint32_t s_get_ideal_connection_number_from_throughput (double throughput_gps ) {
193195 double ideal_connection_count_double = throughput_gps / s_throughput_per_connection_gbps ;
@@ -237,56 +239,25 @@ uint32_t aws_s3_client_get_max_active_connections(
237239}
238240
239241/* Initialize token bucket based on target throughput */
240- void s_s3_client_init_tokens (struct aws_s3_client * client , double target_throughput_gbps ) {
242+ void s_s3_client_init_tokens (struct aws_s3_client * client ) {
241243 AWS_PRECONDITION (client );
244+
242245 aws_atomic_store_int (
243- & client -> token_bucket , aws_max_u32 ((uint32_t )target_throughput_gbps * 1024 , s_s3_minimum_tokens ));
246+ & client -> token_bucket , aws_max_u32 ((uint32_t )client -> throughput_target_gbps * 1024 , s_s3_minimum_tokens ));
244247}
245248
246249/* Releases tokens back after request is complete. */
247250void s_s3_client_release_tokens (struct aws_s3_client * client , struct aws_s3_request * request ) {
248251 AWS_PRECONDITION (client );
249252 AWS_PRECONDITION (request );
250253
251- uint32_t tokens = 0 ;
252-
253- switch (request -> request_type ) {
254- case AWS_S3_REQUEST_TYPE_GET_OBJECT : {
255- if (request -> meta_request -> is_express ) {
256- tokens = aws_min_u32 (
257- (uint32_t )ceil (request -> buffer_size * 8 / (MB_TO_BYTES (1 ) * s_s3_express_p50_request_latency_ms )),
258- s_s3_express_download_throughput_per_connection_mbps );
259- } else {
260- tokens = aws_min_u32 (
261- (uint32_t )ceil (request -> buffer_size * 8 / (MB_TO_BYTES (1 ) * s_s3_p50_request_latency_ms )),
262- s_s3_download_throughput_per_connection_mbps );
263- }
264- break ;
265- }
266- case AWS_S3_REQUEST_TYPE_UPLOAD_PART : {
267- if (request -> meta_request -> is_express ) {
268- tokens = aws_min_u32 (
269- (uint32_t )ceil (request -> buffer_size * 8 / (MB_TO_BYTES (1 ) * s_s3_express_p50_request_latency_ms )),
270- s_s3_express_upload_throughput_per_connection_mbps );
271- } else {
272- tokens = aws_min_u32 (
273- (uint32_t )ceil (request -> buffer_size * 8 / (MB_TO_BYTES (1 ) * s_s3_p50_request_latency_ms )),
274- s_s3_upload_throughput_per_connection_mbps );
275- }
276- break ;
277- }
278- default : {
279- tokens = s_s3_minimum_tokens ;
280- }
281- }
282-
283- // do we need error handling here?
284- aws_atomic_fetch_add (& client -> token_bucket , tokens );
254+ aws_atomic_fetch_add (& client -> token_bucket , request -> tokens_used );
255+ request -> tokens_used = 0 ;
285256}
286257
287- /* Returns true or false based on whether the request was able to avail the required amount of tokens.
288- * TODO: try to introduce a scalability factor instead of using pure latency. */
289- bool s_s3_client_acquire_tokens (struct aws_s3_client * client , struct aws_s3_request * request ) {
258+ /* Checks that we are not violating user-configured connection limits even though we are dynamically increasing
259+  * and decreasing connections. */
260+ bool s_check_connection_limits (struct aws_s3_client * client , struct aws_s3_request * request ) {
290261 AWS_PRECONDITION (client );
291262 AWS_PRECONDITION (request );
292263
@@ -305,8 +276,33 @@ bool s_s3_client_acquire_tokens(struct aws_s3_client *client, struct aws_s3_requ
305276 return false;
306277 }
307278
279+ return true;
280+ }
281+
282+ /* Returns true or false based on whether the request was able to avail the required amount of tokens.
283+ * TODO: try to introduce a scalability factor instead of using pure latency. */
284+ bool s_s3_client_acquire_tokens (struct aws_s3_client * client , struct aws_s3_request * request ) {
285+ AWS_PRECONDITION (client );
286+ AWS_PRECONDITION (request );
287+
308288 uint32_t required_tokens = 0 ;
309289
290+ /*
291+ * In each of the following cases, we determine the number of tokens required using the following formula, (One
292+ * token is equivalent to attaining 1 Mbps of target throughput):
293+ *
294+ * For each operation (upload/download) and each service (s3/s3express), we have a hardcoded attainable throughput
295+ * per connection value obtained from S3, as well as the latency involved in the respective services.
296+ *
297+ * The attained throughput per request is at most the attainable throughput of the respective service-operation;
298+ * when the payload size is small enough, the delivery time is negligible and throughput is close to payload/latency.
299+ *
300+ * The tokens used is the minimum of the two, since larger objects end up getting closer to the attainable
301+ * throughput and smaller objects max out at the theoretical limit of what they can attain.
302+ *
303+ * This calculation is an approximation of reality and might continuously improve based on S3 performance.
304+ */
305+
310306 switch (request -> request_type ) {
311307 case AWS_S3_REQUEST_TYPE_GET_OBJECT : {
312308 if (request -> meta_request -> is_express ) {
@@ -340,6 +336,7 @@ bool s_s3_client_acquire_tokens(struct aws_s3_client *client, struct aws_s3_requ
340336 if ((uint32_t )aws_atomic_load_int (& client -> token_bucket ) > required_tokens ) {
341337 // do we need error handling here?
342338 aws_atomic_fetch_sub (& client -> token_bucket , required_tokens );
339+ request -> tokens_used = required_tokens ;
343340 return true;
344341 }
345342 return false;
@@ -560,7 +557,7 @@ struct aws_s3_client *aws_s3_client_new(
560557 * (uint32_t * )& client -> ideal_connection_count = aws_max_u32 (
561558 g_min_num_connections , s_get_ideal_connection_number_from_throughput (client -> throughput_target_gbps ));
562559
563- s_s3_client_init_tokens (client , client -> throughput_target_gbps );
560+ s_s3_client_init_tokens (client );
564561
565562 size_t part_size = (size_t )g_default_part_size_fallback ;
566563 if (client_config -> part_size != 0 ) {
@@ -1967,12 +1964,20 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
19671964 uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
19681965 num_requests_streaming_response + num_requests_being_prepared +
19691966 client -> threaded_data .request_queue_size ;
1967+
1968+ uint32_t total_tokens = client -> throughput_target_gbps * 1024 ;
1969+
1970+ uint32_t available_tokens = (uint32_t )aws_atomic_load_int (& client -> token_bucket );
1971+
1972+ uint32_t used_tokens = total_tokens - available_tokens ;
1973+
19701974 AWS_LOGF (
19711975 s_log_level_client_stats ,
19721976 AWS_LS_S3_CLIENT_STATS ,
19731977 "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
19741978 "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d "
19751979 "Requests-streaming-response:%d "
1980+ "Total Tokens: %d, Tokens Available: %d, Tokens Used: %d"
19761981 " Endpoints(in-table/allocated):%d/%d" ,
19771982 (void * )client ,
19781983 total_approx_requests ,
@@ -1985,6 +1990,9 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
19851990 num_requests_network_io ,
19861991 num_requests_stream_queued_waiting ,
19871992 num_requests_streaming_response ,
1993+ total_tokens ,
1994+ available_tokens ,
1995+ used_tokens ,
19881996 num_endpoints_in_table ,
19891997 num_endpoints_allocated );
19901998 }
@@ -2427,7 +2435,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
24272435
24282436 s_s3_client_meta_request_finished_request (client , meta_request , request , AWS_ERROR_S3_CANCELED );
24292437 request = aws_s3_request_release (request );
2430- } else if (s_s3_client_acquire_tokens (client , request )) {
2438+ } else if (s_check_connection_limits ( client , request ) && s_s3_client_acquire_tokens (client , request )) {
24312439 /* Make sure it's above the max request level limitation. */
24322440 s_s3_client_create_connection_for_request (client , request );
24332441 } else {
0 commit comments