@@ -75,6 +75,36 @@ const uint32_t g_min_num_connections = 10; /* Magic value based on: 10 was old b
7575 * be 2500 Gbps. */
7676const uint32_t g_max_num_connections = 10000 ;
7777
/* This is a first pass at a token based implementation, the calculations are approximate and can be improved in the
 * future. The idea is to scale the number of connections we require up and down based on the different requests we
 * receive and hence dynamically scale the maximum number of connections we need to open. One token is equivalent to
 * 1Mbps of throughput. */

/* Per-connection throughput figures were provided by the S3 team in MBps (megabytes/s).
 * They are stored here converted to Mbps (megabits/s, i.e. MBps * 8) so that they share
 * the token unit of 1 Mbps. */

/* 90 MBps -> 720 Mbps */
const uint32_t s_s3_download_throughput_per_connection_mbps = 90 * 8;
/* 20 MBps -> 160 Mbps */
const uint32_t s_s3_upload_throughput_per_connection_mbps = 20 * 8;
/* 150 MBps -> 1200 Mbps */
const uint32_t s_s3_express_download_throughput_per_connection_mbps = 150 * 8;
/* 100 MBps -> 800 Mbps */
const uint32_t s_s3_express_upload_throughput_per_connection_mbps = 100 * 8;

/* All latency values are in milliseconds (ms) and provided by S3 team */
const uint32_t s_s3_p50_request_latency_ms = 30;
const uint32_t s_s3_express_p50_request_latency_ms = 4;

/* Lower bound on concurrency the client should always be able to sustain. */
const uint32_t s_s3_client_minimum_concurrent_requests = 8;

/* Currently the ideal part size is 8MB and hence the value set
 * (presumably 8 MB expressed in Mbit: 8 * 8 == 64 — TODO confirm the unit).
 * However, this is subject to change due to newer part sizes and adjustments. */
const uint32_t s_ideal_part_size = 8 * 8;

/* Floor for the token bucket: s_ideal_part_size * 8 * s_s3_client_minimum_concurrent_requests
 * == 64 * 8 * 8 == 4096 tokens. Spelled out as literals because in C a `const uint32_t`
 * object is not a constant expression and may not appear in a file-scope initializer. */
const uint32_t s_s3_minimum_tokens = (8 * 8) * 8 * 8;
107+
78108/**
79109 * Default max part size is 5GiB as the server limit.
80110 */
@@ -206,6 +236,88 @@ uint32_t aws_s3_client_get_max_active_connections(
206236 return max_active_connections ;
207237}
208238
239+ /* Initialize token bucket based on target throughput */
240+ void aws_s3_client_init_tokens (struct aws_s3_client * client , double target_throughput_gbps ) {
241+ AWS_PRECONDITION (client );
242+ if (target_throughput_gbps == 0.0 ) target_throughput_gbps = 150.0 ;
243+ aws_atomic_store_int (& client -> token_bucket , aws_max_u32 (target_throughput_gbps * 1024 , s_s3_minimum_tokens ));
244+ }
245+
246+ /* Releases tokens back after request is complete. */
247+ void aws_s3_client_release_tokens (
248+ struct aws_s3_client * client ,
249+ struct aws_s3_request * request ) {
250+ AWS_PRECONDITION (client );
251+ AWS_PRECONDITION (request );
252+
253+ uint32_t tokens = 0 ;
254+
255+ switch (request -> request_type ) {
256+ case AWS_S3_REQUEST_TYPE_GET_OBJECT : {
257+ if (request -> meta_request -> is_express ) {
258+ tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_express_p50_request_latency_ms ), s_s3_express_download_throughput_per_connection_mbps );
259+ } else {
260+ tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_p50_request_latency_ms ), s_s3_download_throughput_per_connection_mbps );
261+ }
262+ break ;
263+ }
264+ case AWS_S3_REQUEST_TYPE_PUT_OBJECT : {
265+ if (request -> meta_request -> is_express ) {
266+ tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_express_p50_request_latency_ms ), s_s3_express_upload_throughput_per_connection_mbps );
267+ } else {
268+ tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_p50_request_latency_ms ), s_s3_upload_throughput_per_connection_mbps );
269+ }
270+ break ;
271+ }
272+ default : {
273+ tokens = g_default_min_tokens ;
274+ }
275+ }
276+
277+ // do we need error handling here?
278+ aws_atomic_fetch_add (client -> token_bucket , tokens );
279+ }
280+
281+ /* Returns true or false based on whether the request was able to avail the required amount of tokens.
282+ * TODO: try to introduce a scalability factor instead of using pure latency. */
283+ bool aws_s3_client_acquire_tokens (
284+ struct aws_s3_client * client ,
285+ struct aws_s3_request * request ) {
286+ AWS_PRECONDITION (client );
287+ AWS_PRECONDITION (request );
288+
289+ uint32_t required_tokens = 0 ;
290+
291+ switch (request -> request_type ) {
292+ case AWS_S3_REQUEST_TYPE_GET_OBJECT : {
293+ if (request -> meta_request -> is_express ) {
294+ required_tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_express_p50_request_latency_ms ), s_s3_express_download_throughput_per_connection_mbps );
295+ } else {
296+ required_tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_p50_request_latency_ms ), s_s3_download_throughput_per_connection_mbps );
297+ }
298+ break ;
299+ }
300+ case AWS_S3_REQUEST_TYPE_PUT_OBJECT : {
301+ if (request -> meta_request -> is_express ) {
302+ required_tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_express_p50_request_latency_ms ), s_s3_express_upload_throughput_per_connection_mbps );
303+ } else {
304+ required_tokens = aws_min_u32 (ceil ((request -> buffer_size * 1000 ) / s_s3_p50_request_latency_ms ), s_s3_upload_throughput_per_connection_mbps );
305+ }
306+ break ;
307+ }
308+ default : {
309+ required_tokens = g_default_min_tokens ;
310+ }
311+ }
312+
313+ if ((uint32_t * ) aws_atomic_load_int (& client -> token_bucket ) > required_tokens ) {
314+ // do we need error handling here?
315+ aws_atomic_fetch_sub (client -> token_bucket , required_tokens );
316+ return true;
317+ }
318+ return false;
319+ }
320+
209321/* Returns the max number of requests allowed to be in memory */
210322uint32_t aws_s3_client_get_max_requests_in_flight (struct aws_s3_client * client ) {
211323 AWS_PRECONDITION (client );
@@ -2286,7 +2398,7 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
22862398
22872399 s_s3_client_meta_request_finished_request (client , meta_request , request , AWS_ERROR_S3_CANCELED );
22882400 request = aws_s3_request_release (request );
2289- } else if (( uint32_t ) aws_atomic_load_int ( & meta_request -> num_requests_network ) < max_active_connections ) {
2401+ } else if (aws_s3_avail_tokens ( client , request ) ) {
22902402 /* Make sure it's above the max request level limitation. */
22912403 s_s3_client_create_connection_for_request (client , request );
22922404 } else {
0 commit comments