Skip to content

feat(electrum): optimize merkle proof validation with batching #1957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 167 additions & 26 deletions crates/electrum/src/bdk_electrum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ use bdk_core::{
},
BlockId, CheckPoint, ConfirmationBlockTime, TxUpdate,
};
use electrum_client::{ElectrumApi, Error, HeaderNotification};
use electrum_client::{ElectrumApi, Error, GetMerkleRes, HeaderNotification};
use std::sync::{Arc, Mutex};

/// We include a chain suffix of a certain length for the purpose of robustness.
const CHAIN_SUFFIX_LENGTH: u32 = 8;

/// Maximum batch size for Merkle proof requests.
const MAX_MERKLE_BATCH_SIZE: usize = 100;

/// Wrapper around an [`electrum_client::ElectrumApi`] which includes an internal in-memory
/// transaction cache to avoid re-fetching already downloaded transactions.
#[derive(Debug)]
Expand All @@ -22,6 +25,8 @@ pub struct BdkElectrumClient<E> {
tx_cache: Mutex<HashMap<Txid, Arc<Transaction>>>,
/// The header cache
block_header_cache: Mutex<HashMap<u32, Header>>,
/// The Merkle proof cache
merkle_cache: Mutex<HashMap<(Txid, BlockHash), GetMerkleRes>>,
Comment on lines +28 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be more efficient if we cache anchors instead of GetMerkleRes here.

}

impl<E: ElectrumApi> BdkElectrumClient<E> {
Expand All @@ -31,6 +36,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
inner: client,
tx_cache: Default::default(),
block_header_cache: Default::default(),
merkle_cache: Default::default(),
}
}

Expand Down Expand Up @@ -254,13 +260,14 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
) -> Result<Option<u32>, Error> {
let mut unused_spk_count = 0_usize;
let mut last_active_index = Option::<u32>::None;
let mut txs_to_validate = Vec::new();

loop {
let spks = (0..batch_size)
.map_while(|_| spks_with_expected_txids.next())
.collect::<Vec<_>>();
if spks.is_empty() {
return Ok(last_active_index);
break;
}

let spk_histories = self
Expand All @@ -271,7 +278,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
if spk_history.is_empty() {
unused_spk_count = unused_spk_count.saturating_add(1);
if unused_spk_count >= stop_gap {
return Ok(last_active_index);
break;
}
} else {
last_active_index = Some(spk_index);
Expand All @@ -294,15 +301,27 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
match tx_res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
self.validate_merkle_for_anchor(tx_update, tx_res.tx_hash, height)?;
txs_to_validate.push((tx_res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((tx_res.tx_hash, start_time));
}
}
}
}

if unused_spk_count >= stop_gap {
break;
}
}

// Batch validate all collected transactions.
if !txs_to_validate.is_empty() {
let proofs = self.batch_fetch_merkle_proofs(&txs_to_validate)?;
self.batch_validate_merkle_proofs(tx_update, proofs)?;
}
Comment on lines +318 to 322
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having every populate_with_{} method call this internally, it will be more efficient and make more logical sense if we extract this so that we only call it at the end of full_scan and sync.

In other words, populate_with_{} should no longer fetch anchors. Instead, they should either mutate, or return a list of (Txid, BlockId) for which we try to fetch anchors for in a separate step.

It will be even better if full txs are fetched in a separate step too.


Ok(last_active_index)
}

/// Populate the `tx_update` with associated transactions/anchors of `outpoints`.
Expand All @@ -315,6 +334,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
outpoints: impl IntoIterator<Item = OutPoint>,
) -> Result<(), Error> {
let mut txs_to_validate = Vec::new();

for outpoint in outpoints {
let op_txid = outpoint.txid;
let op_tx = self.fetch_tx(op_txid)?;
Expand All @@ -324,7 +345,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
};
debug_assert_eq!(op_tx.compute_txid(), op_txid);

// attempt to find the following transactions (alongside their chain positions), and
// Attempt to find the following transactions (alongside their chain positions), and
// add to our sparsechain `update`:
let mut has_residing = false; // tx in which the outpoint resides
let mut has_spending = false; // tx that spends the outpoint
Expand All @@ -339,7 +360,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
match res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
txs_to_validate.push((res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((res.tx_hash, start_time));
Expand All @@ -349,7 +370,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

if !has_spending && res.tx_hash != op_txid {
let res_tx = self.fetch_tx(res.tx_hash)?;
// we exclude txs/anchors that do not spend our specified outpoint(s)
// We exclude txs/anchors that do not spend our specified outpoint(s).
has_spending = res_tx
.input
.iter()
Expand All @@ -361,7 +382,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
match res.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
self.validate_merkle_for_anchor(tx_update, res.tx_hash, height)?;
txs_to_validate.push((res.tx_hash, height));
}
_ => {
tx_update.seen_ats.insert((res.tx_hash, start_time));
Expand All @@ -370,6 +391,13 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
}
}
}

// Batch validate all collected transactions.
if !txs_to_validate.is_empty() {
let proofs = self.batch_fetch_merkle_proofs(&txs_to_validate)?;
self.batch_validate_merkle_proofs(tx_update, proofs)?;
}

Ok(())
}

