Skip to content

Commit e9cd079

Browse files
authored
Merge pull request #141 from ChrisSon15/fix-flaki-cbf-sync
fix flakiness of syncing CBF.
2 parents 41fab0d + 266cd06 commit e9cd079

1 file changed

Lines changed: 27 additions & 21 deletions

File tree

wallet/src/bmp_wallet.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,6 @@ impl WalletApi for BMPWallet<Connection> {
479479
}
480480

481481
/// Sync the imported keys from protocol using CBF
482-
/// @TODO: use a shared node connection between the different imported keys sync
483482
async fn sync_cbf_imported(
484483
&mut self,
485484
scan_type: ScanType,
@@ -491,53 +490,60 @@ impl WalletApi for BMPWallet<Connection> {
491490
.map(|s| s.base_point_mul().serialize_xonly())
492491
.collect::<Vec<_>>();
493492

494-
let mut req: Option<Requester> = None;
495-
496-
let mut final_imported_balance = Balance::default();
493+
// Build all wallets and light clients upfront, then run all nodes concurrently
494+
// so their peer connections overlap (gossip addresses from one benefit the others).
495+
struct KeyEntry {
496+
db: Connection,
497+
wallet: PersistedWallet<Connection>,
498+
update_subscriber: UpdateSubscriber,
499+
requester: Requester,
500+
}
497501

498-
for key in pubkeys {
502+
let mut entries: Vec<KeyEntry> = Vec::with_capacity(pubkeys.len());
503+
for key in &pubkeys {
499504
let db_path = format!("bmp_{}.db3", key.to_lower_hex_string());
500505
let mut db = Connection::open(db_path)?;
501-
502506
let imported_wallet_opt = Wallet::load()
503507
.check_network(self.wallet.network())
504508
.extract_keys()
505509
.load_wallet(&mut db)?;
506-
let mut imported_wallet = match imported_wallet_opt {
507-
Some(wallet) => wallet,
510+
let wallet = match imported_wallet_opt {
511+
Some(w) => w,
508512
None => {
509513
let descriptor = format!("tr({})", key.to_lower_hex_string());
510514
Wallet::create_single(descriptor)
511515
.network(self.wallet.network())
512516
.create_wallet(&mut db)?
513517
}
514518
};
515-
516519
let LightClient {
517520
requester,
518521
info_subscriber: _,
519522
warning_subscriber: _,
520-
mut update_subscriber,
523+
update_subscriber,
521524
node,
522525
} = Builder::new(self.network())
523526
.add_peers(peers.clone())
524-
.build_with_wallet(&imported_wallet, scan_type)?;
525-
526-
tokio::task::spawn(async move { node.run().await.unwrap() });
527-
let updates = update_subscriber.update().await?;
528-
imported_wallet.apply_update(updates)?;
529-
530-
imported_wallet.persist(&mut db)?;
527+
.build_with_wallet(&wallet, scan_type)?;
528+
tokio::task::spawn(async move { node.run().await });
529+
entries.push(KeyEntry { db, wallet, update_subscriber, requester });
530+
}
531531

532-
final_imported_balance = final_imported_balance + imported_wallet.balance();
532+
// Collect updates from all nodes while they are all running, then shut down together.
533+
let mut final_imported_balance = Balance::default();
534+
for entry in &mut entries {
535+
let updates = entry.update_subscriber.update().await?;
536+
entry.wallet.apply_update(updates)?;
537+
entry.wallet.persist(&mut entry.db)?;
538+
final_imported_balance = final_imported_balance + entry.wallet.balance();
539+
}
533540

534-
req.get_or_insert(requester);
541+
for entry in entries {
542+
let _ = entry.requester.shutdown();
535543
}
536544

537545
self.imported_balance = final_imported_balance;
538546

539-
req.is_some().then(|| req.unwrap().shutdown());
540-
541547
Ok(())
542548
}
543549

0 commit comments

Comments
 (0)