1515use core:: cmp;
1616use core:: ops:: Bound ;
1717use core:: pin:: Pin ;
18+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
1819use core:: time:: Duration ;
1920use std:: borrow:: Cow ;
2021use std:: sync:: { Arc , Weak } ;
@@ -39,9 +40,9 @@ use nativelink_util::store_trait::{
3940use nativelink_util:: task:: JoinHandleDropGuard ;
4041use parking_lot:: { Mutex , RwLock } ;
4142use patricia_tree:: StringPatriciaMap ;
42- use tokio:: sync:: watch;
43+ use tokio:: sync:: { Semaphore , SemaphorePermit , watch} ;
4344use tokio:: time:: sleep;
44- use tracing:: { error, info, warn} ;
45+ use tracing:: { error, info, trace , warn} ;
4546
4647use crate :: cas_utils:: is_zero_digest;
4748
@@ -63,9 +64,6 @@ const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 3000;
6364/// The default command timeout in milliseconds if not specified.
6465const DEFAULT_COMMAND_TIMEOUT_MS : u64 = 10_000 ;
6566
66- /// The default maximum number of concurrent uploads.
67- const DEFAULT_MAX_CONCURRENT_UPLOADS : usize = 10 ;
68-
6967/// The name of the field in `MongoDB` documents that stores the key.
7068const KEY_FIELD : & str = "_id" ;
7169
@@ -102,16 +100,18 @@ pub struct ExperimentalMongoStore {
102100 #[ metric( help = "The amount of data to read from MongoDB at a time" ) ]
103101 read_chunk_size : usize ,
104102
105- /// The maximum number of concurrent uploads.
106- #[ metric( help = "The maximum number of concurrent uploads" ) ]
107- max_concurrent_uploads : usize ,
108-
109103 /// Enable change streams for real-time updates.
110104 #[ metric( help = "Whether change streams are enabled" ) ]
111105 enable_change_streams : bool ,
112106
113107 /// A manager for subscriptions to keys in `MongoDB`.
114108 subscription_manager : Mutex < Option < Arc < ExperimentalMongoSubscriptionManager > > > ,
109+
110+ /// Limits the number of requests at any one time
111+ request_permits : Arc < Semaphore > ,
112+
113+ /// Keep track of the `request_permits` queue size
114+ waiting_permits : Arc < AtomicUsize > ,
115115}
116116
117117impl ExperimentalMongoStore {
@@ -149,8 +149,19 @@ impl ExperimentalMongoStore {
149149 spec. command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS ;
150150 }
151151
152- if spec. max_concurrent_uploads == 0 {
153- spec. max_concurrent_uploads = DEFAULT_MAX_CONCURRENT_UPLOADS ;
152+ if spec. max_concurrent_uploads != 0 {
153+ warn ! (
154+ "max_concurrent_uploads was set for Mongo, and it's a deprecated value we don't use anymore"
155+ ) ;
156+ }
157+
158+ if let Some ( max_permits) = spec. max_requests {
159+ if max_permits == 0 {
160+ return Err ( make_err ! (
161+ Code :: InvalidArgument ,
162+ "max_request_permits was set to zero, which will block mongo_store from working at all"
163+ ) ) ;
164+ }
154165 }
155166
156167 // Configure client options
@@ -224,9 +235,12 @@ impl ExperimentalMongoStore {
224235 scheduler_collection,
225236 key_prefix : spec. key_prefix . clone ( ) . unwrap_or_default ( ) ,
226237 read_chunk_size : spec. read_chunk_size ,
227- max_concurrent_uploads : spec. max_concurrent_uploads ,
228238 enable_change_streams : spec. enable_change_streams ,
229239 subscription_manager : Mutex :: new ( None ) ,
240+ request_permits : Arc :: new ( Semaphore :: new (
241+ spec. max_requests . unwrap_or ( Semaphore :: MAX_PERMITS ) ,
242+ ) ) ,
243+ waiting_permits : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
230244 } ;
231245
232246 Ok ( Arc :: new ( store) )
@@ -282,6 +296,19 @@ impl ExperimentalMongoStore {
282296 key. strip_prefix ( & self . key_prefix ) . map ( ToString :: to_string)
283297 }
284298 }
299+
300+ async fn acquire_permit ( & self ) -> Result < SemaphorePermit < ' _ > , Error > {
301+ let waiting = self . waiting_permits . fetch_add ( 1 , Ordering :: Relaxed ) ;
302+
303+ if waiting > 0 && waiting % 100 == 0 {
304+ info ! ( waiting, "Number of waiting permits for Mongo" ) ;
305+ } else {
306+ trace ! ( waiting, "Number of waiting permits for Mongo" ) ;
307+ }
308+ let permit = self . request_permits . acquire ( ) . await ;
309+ self . waiting_permits . fetch_sub ( 1 , Ordering :: Relaxed ) ;
310+ Ok ( permit?)
311+ }
285312}
286313
287314#[ async_trait]
@@ -301,6 +328,11 @@ impl StoreDriver for ExperimentalMongoStore {
301328 let encoded_key = self . encode_key ( key) ;
302329 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
303330
331+ // We could do this with acquire_many, but that's unsafe if the number of keys is greater
332+ // than the number of permits, as it'll block forever. Doing this one at a time is guaranteed
333+ // not to block provided no-one sets permits to 0, and we check for that case at startup.
334+ let semaphore = self . acquire_permit ( ) . await ?;
335+
304336 match self . cas_collection . find_one ( filter) . await {
305337 Ok ( Some ( doc) ) => {
306338 * result = doc. get_i64 ( SIZE_FIELD ) . ok ( ) . map ( |v| v as u64 ) ;
@@ -315,7 +347,9 @@ impl StoreDriver for ExperimentalMongoStore {
315347 ) ) ;
316348 }
317349 }
350+ drop ( semaphore) ;
318351 }
352+
319353 Ok ( ( ) )
320354 }
321355
@@ -370,6 +404,8 @@ impl StoreDriver for ExperimentalMongoStore {
370404 }
371405 }
372406
407+ let semaphore = self . acquire_permit ( ) . await ?;
408+
373409 let mut cursor = self
374410 . cas_collection
375411 . find ( filter)
@@ -394,6 +430,8 @@ impl StoreDriver for ExperimentalMongoStore {
394430 }
395431 }
396432
433+ drop ( semaphore) ;
434+
397435 Ok ( count)
398436 }
399437
@@ -463,6 +501,8 @@ impl StoreDriver for ExperimentalMongoStore {
463501 SIZE_FIELD : size,
464502 } ;
465503
504+ let semaphore = self . acquire_permit ( ) . await ?;
505+
466506 // Upsert the document
467507 self . cas_collection
468508 . update_one (
@@ -473,6 +513,8 @@ impl StoreDriver for ExperimentalMongoStore {
473513 . await
474514 . map_err ( |e| make_err ! ( Code :: Internal , "Failed to update document in MongoDB: {e}" ) ) ?;
475515
516+ drop ( semaphore) ;
517+
476518 Ok ( ( ) )
477519 }
478520
@@ -496,6 +538,8 @@ impl StoreDriver for ExperimentalMongoStore {
496538 let encoded_key = self . encode_key ( & key) ;
497539 let filter = doc ! { KEY_FIELD : encoded_key. as_ref( ) } ;
498540
541+ let semaphore = self . acquire_permit ( ) . await ?;
542+
499543 let doc = self
500544 . cas_collection
501545 . find_one ( filter)
@@ -553,6 +597,8 @@ impl StoreDriver for ExperimentalMongoStore {
553597 }
554598 }
555599
600+ drop ( semaphore) ;
601+
556602 writer. send_eof ( ) . map_err ( |e| {
557603 make_err ! (
558604 Code :: Internal ,
@@ -593,6 +639,8 @@ impl HealthStatusIndicator for ExperimentalMongoStore {
593639 }
594640
595641 async fn check_health ( & self , namespace : Cow < ' static , str > ) -> HealthStatus {
642+ // Note we do not acquire a request_permit here, as the health check needs to always go through
643+ // even if everything else is fully loaded
596644 match self . database . run_command ( doc ! { "ping" : 1 } ) . await {
597645 Ok ( _) => HealthStatus :: new_ok ( self , "Connection healthy" . into ( ) ) ,
598646 Err ( e) => HealthStatus :: new_failed (
@@ -918,7 +966,9 @@ impl SchedulerStore for ExperimentalMongoStore {
918966 VERSION_FIELD : current_version,
919967 } ;
920968
921- match self
969+ let semaphore = self . acquire_permit ( ) . await ?;
970+
971+ let result = match self
922972 . scheduler_collection
923973 . find_one_and_update ( filter, update_doc)
924974 . upsert ( true )
@@ -931,7 +981,9 @@ impl SchedulerStore for ExperimentalMongoStore {
931981 Code :: Internal ,
932982 "MongoDB error in update_data: {e}"
933983 ) ) ,
934- }
984+ } ;
985+ drop ( semaphore) ;
986+ result
935987 } else {
936988 let data_bytes = data. try_into_bytes ( ) . map_err ( |e| {
937989 make_err ! (
@@ -959,6 +1011,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9591011 ) ;
9601012 }
9611013
1014+ let semaphore = self . acquire_permit ( ) . await ?;
1015+
9621016 self . scheduler_collection
9631017 . update_one (
9641018 doc ! { KEY_FIELD : encoded_key. as_ref( ) } ,
@@ -970,6 +1024,8 @@ impl SchedulerStore for ExperimentalMongoStore {
9701024 make_err ! ( Code :: Internal , "Failed to update scheduler document: {e}" )
9711025 } ) ?;
9721026
1027+ drop ( semaphore) ;
1028+
9731029 Ok ( Some ( 0 ) )
9741030 }
9751031 }
0 commit comments