Expand All @@ -380,6 +408,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
txids: impl IntoIterator<Item = Txid>,
) -> Result<(), Error> {
let mut txs_to_validate = Vec::new();

for txid in txids {
let tx = match self.fetch_tx(txid) {
Ok(tx) => tx,
Expand All @@ -393,8 +423,8 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
.map(|txo| &txo.script_pubkey)
.expect("tx must have an output");

// because of restrictions of the Electrum API, we have to use the `script_get_history`
// call to get confirmation status of our transaction
// Because of restrictions of the Electrum API, we have to use the `script_get_history`
// call to get confirmation status of our transaction.
if let Some(r) = self
.inner
.script_get_history(spk)?
Expand All @@ -404,7 +434,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
match r.height.try_into() {
// Returned heights 0 & -1 are reserved for unconfirmed txs.
Ok(height) if height > 0 => {
self.validate_merkle_for_anchor(tx_update, txid, height)?;
txs_to_validate.push((txid, height));
}
_ => {
tx_update.seen_ats.insert((r.tx_hash, start_time));
Expand All @@ -414,55 +444,166 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {

tx_update.txs.push(tx);
}

// Batch validate all collected transactions.
if !txs_to_validate.is_empty() {
let proofs = self.batch_fetch_merkle_proofs(&txs_to_validate)?;
self.batch_validate_merkle_proofs(tx_update, proofs)?;
}

Ok(())
}

// Helper function which checks if a transaction is confirmed by validating the merkle proof.
// An anchor is inserted if the transaction is validated to be in a confirmed block.
fn validate_merkle_for_anchor(
/// Batch fetch Merkle proofs for multiple transactions.
///
/// Proofs are served from `merkle_cache` when possible; only cache misses are
/// requested from the Electrum server, in chunks of at most
/// [`MAX_MERKLE_BATCH_SIZE`]. Stale cache entries (blocks no longer on our
/// current chain) are evicted up-front.
///
/// # Errors
///
/// Returns an [`Error`] if a header fetch or a `transaction_get_merkle` call
/// fails.
fn batch_fetch_merkle_proofs(
    &self,
    txs_with_heights: &[(Txid, usize)],
) -> Result<Vec<(Txid, GetMerkleRes)>, Error> {
    // Evict proofs whose block hash is no longer on our current chain.
    self.clear_stale_proofs()?;

    let mut results = Vec::with_capacity(txs_with_heights.len());
    let mut to_fetch = Vec::new();

    // Build a map for height to block hash conversions, so each header is
    // resolved with a minimum number of `fetch_header` calls.
    let mut height_to_hash: HashMap<u32, BlockHash> = HashMap::new();
    for &(_, height) in txs_with_heights {
        let h = height as u32;
        if !height_to_hash.contains_key(&h) {
            // Try to obtain hash from the header cache, or fetch the header if absent.
            let hash = self.fetch_header(h)?.block_hash();
            height_to_hash.insert(h, hash);
        }
    }

    // Resolve what we can from the Merkle proof cache; queue the rest.
    {
        let merkle_cache = self.merkle_cache.lock().unwrap();
        for &(txid, height) in txs_with_heights {
            let hash = height_to_hash[&(height as u32)];
            if let Some(proof) = merkle_cache.get(&(txid, hash)) {
                results.push((txid, proof.clone()));
            } else {
                to_fetch.push((txid, height, hash));
            }
        }
    }

    // Fetch missing proofs in chunks.
    //
    // NOTE(review): despite the chunking, each proof is still fetched with an
    // individual `transaction_get_merkle` round-trip; if the electrum client
    // in use exposes a batch variant, switching to it would reduce latency —
    // TODO confirm availability.
    for chunk in to_fetch.chunks(MAX_MERKLE_BATCH_SIZE) {
        // Do the network calls first, without holding the cache lock.
        let mut fetched = Vec::with_capacity(chunk.len());
        for &(txid, height, hash) in chunk {
            let merkle_res = self.inner.transaction_get_merkle(&txid, height)?;
            fetched.push((txid, hash, merkle_res));
        }
        // Take the cache lock once per chunk instead of once per transaction.
        let mut merkle_cache = self.merkle_cache.lock().unwrap();
        for (txid, hash, merkle_res) in fetched {
            merkle_cache.insert((txid, hash), merkle_res.clone());
            results.push((txid, merkle_res));
        }
    }

    Ok(results)
}

/// Batch validate Merkle proofs.
fn batch_validate_merkle_proofs(
&self,
tx_update: &mut TxUpdate<ConfirmationBlockTime>,
txid: Txid,
confirmation_height: usize,
proofs: Vec<(Txid, GetMerkleRes)>,
) -> Result<(), Error> {
if let Ok(merkle_res) = self
.inner
.transaction_get_merkle(&txid, confirmation_height)
{
let mut header = self.fetch_header(merkle_res.block_height as u32)?;
// Pre-fetch all required headers.
let heights: HashSet<u32> = proofs
.iter()
.map(|(_, proof)| proof.block_height as u32)
.collect();

let mut headers = HashMap::new();
for height in heights {
headers.insert(height, self.fetch_header(height)?);
}

// Validate proofs.
for (txid, merkle_res) in proofs {
let height = merkle_res.block_height as u32;
let header = headers.get(&height).unwrap();

let mut is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
&txid,
&header.merkle_root,
&merkle_res,
);

// Merkle validation will fail if the header in `block_header_cache` is outdated, so we
// want to check if there is a new header and validate against the new one.
// Retry with updated header if validation fails.
if !is_confirmed_tx {
header = self.update_header(merkle_res.block_height as u32)?;
let updated_header = self.update_header(height)?;
headers.insert(height, updated_header);
is_confirmed_tx = electrum_client::utils::validate_merkle_proof(
&txid,
&header.merkle_root,
&updated_header.merkle_root,
&merkle_res,
);
}

if is_confirmed_tx {
let header = headers.get(&height).unwrap();
tx_update.anchors.insert((
ConfirmationBlockTime {
confirmation_time: header.time as u64,
block_id: BlockId {
height: merkle_res.block_height as u32,
height,
hash: header.block_hash(),
},
},
txid,
));
}
}

Ok(())
}

/// Remove cached Merkle proofs for blocks that may have been re-orged out.
///
/// Walks the cached block hashes from the highest height downward, comparing
/// each against the hash currently at that height. Proofs anchored to a
/// mismatching (re-orged) hash are evicted; the walk stops at the first
/// height whose hash still matches, i.e. the fork point.
fn clear_stale_proofs(&self) -> Result<(), Error> {
    let mut cache = self.merkle_cache.lock().unwrap();

    // One (height, cached_hash) candidate per cached proof.
    let mut candidates: Vec<(u32, BlockHash)> = cache
        .iter()
        .map(|((_, cached_hash), res)| (res.block_height as u32, *cached_hash))
        .collect();

    // Highest blocks first; collapse duplicates so each pair is checked once.
    candidates.sort_unstable_by(|lhs, rhs| rhs.0.cmp(&lhs.0));
    candidates.dedup();

    for (height, cached_hash) in candidates {
        // A matching hash means this block is still on-chain — fork point
        // reached, so everything below it is also still valid.
        if self.fetch_header(height)?.block_hash() == cached_hash {
            break;
        }
        cache.retain(|&(_txid, block_hash), _| block_hash != cached_hash);
    }

    Ok(())
}
Comment on lines +566 to +592
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reorgs don't happen that often so we won't have much "extra data". This method looks like it's O(n^2). Let's remove it.


/// Validate the Merkle proof for a single transaction using the batch APIs.
#[allow(dead_code)]
fn validate_merkle_for_anchor(
    &self,
    tx_update: &mut TxUpdate<ConfirmationBlockTime>,
    txid: Txid,
    confirmation_height: usize,
) -> Result<(), Error> {
    // Delegate to the batch path with a one-element slice.
    let single = [(txid, confirmation_height)];
    let proofs = self.batch_fetch_merkle_proofs(&single)?;
    self.batch_validate_merkle_proofs(tx_update, proofs)
}

// Helper function which fetches the `TxOut`s of our relevant transactions' previous transactions,
// which we do not have by default. This data is needed to calculate the transaction fee.
fn fetch_prev_txout(
Expand Down
Loading
Loading