Skip to content

Commit 1f3cfc7

Browse files
committed
Implement sync_onchain_wallet for ChainSource::Electrum
1 parent 43a26dd commit 1f3cfc7

File tree

3 files changed

+166
-3
lines changed

3 files changed

+166
-3
lines changed

src/chain/electrum.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
// accordance with one or both of these licenses.
77

88
use crate::config::{
9-
Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
10-
TX_BROADCAST_TIMEOUT_SECS,
9+
Config, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
1111
};
1212
use crate::error::Error;
1313
use crate::fee_estimator::{
@@ -20,6 +20,12 @@ use lightning::chain::{Confirm, Filter, WatchedOutput};
2020
use lightning::util::ser::Writeable;
2121
use lightning_transaction_sync::ElectrumSyncClient;
2222

23+
use bdk_chain::bdk_core::spk_client::FullScanRequest as BdkFullScanRequest;
24+
use bdk_chain::bdk_core::spk_client::FullScanResponse as BdkFullScanResponse;
25+
use bdk_chain::bdk_core::spk_client::SyncRequest as BdkSyncRequest;
26+
use bdk_chain::bdk_core::spk_client::SyncResponse as BdkSyncResponse;
27+
use bdk_wallet::KeychainKind as BdkKeyChainKind;
28+
2329
use bdk_electrum::BdkElectrumClient;
2430

2531
use electrum_client::{Batch, Client as ElectrumClient, ElectrumApi};
@@ -30,6 +36,8 @@ use std::collections::HashMap;
3036
use std::sync::Arc;
3137
use std::time::{Duration, Instant};
3238

39+
const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
40+
3341
pub(crate) struct ElectrumRuntimeClient {
3442
electrum_client: Arc<ElectrumClient>,
3543
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
@@ -96,6 +104,69 @@ impl ElectrumRuntimeClient {
96104
Ok(res)
97105
}
98106

107+
pub(crate) async fn get_full_scan_wallet_update(
108+
&self, request: BdkFullScanRequest<BdkKeyChainKind>,
109+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
110+
) -> Result<BdkFullScanResponse<BdkKeyChainKind>, Error> {
111+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
112+
bdk_electrum_client.populate_tx_cache(cached_txs);
113+
114+
let spawn_fut = self.runtime.spawn_blocking(move || {
115+
bdk_electrum_client.full_scan(
116+
request,
117+
BDK_CLIENT_STOP_GAP,
118+
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
119+
true,
120+
)
121+
});
122+
let wallet_sync_timeout_fut =
123+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
124+
125+
wallet_sync_timeout_fut
126+
.await
127+
.map_err(|e| {
128+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
129+
Error::WalletOperationTimeout
130+
})?
131+
.map_err(|e| {
132+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
133+
Error::WalletOperationFailed
134+
})?
135+
.map_err(|e| {
136+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
137+
Error::WalletOperationFailed
138+
})
139+
}
140+
141+
pub(crate) async fn get_incremental_sync_wallet_update(
142+
&self, request: BdkSyncRequest<(BdkKeyChainKind, u32)>,
143+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
144+
) -> Result<BdkSyncResponse, Error> {
145+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
146+
bdk_electrum_client.populate_tx_cache(cached_txs);
147+
148+
let spawn_fut = self.runtime.spawn_blocking(move || {
149+
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
150+
});
151+
let wallet_sync_timeout_fut =
152+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
153+
154+
wallet_sync_timeout_fut
155+
.await
156+
.map_err(|e| {
157+
log_error!(self.logger, "Incremental sync of on-chain wallet timed out: {}", e);
158+
Error::WalletOperationTimeout
159+
})?
160+
.map_err(|e| {
161+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
162+
Error::WalletOperationFailed
163+
})?
164+
.map_err(|e| {
165+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
166+
Error::WalletOperationFailed
167+
})
168+
}
169+
99170
pub(crate) async fn broadcast(&self, tx: Transaction) {
100171
let electrum_client = Arc::clone(&self.electrum_client);
101172

src/chain/mod.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,95 @@ impl ChainSource {
640640

641641
res
642642
},
643-
Self::Electrum { .. } => todo!(),
643+
Self::Electrum {
644+
electrum_runtime_client,
645+
onchain_wallet,
646+
onchain_wallet_sync_status,
647+
kv_store,
648+
logger,
649+
node_metrics,
650+
..
651+
} => {
652+
let electrum_client: Arc<ElectrumRuntimeClient> =
653+
if let Some(client) = electrum_runtime_client.read().unwrap().as_ref() {
654+
Arc::clone(client)
655+
} else {
656+
debug_assert!(
657+
false,
658+
"We should have started the chain source before syncing the onchain wallet"
659+
);
660+
return Err(Error::FeerateEstimationUpdateFailed);
661+
};
662+
let receiver_res = {
663+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
664+
status_lock.register_or_subscribe_pending_sync()
665+
};
666+
if let Some(mut sync_receiver) = receiver_res {
667+
log_info!(logger, "Sync in progress, skipping.");
668+
return sync_receiver.recv().await.map_err(|e| {
669+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
670+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
671+
Error::WalletOperationFailed
672+
})?;
673+
}
674+
675+
// If this is our first sync, do a full scan with the configured gap limit.
676+
// Otherwise just do an incremental sync.
677+
let incremental_sync =
678+
node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
679+
680+
macro_rules! get_and_apply_wallet_update {
681+
($sync_future: expr) => {{
682+
let now = Instant::now();
683+
let update = $sync_future.await?;
684+
685+
match onchain_wallet.apply_update(update) {
686+
Ok(()) => {
687+
log_info!(
688+
logger,
689+
"{} of on-chain wallet finished in {}ms.",
690+
if incremental_sync { "Incremental sync" } else { "Sync" },
691+
now.elapsed().as_millis()
692+
);
693+
let unix_time_secs_opt = SystemTime::now()
694+
.duration_since(UNIX_EPOCH)
695+
.ok()
696+
.map(|d| d.as_secs());
697+
{
698+
let mut locked_node_metrics = node_metrics.write().unwrap();
699+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
700+
unix_time_secs_opt;
701+
write_node_metrics(
702+
&*locked_node_metrics,
703+
Arc::clone(&kv_store),
704+
Arc::clone(&logger),
705+
)?;
706+
}
707+
Ok(())
708+
},
709+
Err(e) => Err(e),
710+
}
711+
}};
712+
}
713+
714+
let cached_txs = onchain_wallet.get_cached_txs();
715+
716+
let res = if incremental_sync {
717+
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
718+
let incremental_sync_fut = electrum_client
719+
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs);
720+
get_and_apply_wallet_update!(incremental_sync_fut)
721+
} else {
722+
let full_scan_request = onchain_wallet.get_full_scan_request();
723+
let full_scan_fut =
724+
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs);
725+
get_and_apply_wallet_update!(full_scan_fut)
726+
};
727+
728+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
729+
730+
res
731+
},
644732
Self::BitcoindRpc { .. } => {
645733
// In BitcoindRpc mode we sync lightning and onchain wallet in one go by via
646734
// `ChainPoller`. So nothing to do here.

src/wallet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ where
9898
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
9999
}
100100

101+
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
102+
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
103+
}
104+
101105
pub(crate) fn current_best_block(&self) -> BestBlock {
102106
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
103107
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }

0 commit comments

Comments
 (0)