@@ -17,6 +17,7 @@ use core::ops::Bound;
1717use core:: pin:: Pin ;
1818use core:: time:: Duration ;
1919use std:: borrow:: Cow ;
20+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: { Arc , Weak } ;
2122
2223use async_trait:: async_trait;
@@ -39,9 +40,9 @@ use nativelink_util::store_trait::{
3940use nativelink_util:: task:: JoinHandleDropGuard ;
4041use parking_lot:: { Mutex , RwLock } ;
4142use patricia_tree:: StringPatriciaMap ;
42- use tokio:: sync:: watch;
43+ use tokio:: sync:: { Semaphore , SemaphorePermit , watch} ;
4344use tokio:: time:: sleep;
44- use tracing:: { error, info, warn} ;
45+ use tracing:: { debug , error, info, warn} ;
4546
4647use crate :: cas_utils:: is_zero_digest;
4748
@@ -63,9 +64,6 @@ const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
6364/// The default command timeout in milliseconds if not specified.
6465const DEFAULT_COMMAND_TIMEOUT_MS : u64 = 10_000 ;
6566
66- /// The default maximum number of concurrent uploads.
67- const DEFAULT_MAX_CONCURRENT_UPLOADS : usize = 10 ;
68-
6967/// The name of the field in `MongoDB` documents that stores the key.
7068const KEY_FIELD : & str = "_id" ;
7169
@@ -102,16 +100,18 @@ pub struct ExperimentalMongoStore {
102100 #[ metric( help = "The amount of data to read from MongoDB at a time" ) ]
103101 read_chunk_size : usize ,
104102
105- /// The maximum number of concurrent uploads.
106- #[ metric( help = "The maximum number of concurrent uploads" ) ]
107- max_concurrent_uploads : usize ,
108-
109103 /// Enable change streams for real-time updates.
110104 #[ metric( help = "Whether change streams are enabled" ) ]
111105 enable_change_streams : bool ,
112106
113107 /// A manager for subscriptions to keys in `MongoDB`.
114108 subscription_manager : Mutex < Option < Arc < ExperimentalMongoSubscriptionManager > > > ,
109+
110+ /// Limits the number of requests at any one time
111+ request_permits : Arc < Semaphore > ,
112+
113+ /// Tracks how many callers are currently waiting to acquire one of the `request_permits`
114+ waiting_permits : Arc < AtomicUsize > ,
115115}
116116
117117impl ExperimentalMongoStore {
@@ -149,8 +149,13 @@ impl ExperimentalMongoStore {
149149 spec. command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS ;
150150 }
151151
152- if spec. max_concurrent_uploads == 0 {
153- spec. max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS ;
152+ if let Some ( max_permits) = spec. max_request_permits {
153+ if max_permits == 0 {
154+ return Err ( make_err ! (
155+ Code :: InvalidArgument ,
156+ "max_request_permits was set to zero, which will block mongo_store from working at all"
157+ ) ) ;
158+ }
154159 }
155160
156161 // Configure client options
@@ -224,9 +229,12 @@ impl ExperimentalMongoStore {
224229 scheduler_collection,
225230 key_prefix : spec. key_prefix . clone ( ) . unwrap_or_default ( ) ,
226231 read_chunk_size : spec. read_chunk_size ,
227- max_concurrent_uploads : spec. max_concurrent_uploads ,
228232 enable_change_streams : spec. enable_change_streams ,
229233 subscription_manager : Mutex :: new ( None ) ,
234+ request_permits : Arc :: new ( Semaphore :: new (
235+ spec. max_request_permits . unwrap_or ( Semaphore :: MAX_PERMITS ) ,
236+ ) ) ,
237+ waiting_permits : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
230238 } ;
231239
232240 Ok ( Arc :: new ( store) )
@@ -282,6 +290,16 @@ impl ExperimentalMongoStore {
282290 key. strip_prefix ( & self . key_prefix ) . map ( ToString :: to_string)
283291 }
284292 }
293+
294+ async fn acquire_permit ( & self ) -> Result < SemaphorePermit < ' _ > , Error > {
295+ let waiting = self . waiting_permits . fetch_add ( 1 , Ordering :: Relaxed ) ;
296+ if waiting % 100 == 0 {
297+ debug ! ( waiting, "Number of waiting permits for Mongo" ) ;
298+ }
299+ let permit = self . request_permits . acquire ( ) . await ;
300+ self . waiting_permits . fetch_sub ( 1 , Ordering :: Relaxed ) ;
301+ Ok ( permit?)
302+ }
285303}
286304
287305#[ async_trait]
@@ -301,6 +319,11 @@ impl StoreDriver for ExperimentalMongoStore {
301319 let encoded_key = self . encode_key ( key) ;
302320 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
303321
322+ // We could do this with acquire_many, but that's unsafe if the number of keys is greater
323+ // than the number of permits, as it'll block forever. Doing this one at a time is guaranteed
324+ // not to block provided no-one sets permits to 0, and we check for that case at startup.
325+ let semaphore = self . acquire_permit ( ) . await ?;
326+
304327 match self . cas_collection . find_one ( filter) . await {
305328 Ok ( Some ( doc) ) => {
306329 * result = doc. get_i64 ( SIZE_FIELD ) . ok ( ) . map ( |v| v as u64 ) ;
@@ -315,7 +338,9 @@ impl StoreDriver for ExperimentalMongoStore {
315338 ) ) ;
316339 }
317340 }
341+ drop ( semaphore) ;
318342 }
343+
319344 Ok ( ( ) )
320345 }
321346
@@ -370,6 +395,8 @@ impl StoreDriver for ExperimentalMongoStore {
370395 }
371396 }
372397
398+ let semaphore = self . acquire_permit ( ) . await ?;
399+
373400 let mut cursor = self
374401 . cas_collection
375402 . find ( filter)
@@ -394,6 +421,8 @@ impl StoreDriver for ExperimentalMongoStore {
394421 }
395422 }
396423
424+ drop ( semaphore) ;
425+
397426 Ok ( count)
398427 }
399428
@@ -463,6 +492,8 @@ impl StoreDriver for ExperimentalMongoStore {
463492 SIZE_FIELD : size,
464493 } ;
465494
495+ let semaphore = self . acquire_permit ( ) . await ?;
496+
466497 // Upsert the document
467498 self . cas_collection
468499 . update_one (
@@ -473,6 +504,8 @@ impl StoreDriver for ExperimentalMongoStore {
473504 . await
474505 . map_err ( |e| make_err ! ( Code :: Internal , "Failed to update document in MongoDB: {e}" ) ) ?;
475506
507+ drop ( semaphore) ;
508+
476509 Ok ( ( ) )
477510 }
478511
@@ -496,6 +529,8 @@ impl StoreDriver for ExperimentalMongoStore {
496529 let encoded_key = self . encode_key ( & key) ;
497530 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
498531
532+ let semaphore = self . acquire_permit ( ) . await ?;
533+
499534 let doc = self
500535 . cas_collection
501536 . find_one ( filter)
@@ -553,6 +588,8 @@ impl StoreDriver for ExperimentalMongoStore {
553588 }
554589 }
555590
591+ drop ( semaphore) ;
592+
556593 writer. send_eof ( ) . map_err ( |e| {
557594 make_err ! (
558595 Code :: Internal ,
@@ -593,6 +630,8 @@ impl HealthStatusIndicator for ExperimentalMongoStore {
593630 }
594631
595632 async fn check_health ( & self , namespace : Cow < ' static , str > ) -> HealthStatus {
633+ // Note we do not acquire a request_permit here, as the health check needs to always go through
634+ // even if everything else is fully loaded
596635 match self . database . run_command ( doc ! { "ping" : 1 } ) . await {
597636 Ok ( _) => HealthStatus :: new_ok ( self , "Connection healthy" . into ( ) ) ,
598637 Err ( e) => HealthStatus :: new_failed (
@@ -918,7 +957,9 @@ impl SchedulerStore for ExperimentalMongoStore {
918957 VERSION_FIELD : current_version,
919958 } ;
920959
921- match self
960+ let semaphore = self . acquire_permit ( ) . await ?;
961+
962+ let result = match self
922963 . scheduler_collection
923964 . find_one_and_update ( filter, update_doc)
924965 . upsert ( true )
@@ -931,7 +972,9 @@ impl SchedulerStore for ExperimentalMongoStore {
931972 Code :: Internal ,
932973 "MongoDB error in update_data: {e}"
933974 ) ) ,
934- }
975+ } ;
976+ drop ( semaphore) ;
977+ result
935978 } else {
936979 let data_bytes = data. try_into_bytes ( ) . map_err ( |e| {
937980 make_err ! (
@@ -959,6 +1002,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9591002 ) ;
9601003 }
9611004
1005+ let semaphore = self . acquire_permit ( ) . await ?;
1006+
9621007 self . scheduler_collection
9631008 . update_one (
9641009 doc ! { KEY_FIELD : encoded_key. as_ref( ) } ,
@@ -970,6 +1015,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9701015 make_err ! ( Code :: Internal , "Failed to update scheduler document: {e}" )
9711016 } ) ?;
9721017
1018+ drop ( semaphore) ;
1019+
9731020 Ok ( Some ( 0 ) )
9741021 }
9751022 }
0 commit comments