From c943b7431d80dc048d89c3abcaded3096d1948e9 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 17 Jul 2025 11:54:52 +0900 Subject: [PATCH 1/6] Finally introduce sane unified scheduler shutdown --- core/src/banking_stage/decision_maker.rs | 6 +- runtime/src/bank_forks.rs | 15 +++ runtime/src/installed_scheduler_pool.rs | 2 + unified-scheduler-pool/src/lib.rs | 153 +++++++++++++++++++---- 4 files changed, 149 insertions(+), 27 deletions(-) diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index 19e0a674a848cf..9e13b0895fe99c 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -7,7 +7,7 @@ use { solana_pubkey::Pubkey, solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus}, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering::Relaxed, Arc, RwLock}, time::{Duration, Instant}, }, }; @@ -138,7 +138,9 @@ impl DecisionMaker { impl BankingStageMonitor for DecisionMaker { fn status(&mut self) -> BankingStageStatus { - if matches!( + if self.poh_recorder.read().unwrap().is_exited.load(Relaxed) { + BankingStageStatus::Exited + } else if matches!( self.make_consume_or_forward_decision(), BufferedPacketsDecision::Forward, ) { diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 413ef2472214bc..3be11daf2d283d 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -649,6 +649,21 @@ impl ForkGraph for BankForks { } } +impl Drop for BankForks { + fn drop(&mut self) { + info!("BankForks::drop(): started..."); + // It's okay to abruptly drop all banks here albeit BankForks provides panic-happy Index + // impl on them. We're inside the very end of life of it (i.e. the Drop impl block!), + // considering it's Arc-ed elsewhere. + self.banks.clear(); + + if let Some(scheduler_pool) = self.scheduler_pool.take() { + scheduler_pool.uninstalled_from_bank_forks(); + } + info!("BankForks::drop(): ...finished"); + } +} + #[cfg(test)] mod tests { use { diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 3726f1ac500981..9977a6baed2fc1 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -67,6 +67,8 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug { /// timing of scheduler returning to reduce latency of the normal block-verification code-path, /// relying on eventual stale listener clean-up by `solScCleaner`. 
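// A minimal, std-only sketch of the status-reporting pattern introduced in
// DecisionMaker::status() above: the monitor first checks a shared `is_exited`
// flag and only then falls back to the normal busy/idle decision. The enum and
// struct names are illustrative stand-ins, not the real BankingStageMonitor /
// DecisionMaker types.
use std::sync::{
    atomic::{AtomicBool, Ordering::Relaxed},
    Arc,
};

#[derive(Debug, PartialEq)]
enum Status {
    Active,
    Inactive,
    Exited,
}

struct Monitor {
    is_exited: Arc<AtomicBool>,
    has_buffered_work: bool,
}

impl Monitor {
    fn status(&mut self) -> Status {
        if self.is_exited.load(Relaxed) {
            // Exit wins over everything else, just like the new branch above.
            Status::Exited
        } else if self.has_buffered_work {
            Status::Active
        } else {
            Status::Inactive
        }
    }
}

fn main() {
    let is_exited = Arc::new(AtomicBool::new(false));
    let mut monitor = Monitor { is_exited: is_exited.clone(), has_buffered_work: false };
    assert_eq!(monitor.status(), Status::Inactive);
    is_exited.store(true, Relaxed);
    assert_eq!(monitor.status(), Status::Exited);
}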
fn register_timeout_listener(&self, timeout_listener: TimeoutListener); + + fn uninstalled_from_bank_forks(self: Arc); } #[derive(Debug)] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index eae37119739c00..66be396c28c01a 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -130,6 +130,7 @@ pub struct SchedulerPool, TH: TaskHandler> { weak_self: Weak, next_scheduler_id: AtomicSchedulerId, max_usage_queue_count: usize, + cleaner_thread: JoinHandle<()>, _phantom: PhantomData, } @@ -231,6 +232,22 @@ impl HandlerContext { fn banking_stage_helper(&self) -> &BankingStageHelper { self.banking_stage_helper.as_ref().unwrap() } + + fn clone_for_scheduler_thread(&self) -> Self { + let mut context = self.clone(); + if self.banking_stage_helper.is_some() { + context.disable_banking_packet_handler(); + } + context + } + + fn disable_banking_packet_handler(&mut self) { + self.banking_packet_receiver = never(); + self.banking_packet_handler = Box::new(|_, _| { + // This is safe because of the paired use of never() just above. + unreachable!() + }); + } } #[derive(Debug, Clone)] @@ -433,32 +450,20 @@ where max_usage_queue_count: usize, timeout_duration: Duration, ) -> Arc { - let scheduler_pool = Arc::new_cyclic(|weak_self| Self { - scheduler_inners: Mutex::default(), - block_production_scheduler_inner: Mutex::default(), - trashed_scheduler_inners: Mutex::default(), - timeout_listeners: Mutex::default(), - common_handler_context: CommonHandlerContext { - log_messages_bytes_limit, - transaction_status_sender, - replay_vote_sender, - prioritization_fee_cache, - }, - block_verification_handler_count, - banking_stage_handler_context: Mutex::default(), - weak_self: weak_self.clone(), - next_scheduler_id: AtomicSchedulerId::default(), - max_usage_queue_count, - _phantom: PhantomData, - }); + let (scheduler_pool_sender, scheduler_pool_receiver) = crossbeam_channel::bounded(1); - let cleaner_main_loop = { - let weak_scheduler_pool = Arc::downgrade(&scheduler_pool); + let mut exiting = false; + let cleaner_main_loop = move || { + let weak_scheduler_pool: Weak = + scheduler_pool_receiver.into_iter().next().unwrap(); - move || loop { + info!("cleaner_main_loop: started..."); + loop { sleep(pool_cleaner_interval); let Some(scheduler_pool) = weak_scheduler_pool.upgrade() else { + // this is the only safe termination point of cleaner_main_loop while all other + // `break`s being due to poisoned locks. break; }; @@ -490,6 +495,10 @@ where }; let banking_stage_status = scheduler_pool.banking_stage_status(); + if !exiting && matches!(banking_stage_status, Some(BankingStageStatus::Exited)) { + exiting = true; + scheduler_pool.unregister_banking_stage(); + } if matches!(banking_stage_status, Some(BankingStageStatus::Inactive)) { let Ok(mut inner) = scheduler_pool.block_production_scheduler_inner.lock() @@ -595,14 +604,38 @@ where triggered_timeout_listener_count, )); } + info!("cleaner_main_loop: ...finished"); }; - // No need to join; the spawned main loop will gracefully exit. 
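// A self-contained sketch (assuming the crossbeam-channel crate) of the
// "spawn first, hand the Weak over a bounded(1) channel" construction used for
// solScCleaner above: the thread is spawned before the Arc exists, receives only
// a Weak, and therefore can never keep the pool alive by itself. `ToyPool` is an
// illustrative stand-in for SchedulerPool.
use std::{
    sync::{Arc, Weak},
    thread,
    time::Duration,
};

#[derive(Debug)]
struct ToyPool {
    name: &'static str,
}

fn main() {
    let (pool_sender, pool_receiver) = crossbeam_channel::bounded::<Weak<ToyPool>>(1);

    let cleaner = thread::spawn(move || {
        // Block until the pool has been constructed and its Weak handed over.
        let weak_pool: Weak<ToyPool> = pool_receiver.recv().unwrap();
        loop {
            thread::sleep(Duration::from_millis(10));
            // Holding only a Weak lets this loop observe the pool being dropped
            // elsewhere and terminate on its own.
            let Some(pool) = weak_pool.upgrade() else { break };
            println!("cleaning {}", pool.name);
        }
    });

    let pool = Arc::new(ToyPool { name: "toy" });
    pool_sender.send(Arc::downgrade(&pool)).unwrap();

    thread::sleep(Duration::from_millis(50));
    drop(pool); // the last strong reference goes away...
    cleaner.join().unwrap(); // ...so the cleaner exits and can be joined.
}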
- thread::Builder::new() + let cleaner_thread = thread::Builder::new() .name("solScCleaner".to_owned()) .spawn_tracked(cleaner_main_loop) .unwrap(); + let scheduler_pool = Arc::new_cyclic(|weak_self| Self { + scheduler_inners: Mutex::default(), + block_production_scheduler_inner: Mutex::default(), + trashed_scheduler_inners: Mutex::default(), + timeout_listeners: Mutex::default(), + common_handler_context: CommonHandlerContext { + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + }, + block_verification_handler_count, + banking_stage_handler_context: Mutex::default(), + weak_self: weak_self.clone(), + next_scheduler_id: AtomicSchedulerId::default(), + max_usage_queue_count, + cleaner_thread, + _phantom: PhantomData, + }); + + scheduler_pool_sender + .send(Arc::downgrade(&scheduler_pool)) + .unwrap(); + scheduler_pool } @@ -751,6 +784,21 @@ where ); } + fn unregister_banking_stage(&self) { + let handler_context = &mut self.banking_stage_handler_context.lock().unwrap(); + let handler_context = handler_context.as_mut().unwrap(); + // Replace with dummy ones to unblock validator shutdown. + // Note that replacing banking_stage_handler_context with None altogether will create a + // very short window of race condition due to untimely spawning of block production + // scheduler. + handler_context.banking_packet_receiver = never(); + handler_context.banking_packet_handler = Box::new(|_, _| { + // This is safe because of the paired use of never() just above. + unreachable!() + }); + handler_context.banking_stage_monitor = Box::new(ExitedBankingMonitor); + } + fn banking_stage_status(&self) -> Option { self.banking_stage_handler_context .lock() @@ -884,6 +932,46 @@ where .unwrap() .push((timeout_listener, Instant::now())); } + + fn uninstalled_from_bank_forks(self: Arc) { + info!("SchedulerPool::uninstalled_from_bank_forks(): started..."); + + // Forcibly return back all taken schedulers back to this scheduler pool. + for (listener, _registered_at) in mem::take(&mut *self.timeout_listeners.lock().unwrap()) { + listener.trigger(self.clone()); + } + + // Then, drop all schedulers in the pool. + mem::take(&mut *self.scheduler_inners.lock().unwrap()); + mem::take(&mut *self.block_production_scheduler_inner.lock().unwrap()); + mem::take(&mut *self.trashed_scheduler_inners.lock().unwrap()); + + // At this point, all circular references of this pool has been cut. And there should be + // only 1 strong rerefence unless the cleaner thread is active right now. + + // So, wait a bit to unwrap the pool out of the sinful Arc finally here. Note that we can't resort to the + // Drop impl, because of the need to take the ownership of the join handle of the cleaner + // thread... + let mut this = self; + let this: Self = loop { + match Arc::try_unwrap(this) { + Ok(pool) => { + break pool; + } + Err(that) => { + // It seems solScCleaner is active... retry later + this = that; + sleep(Duration::from_millis(100)); + // Yes, indefinite loop, but the situation isn't so different from the + // following join(), which indefinitely waits as well. + continue; + } + } + }; + this.cleaner_thread.join().unwrap(); + + info!("SchedulerPool::uninstalled_from_bank_forks(): ...finished"); + } } pub trait TaskHandler: Send + Sync + Debug + Sized + 'static { @@ -1828,7 +1916,7 @@ impl, TH: TaskHandler> ThreadManager { // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. 
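// A std-only sketch of the shutdown idiom in uninstalled_from_bank_forks() above:
// spin on Arc::try_unwrap() until no other thread holds a temporary strong
// reference, then take ownership of the JoinHandle and join. `ToyPool` and the
// 10ms/30ms timings are illustrative only.
use std::{
    sync::Arc,
    thread::{self, JoinHandle},
    time::Duration,
};

struct ToyPool {
    worker: JoinHandle<()>,
}

fn shutdown(pool: Arc<ToyPool>) {
    let mut pool = pool;
    // Keep retrying while some other thread still holds a strong reference.
    let pool: ToyPool = loop {
        match Arc::try_unwrap(pool) {
            Ok(owned) => break owned,
            Err(still_shared) => {
                pool = still_shared;
                thread::sleep(Duration::from_millis(10));
            }
        }
    };
    // Only the unwrapped value can surrender its JoinHandle by value.
    pool.worker.join().unwrap();
}

fn main() {
    let worker = thread::spawn(|| thread::sleep(Duration::from_millis(30)));
    let pool = Arc::new(ToyPool { worker });
    shutdown(pool);
    println!("worker joined, pool fully dropped");
}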
let scheduler_main_loop = { - let handler_context = handler_context.clone(); + let handler_context = handler_context.clone_for_scheduler_thread(); let session_result_sender = self.session_result_sender.clone(); // Taking new_task_receiver here is important to ensure there's a single receiver. In // this way, the replay stage will get .send() failures reliably, after this scheduler @@ -2166,8 +2254,13 @@ impl, TH: TaskHandler> ThreadManager { let banking_stage_helper = banking_stage_helper.as_ref().unwrap(); let Ok(banking_packet) = banking_packet else { + // Don't break here; handler threads are expected to outlive its + // associated scheduler thread always. So, disable banking packet + // receiver then continue to be cleaned up properly later, much + // like block verification handler thread. info!("disconnected banking_packet_receiver"); - break; + handler_context.disable_banking_packet_handler(); + continue; }; banking_packet_handler(banking_stage_helper, banking_packet); continue; @@ -2461,12 +2554,22 @@ impl SpawnableScheduler for PooledScheduler { pub enum BankingStageStatus { Active, Inactive, + Exited, } pub trait BankingStageMonitor: Send + Debug { fn status(&mut self) -> BankingStageStatus; } +#[derive(Debug)] +struct ExitedBankingMonitor; + +impl BankingStageMonitor for ExitedBankingMonitor { + fn status(&mut self) -> BankingStageStatus { + BankingStageStatus::Exited + } +} + impl InstalledScheduler for PooledScheduler { fn id(&self) -> SchedulerId { self.inner.id() From a9916d3003f200d6e082e9cb8166080dc9d983d8 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 22 Jul 2025 14:56:37 +0900 Subject: [PATCH 2/6] Avoid lock contentions on poh_recorder --- core/src/banking_stage/decision_maker.rs | 26 +++++++++++++++++---- core/src/banking_stage/unified_scheduler.rs | 4 ++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index 9e13b0895fe99c..aa973a7a231c94 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -7,7 +7,7 @@ use { solana_pubkey::Pubkey, solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus}, std::{ - sync::{atomic::Ordering::Relaxed, Arc, RwLock}, + sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock}, time::{Duration, Instant}, }, }; @@ -136,12 +136,30 @@ impl DecisionMaker { } } -impl BankingStageMonitor for DecisionMaker { +#[derive(Debug)] +pub(crate) struct DecisionMakerWrapper { + is_exited: Arc, + decision_maker: DecisionMaker, +} + +impl DecisionMakerWrapper { + pub(crate) fn new(decision_maker: DecisionMaker) -> Self { + // Clone-off before hand to avoid lock contentions. 
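// A std-only sketch of the lock-avoidance trick that DecisionMakerWrapper::new
// applies on the line that follows: the Arc<AtomicBool> is cloned out from behind
// the RwLock once at construction, so the hot status() path loads the atomic
// directly and never touches the lock again. `Recorder` and `Wrapper` are
// illustrative stand-ins for PohRecorder and DecisionMakerWrapper.
use std::sync::{
    atomic::{AtomicBool, Ordering::Relaxed},
    Arc, RwLock,
};

struct Recorder {
    is_exited: Arc<AtomicBool>,
}

struct Wrapper {
    is_exited: Arc<AtomicBool>,
}

impl Wrapper {
    fn new(recorder: &Arc<RwLock<Recorder>>) -> Self {
        // One read-lock acquisition here instead of one per status() call.
        let is_exited = recorder.read().unwrap().is_exited.clone();
        Self { is_exited }
    }

    fn is_exited(&self) -> bool {
        self.is_exited.load(Relaxed)
    }
}

fn main() {
    let recorder = Arc::new(RwLock::new(Recorder {
        is_exited: Arc::new(AtomicBool::new(false)),
    }));
    let wrapper = Wrapper::new(&recorder);
    assert!(!wrapper.is_exited());
    recorder.read().unwrap().is_exited.store(true, Relaxed);
    assert!(wrapper.is_exited());
}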
+ let is_exited = decision_maker.poh_recorder.read().unwrap().is_exited.clone(); + + Self { + is_exited, + decision_maker, + } + } +} + +impl BankingStageMonitor for DecisionMakerWrapper { fn status(&mut self) -> BankingStageStatus { - if self.poh_recorder.read().unwrap().is_exited.load(Relaxed) { + if self.is_exited.load(Relaxed) { BankingStageStatus::Exited } else if matches!( - self.make_consume_or_forward_decision(), + self.decision_maker.make_consume_or_forward_decision(), BufferedPacketsDecision::Forward, ) { BankingStageStatus::Inactive diff --git a/core/src/banking_stage/unified_scheduler.rs b/core/src/banking_stage/unified_scheduler.rs index 602bc0c061096e..283af6117b6c83 100644 --- a/core/src/banking_stage/unified_scheduler.rs +++ b/core/src/banking_stage/unified_scheduler.rs @@ -30,7 +30,7 @@ use qualifier_attr::qualifiers; use { super::{ - decision_maker::{BufferedPacketsDecision, DecisionMaker}, + decision_maker::{BufferedPacketsDecision, DecisionMaker, DecisionMakerWrapper}, packet_deserializer::PacketDeserializer, LikeClusterInfo, }, @@ -56,7 +56,7 @@ pub(crate) fn ensure_banking_stage_setup( let mut root_bank_cache = RootBankCache::new(bank_forks.clone()); let unified_receiver = channels.unified_receiver().clone(); let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); - let banking_stage_monitor = Box::new(decision_maker.clone()); + let banking_stage_monitor = Box::new(DecisionMakerWrapper::new(decision_maker.clone())); let banking_packet_handler = Box::new( move |helper: &BankingStageHelper, batches: BankingPacketBatch| { From c5b866cde2b77dc098aaab52956f9e9c1510b9df Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 22 Jul 2025 14:57:49 +0900 Subject: [PATCH 3/6] Remove confusing redundant comment --- runtime/src/bank_forks.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 3be11daf2d283d..d0397b03a36351 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -652,9 +652,6 @@ impl ForkGraph for BankForks { impl Drop for BankForks { fn drop(&mut self) { info!("BankForks::drop(): started..."); - // It's okay to abruptly drop all banks here albeit BankForks provides panic-happy Index - // impl on them. We're inside the very end of life of it (i.e. the Drop impl block!), - // considering it's Arc-ed elsewhere. self.banks.clear(); if let Some(scheduler_pool) = self.scheduler_pool.take() { From 5509cde803f9941e4df9f29dbe55e0c0d981c933 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 22 Jul 2025 15:00:55 +0900 Subject: [PATCH 4/6] Provide explicit msg to unreachable!()s --- unified-scheduler-pool/src/lib.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 66be396c28c01a..19335785b41875 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -243,10 +243,8 @@ impl HandlerContext { fn disable_banking_packet_handler(&mut self) { self.banking_packet_receiver = never(); - self.banking_packet_handler = Box::new(|_, _| { - // This is safe because of the paired use of never() just above. - unreachable!() - }); + self.banking_packet_handler = + Box::new(|_, _| unreachable!("paired with never() receiver, this cannot be called")); } } @@ -792,10 +790,8 @@ where // very short window of race condition due to untimely spawning of block production // scheduler. 
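// A sketch (assuming the crossbeam-channel crate) of the never()/unreachable!()
// pairing that the next few patch lines install: a receiver that can never yield
// a message is matched with a handler that therefore can never run, which keeps
// the select loop structurally unchanged while effectively disabling one input.
// The `control` and `packets` channel names are illustrative.
use crossbeam_channel::{never, select, unbounded, Receiver};

fn main() {
    let (control_tx, control) = unbounded::<&'static str>();
    // The packet source is replaced by a channel that never becomes ready.
    let packets: Receiver<Vec<u8>> = never();
    let handle_packet = |_batch: Vec<u8>| {
        unreachable!("paired with never() receiver, this cannot be called")
    };

    control_tx.send("tick").unwrap();
    control_tx.send("stop").unwrap();

    loop {
        select! {
            recv(packets) -> batch => handle_packet(batch.unwrap()),
            recv(control) -> msg => match msg.unwrap() {
                "stop" => break,
                other => println!("control: {other}"),
            },
        }
    }
}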
handler_context.banking_packet_receiver = never(); - handler_context.banking_packet_handler = Box::new(|_, _| { - // This is safe because of the paired use of never() just above. - unreachable!() - }); + handler_context.banking_packet_handler = + Box::new(|_, _| unreachable!("paired with never() receiver, this cannot be called")); handler_context.banking_stage_monitor = Box::new(ExitedBankingMonitor); } @@ -830,7 +826,9 @@ where self.block_verification_handler_count, // Return various type-specific no-op values. never(), - Box::new(|_, _| {}), + Box::new(|_, _| { + unreachable!("paired with never() receiver, this cannot be called") + }), None, None, ) From 3e35d9f37dc451d5a4fc517db056130471cde869 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 22 Jul 2025 15:43:30 +0900 Subject: [PATCH 5/6] Minor edits --- unified-scheduler-pool/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 19335785b41875..ccca539f593a83 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -452,10 +452,10 @@ where let mut exiting = false; let cleaner_main_loop = move || { + info!("cleaner_main_loop: started..."); + let weak_scheduler_pool: Weak = scheduler_pool_receiver.into_iter().next().unwrap(); - - info!("cleaner_main_loop: started..."); loop { sleep(pool_cleaner_interval); @@ -2252,11 +2252,11 @@ impl, TH: TaskHandler> ThreadManager { let banking_stage_helper = banking_stage_helper.as_ref().unwrap(); let Ok(banking_packet) = banking_packet else { + info!("disconnected banking_packet_receiver"); // Don't break here; handler threads are expected to outlive its // associated scheduler thread always. So, disable banking packet - // receiver then continue to be cleaned up properly later, much - // like block verification handler thread. - info!("disconnected banking_packet_receiver"); + // handler then continue to be cleaned up properly later, much like + // block verification handler thread. 
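// A sketch (crossbeam-channel assumed) of the disconnect handling described just
// above and performed by the call that follows: when the packet channel
// disconnects, the worker does not break out of its loop; it swaps the dead
// receiver for never() and keeps serving its other channel until told to stop.
// Channel and variable names are illustrative.
use crossbeam_channel::{never, select, unbounded, Receiver};
use std::thread;

fn main() {
    let (packet_tx, packet_rx) = unbounded::<u64>();
    let (task_tx, task_rx) = unbounded::<&'static str>();

    let worker = thread::spawn(move || {
        let mut packet_rx: Receiver<u64> = packet_rx;
        loop {
            let mut packets_gone = false;
            select! {
                recv(packet_rx) -> packet => match packet {
                    Ok(p) => println!("packet: {p}"),
                    Err(_) => {
                        println!("disconnected packet receiver");
                        packets_gone = true;
                    }
                },
                recv(task_rx) -> task => match task.unwrap() {
                    "stop" => break,
                    t => println!("task: {t}"),
                },
            }
            if packets_gone {
                // Keep looping like the handler thread above: only the packet
                // source is disabled, the task channel is still served.
                packet_rx = never();
            }
        }
    });

    packet_tx.send(1).unwrap();
    drop(packet_tx); // the packet source disappears first...
    task_tx.send("work").unwrap(); // ...but tasks keep flowing...
    task_tx.send("stop").unwrap(); // ...until an explicit stop.
    worker.join().unwrap();
}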
handler_context.disable_banking_packet_handler(); continue; }; From 63dc14cb1a908ebd25105361163222dce49d91b3 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 22 Jul 2025 18:24:57 +0900 Subject: [PATCH 6/6] Improve ci stability with faster joining --- unified-scheduler-pool/src/lib.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index ccca539f593a83..3e2f1bad77e938 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -17,7 +17,9 @@ use qualifier_attr::qualifiers; use { agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, assert_matches::assert_matches, - crossbeam_channel::{self, never, select_biased, Receiver, RecvError, SendError, Sender}, + crossbeam_channel::{ + self, never, select_biased, Receiver, RecvError, RecvTimeoutError, SendError, Sender, + }, dashmap::DashMap, derive_where::derive_where, dyn_clone::{clone_trait_object, DynClone}, @@ -130,6 +132,7 @@ pub struct SchedulerPool, TH: TaskHandler> { weak_self: Weak, next_scheduler_id: AtomicSchedulerId, max_usage_queue_count: usize, + scheduler_pool_sender: Sender>, cleaner_thread: JoinHandle<()>, _phantom: PhantomData, } @@ -454,10 +457,12 @@ where let cleaner_main_loop = move || { info!("cleaner_main_loop: started..."); - let weak_scheduler_pool: Weak = - scheduler_pool_receiver.into_iter().next().unwrap(); + let weak_scheduler_pool: Weak = scheduler_pool_receiver.recv().unwrap(); loop { - sleep(pool_cleaner_interval); + match scheduler_pool_receiver.recv_timeout(pool_cleaner_interval) { + Ok(_) => unreachable!(), + Err(RecvTimeoutError::Disconnected | RecvTimeoutError::Timeout) => (), + } let Some(scheduler_pool) = weak_scheduler_pool.upgrade() else { // this is the only safe termination point of cleaner_main_loop while all other @@ -626,6 +631,7 @@ where weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::default(), max_usage_queue_count, + scheduler_pool_sender: scheduler_pool_sender.clone(), cleaner_thread, _phantom: PhantomData, }); @@ -951,7 +957,7 @@ where // Drop impl, because of the need to take the ownership of the join handle of the cleaner // thread... let mut this = self; - let this: Self = loop { + let mut this: Self = loop { match Arc::try_unwrap(this) { Ok(pool) => { break pool; @@ -966,6 +972,8 @@ where } } }; + // Accelerate cleaner thread joining by disconnection + this.scheduler_pool_sender = crossbeam_channel::bounded(1).0; this.cleaner_thread.join().unwrap(); info!("SchedulerPool::uninstalled_from_bank_forks(): ...finished");
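For reference, a minimal sketch (assuming only the crossbeam-channel crate) of the faster-join trick in PATCH 6/6: the cleaner "sleeps" in recv_timeout() on a keep-alive channel instead of sleep(), so dropping or replacing the Sender wakes it immediately via Disconnected rather than after a full tick. The 5-second interval and the channel names are illustrative, and this sketch exits directly on Disconnected instead of relying on a failed Weak::upgrade as the real code does.

use crossbeam_channel::{bounded, RecvTimeoutError};
use std::{thread, time::Duration};

fn main() {
    // The () channel is used purely as a keep-alive signal; nothing is ever sent.
    let (keep_alive, tick) = bounded::<()>(1);

    let cleaner = thread::spawn(move || loop {
        match tick.recv_timeout(Duration::from_secs(5)) {
            // Nothing is ever sent on this channel.
            Ok(()) => unreachable!(),
            // Normal periodic wake-up.
            Err(RecvTimeoutError::Timeout) => println!("periodic cleaning pass"),
            // The owner dropped its Sender: wake up right away and exit.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    thread::sleep(Duration::from_millis(50));
    // Disconnecting the sender makes recv_timeout() return immediately, so the
    // join below does not have to wait out the remaining interval.
    drop(keep_alive);
    cleaner.join().unwrap();
}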