@@ -17,6 +17,7 @@ use core::ops::Bound;
1717use core:: pin:: Pin ;
1818use core:: time:: Duration ;
1919use std:: borrow:: Cow ;
20+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: { Arc , Weak } ;
2122
2223use async_trait:: async_trait;
@@ -39,9 +40,9 @@ use nativelink_util::store_trait::{
3940use nativelink_util:: task:: JoinHandleDropGuard ;
4041use parking_lot:: { Mutex , RwLock } ;
4142use patricia_tree:: StringPatriciaMap ;
42- use tokio:: sync:: watch;
43+ use tokio:: sync:: { Semaphore , SemaphorePermit , watch} ;
4344use tokio:: time:: sleep;
44- use tracing:: { error, info, warn} ;
45+ use tracing:: { debug , error, info, trace , warn} ;
4546
4647use crate :: cas_utils:: is_zero_digest;
4748
@@ -63,9 +64,6 @@ const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
6364/// The default command timeout in milliseconds if not specified.
6465const DEFAULT_COMMAND_TIMEOUT_MS : u64 = 10_000 ;
6566
66- /// The default maximum number of concurrent uploads.
67- const DEFAULT_MAX_CONCURRENT_UPLOADS : usize = 10 ;
68-
6967/// The name of the field in `MongoDB` documents that stores the key.
7068const KEY_FIELD : & str = "_id" ;
7169
@@ -102,16 +100,18 @@ pub struct ExperimentalMongoStore {
102100 #[ metric( help = "The amount of data to read from MongoDB at a time" ) ]
103101 read_chunk_size : usize ,
104102
105- /// The maximum number of concurrent uploads.
106- #[ metric( help = "The maximum number of concurrent uploads" ) ]
107- max_concurrent_uploads : usize ,
108-
109103 /// Enable change streams for real-time updates.
110104 #[ metric( help = "Whether change streams are enabled" ) ]
111105 enable_change_streams : bool ,
112106
113107 /// A manager for subscriptions to keys in `MongoDB`.
114108 subscription_manager : Mutex < Option < Arc < ExperimentalMongoSubscriptionManager > > > ,
109+
110+ /// Limits the number of requests at any one time
111+ request_permits : Arc < Semaphore > ,
112+
113+ /// Keep track of the request_permits queue size
114+ waiting_permits : Arc < AtomicUsize > ,
115115}
116116
117117impl ExperimentalMongoStore {
@@ -149,8 +149,13 @@ impl ExperimentalMongoStore {
149149 spec. command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS ;
150150 }
151151
152- if spec. max_concurrent_uploads == 0 {
153- spec. max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS ;
152+ if let Some ( max_permits) = spec. max_request_permits {
153+ if max_permits == 0 {
154+ return Err ( make_err ! (
155+ Code :: InvalidArgument ,
156+ "max_request_permits was set to zero, which will block mongo_store from working at all"
157+ ) ) ;
158+ }
154159 }
155160
156161 // Configure client options
@@ -224,9 +229,12 @@ impl ExperimentalMongoStore {
224229 scheduler_collection,
225230 key_prefix : spec. key_prefix . clone ( ) . unwrap_or_default ( ) ,
226231 read_chunk_size : spec. read_chunk_size ,
227- max_concurrent_uploads : spec. max_concurrent_uploads ,
228232 enable_change_streams : spec. enable_change_streams ,
229233 subscription_manager : Mutex :: new ( None ) ,
234+ request_permits : Arc :: new ( Semaphore :: new (
235+ spec. max_request_permits . unwrap_or ( Semaphore :: MAX_PERMITS ) ,
236+ ) ) ,
237+ waiting_permits : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
230238 } ;
231239
232240 Ok ( Arc :: new ( store) )
@@ -282,6 +290,19 @@ impl ExperimentalMongoStore {
282290 key. strip_prefix ( & self . key_prefix ) . map ( ToString :: to_string)
283291 }
284292 }
293+
294+ async fn acquire_permit ( & self ) -> Result < SemaphorePermit < ' _ > , Error > {
295+ let waiting = self . waiting_permits . fetch_add ( 1 , Ordering :: Relaxed ) ;
296+
297+ if waiting > 0 && waiting % 100 == 0 {
298+ debug ! ( waiting, "Number of waiting permits for Mongo" ) ;
299+ } else {
300+ trace ! ( waiting, "Number of waiting permits for Mongo" ) ;
301+ }
302+ let permit = self . request_permits . acquire ( ) . await ;
303+ self . waiting_permits . fetch_sub ( 1 , Ordering :: Relaxed ) ;
304+ Ok ( permit?)
305+ }
285306}
286307
287308#[ async_trait]
@@ -301,6 +322,11 @@ impl StoreDriver for ExperimentalMongoStore {
301322 let encoded_key = self . encode_key ( key) ;
302323 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
303324
325+ // We could do this with acquire_many, but that's unsafe if the number of keys is greater
326+ // than the number of permits, as it'll block forever. Doing this one at a time is guaranteed
327+ // not to block provided no-one sets permits to 0, and we check for that case at startup.
328+ let semaphore = self . acquire_permit ( ) . await ?;
329+
304330 match self . cas_collection . find_one ( filter) . await {
305331 Ok ( Some ( doc) ) => {
306332 * result = doc. get_i64 ( SIZE_FIELD ) . ok ( ) . map ( |v| v as u64 ) ;
@@ -315,7 +341,9 @@ impl StoreDriver for ExperimentalMongoStore {
315341 ) ) ;
316342 }
317343 }
344+ drop ( semaphore) ;
318345 }
346+
319347 Ok ( ( ) )
320348 }
321349
@@ -370,6 +398,8 @@ impl StoreDriver for ExperimentalMongoStore {
370398 }
371399 }
372400
401+ let semaphore = self . acquire_permit ( ) . await ?;
402+
373403 let mut cursor = self
374404 . cas_collection
375405 . find ( filter)
@@ -394,6 +424,8 @@ impl StoreDriver for ExperimentalMongoStore {
394424 }
395425 }
396426
427+ drop ( semaphore) ;
428+
397429 Ok ( count)
398430 }
399431
@@ -463,6 +495,8 @@ impl StoreDriver for ExperimentalMongoStore {
463495 SIZE_FIELD : size,
464496 } ;
465497
498+ let semaphore = self . acquire_permit ( ) . await ?;
499+
466500 // Upsert the document
467501 self . cas_collection
468502 . update_one (
@@ -473,6 +507,8 @@ impl StoreDriver for ExperimentalMongoStore {
473507 . await
474508 . map_err ( |e| make_err ! ( Code :: Internal , "Failed to update document in MongoDB: {e}" ) ) ?;
475509
510+ drop ( semaphore) ;
511+
476512 Ok ( ( ) )
477513 }
478514
@@ -496,6 +532,8 @@ impl StoreDriver for ExperimentalMongoStore {
496532 let encoded_key = self . encode_key ( & key) ;
497533 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
498534
535+ let semaphore = self . acquire_permit ( ) . await ?;
536+
499537 let doc = self
500538 . cas_collection
501539 . find_one ( filter)
@@ -553,6 +591,8 @@ impl StoreDriver for ExperimentalMongoStore {
553591 }
554592 }
555593
594+ drop ( semaphore) ;
595+
556596 writer. send_eof ( ) . map_err ( |e| {
557597 make_err ! (
558598 Code :: Internal ,
@@ -593,6 +633,8 @@ impl HealthStatusIndicator for ExperimentalMongoStore {
593633 }
594634
595635 async fn check_health ( & self , namespace : Cow < ' static , str > ) -> HealthStatus {
636+ // Note we do not acquire a request_permit here, as the health check needs to always go through
637+ // even if everything else is fully loaded
596638 match self . database . run_command ( doc ! { "ping" : 1 } ) . await {
597639 Ok ( _) => HealthStatus :: new_ok ( self , "Connection healthy" . into ( ) ) ,
598640 Err ( e) => HealthStatus :: new_failed (
@@ -918,7 +960,9 @@ impl SchedulerStore for ExperimentalMongoStore {
918960 VERSION_FIELD : current_version,
919961 } ;
920962
921- match self
963+ let semaphore = self . acquire_permit ( ) . await ?;
964+
965+ let result = match self
922966 . scheduler_collection
923967 . find_one_and_update ( filter, update_doc)
924968 . upsert ( true )
@@ -931,7 +975,9 @@ impl SchedulerStore for ExperimentalMongoStore {
931975 Code :: Internal ,
932976 "MongoDB error in update_data: {e}"
933977 ) ) ,
934- }
978+ } ;
979+ drop ( semaphore) ;
980+ result
935981 } else {
936982 let data_bytes = data. try_into_bytes ( ) . map_err ( |e| {
937983 make_err ! (
@@ -959,6 +1005,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9591005 ) ;
9601006 }
9611007
1008+ let semaphore = self . acquire_permit ( ) . await ?;
1009+
9621010 self . scheduler_collection
9631011 . update_one (
9641012 doc ! { KEY_FIELD : encoded_key. as_ref( ) } ,
@@ -970,6 +1018,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9701018 make_err ! ( Code :: Internal , "Failed to update scheduler document: {e}" )
9711019 } ) ?;
9721020
1021+ drop ( semaphore) ;
1022+
9731023 Ok ( Some ( 0 ) )
9741024 }
9751025 }
0 commit comments