77
88use std:: collections:: { BTreeMap , HashMap } ;
99use std:: net:: SocketAddr ;
10- use std:: sync:: atomic:: { AtomicBool , AtomicU32 , Ordering } ;
10+ use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
1111use std:: sync:: { Arc , Mutex , RwLock } ;
1212use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
1313
@@ -43,6 +43,11 @@ const MIN_FEERATE_SAT_PER_KWU: u64 = 250;
4343/// Number of recent blocks to look back for per-target fee rate estimation.
4444const FEE_RATE_LOOKBACK_BLOCKS : usize = 6 ;
4545
46+ /// Number of blocks to walk back from a component's persisted best block height
47+ /// for reorg safety when computing the incremental scan skip height.
48+ /// Matches bdk-kyoto's `IMPOSSIBLE_REORG_DEPTH`.
49+ const REORG_SAFETY_BLOCKS : u32 = 7 ;
50+
4651/// The fee estimation back-end used by the CBF chain source.
4752enum FeeSource {
4853 /// Derive fee rates from the coinbase reward of recent blocks.
@@ -82,12 +87,6 @@ pub(super) struct CbfChainSource {
8287 scan_lock : tokio:: sync:: Mutex < ( ) > ,
8388 /// Scripts registered by LDK's Filter trait for lightning channel monitoring.
8489 registered_scripts : Mutex < Vec < ScriptBuf > > ,
85- /// Set when new scripts are registered; forces a full rescan on next lightning sync.
86- lightning_scripts_dirty : AtomicBool ,
87- /// Last block height reached by on-chain wallet sync, used for incremental scans.
88- last_onchain_synced_height : Mutex < Option < u32 > > ,
89- /// Last block height reached by lightning wallet sync, used for incremental scans.
90- last_lightning_synced_height : Mutex < Option < u32 > > ,
9190 /// Deduplicates concurrent on-chain wallet sync requests.
9291 onchain_wallet_sync_status : Mutex < WalletSyncStatus > ,
9392 /// Deduplicates concurrent lightning wallet sync requests.
@@ -146,10 +145,7 @@ impl CbfChainSource {
146145 let sync_completion_tx = Arc :: new ( Mutex :: new ( None ) ) ;
147146 let filter_skip_height = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
148147 let registered_scripts = Mutex :: new ( Vec :: new ( ) ) ;
149- let lightning_scripts_dirty = AtomicBool :: new ( true ) ;
150148 let scan_lock = tokio:: sync:: Mutex :: new ( ( ) ) ;
151- let last_onchain_synced_height = Mutex :: new ( None ) ;
152- let last_lightning_synced_height = Mutex :: new ( None ) ;
153149 let onchain_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
154150 let lightning_wallet_sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
155151 Ok ( Self {
@@ -163,10 +159,7 @@ impl CbfChainSource {
163159 sync_completion_tx,
164160 filter_skip_height,
165161 registered_scripts,
166- lightning_scripts_dirty,
167162 scan_lock,
168- last_onchain_synced_height,
169- last_lightning_synced_height,
170163 onchain_wallet_sync_status,
171164 lightning_wallet_sync_status,
172165 fee_estimator,
@@ -309,6 +302,10 @@ impl CbfChainSource {
309302 reorganized. len( ) ,
310303 accepted. len( ) ,
311304 ) ;
305+
306+ // No height reset needed: skip heights are derived from
307+ // BDK's checkpoint (on-chain) and LDK's best block
308+ // (lightning), both walked back by REORG_SAFETY_BLOCKS.
312309 } ,
313310 BlockHeaderChanges :: Connected ( header) => {
314311 log_trace ! ( logger, "CBF block connected at height {}" , header. height, ) ;
@@ -353,13 +350,11 @@ impl CbfChainSource {
353350 /// Register a transaction script for Lightning channel monitoring.
354351 pub ( crate ) fn register_tx ( & self , _txid : & Txid , script_pubkey : & Script ) {
355352 self . registered_scripts . lock ( ) . unwrap ( ) . push ( script_pubkey. to_owned ( ) ) ;
356- self . lightning_scripts_dirty . store ( true , Ordering :: Release ) ;
357353 }
358354
359355 /// Register a watched output script for Lightning channel monitoring.
360356 pub ( crate ) fn register_output ( & self , output : WatchedOutput ) {
361357 self . registered_scripts . lock ( ) . unwrap ( ) . push ( output. script_pubkey . clone ( ) ) ;
362- self . lightning_scripts_dirty . store ( true , Ordering :: Release ) ;
363358 }
364359
365360 /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
@@ -429,7 +424,7 @@ impl CbfChainSource {
429424 Duration :: from_secs (
430425 self . sync_config . timeouts_config . onchain_wallet_sync_timeout_secs ,
431426 ) ,
432- self . sync_onchain_wallet_op ( requester, scripts) ,
427+ self . sync_onchain_wallet_op ( requester, & onchain_wallet , scripts) ,
433428 ) ;
434429
435430 let ( tx_update, sync_update) = match timeout_fut. await {
@@ -484,12 +479,11 @@ impl CbfChainSource {
484479 }
485480
486481 async fn sync_onchain_wallet_op (
487- & self , requester : Requester , scripts : Vec < ScriptBuf > ,
482+ & self , requester : Requester , onchain_wallet : & Wallet , scripts : Vec < ScriptBuf > ,
488483 ) -> Result < ( TxUpdate < ConfirmationBlockTime > , SyncUpdate ) , Error > {
489- // Always do a full scan (skip_height=None) for the on-chain wallet.
490- // Unlike the Lightning wallet which can rely on reorg_queue events,
491- // the on-chain wallet needs to see all blocks to correctly detect
492- // reorgs via checkpoint comparison in the caller.
484+ // Derive skip height from BDK's persisted checkpoint, walked back by
485+ // REORG_SAFETY_BLOCKS for reorg safety (same approach as bdk-kyoto).
486+ // This survives restarts since BDK persists its checkpoint chain.
493487 //
494488 // We include LDK-registered scripts (e.g., channel funding output
495489 // scripts) alongside the wallet scripts. This ensures the on-chain
@@ -501,9 +495,10 @@ impl CbfChainSource {
501495 // unknown. This mirrors what the Bitcoind chain source does in
502496 // `Wallet::block_connected` by inserting registered tx outputs.
503497 let mut all_scripts = scripts;
504- // we query all registered scripts, not only BDK-related
505498 all_scripts. extend ( self . registered_scripts . lock ( ) . unwrap ( ) . iter ( ) . cloned ( ) ) ;
506- let ( sync_update, matched) = self . run_filter_scan ( all_scripts, None ) . await ?;
499+ let skip_height =
500+ onchain_wallet. latest_checkpoint ( ) . height ( ) . checked_sub ( REORG_SAFETY_BLOCKS ) ;
501+ let ( sync_update, matched) = self . run_filter_scan ( all_scripts, skip_height) . await ?;
507502
508503 log_debug ! (
509504 self . logger,
@@ -532,9 +527,6 @@ impl CbfChainSource {
532527 }
533528 }
534529
535- let tip = sync_update. tip ( ) ;
536- * self . last_onchain_synced_height . lock ( ) . unwrap ( ) = Some ( tip. height ) ;
537-
538530 Ok ( ( tx_update, sync_update) )
539531 }
540532
@@ -615,9 +607,8 @@ impl CbfChainSource {
615607 & self , requester : Requester , channel_manager : Arc < ChannelManager > ,
616608 chain_monitor : Arc < ChainMonitor > , output_sweeper : Arc < Sweeper > , scripts : Vec < ScriptBuf > ,
617609 ) -> Result < ( ) , Error > {
618- let scripts_dirty = self . lightning_scripts_dirty . load ( Ordering :: Acquire ) ;
619610 let skip_height =
620- if scripts_dirty { None } else { * self . last_lightning_synced_height . lock ( ) . unwrap ( ) } ;
611+ channel_manager . current_best_block ( ) . height . checked_sub ( REORG_SAFETY_BLOCKS ) ;
621612 let ( sync_update, matched) = self . run_filter_scan ( scripts, skip_height) . await ?;
622613
623614 log_debug ! (
@@ -652,9 +643,6 @@ impl CbfChainSource {
652643 }
653644 }
654645
655- * self . last_lightning_synced_height . lock ( ) . unwrap ( ) = Some ( tip. height ) ;
656- self . lightning_scripts_dirty . store ( false , Ordering :: Release ) ;
657-
658646 Ok ( ( ) )
659647 }
660648
0 commit comments