diff --git a/crates/bitcoind_rpc/src/bip158.rs b/crates/bitcoind_rpc/src/bip158.rs index 5419716b1..628972665 100644 --- a/crates/bitcoind_rpc/src/bip158.rs +++ b/crates/bitcoind_rpc/src/bip158.rs @@ -31,8 +31,10 @@ pub struct FilterIter<'c, C> { spks: Vec, // local cp cp: Option, - // blocks map - blocks: BTreeMap, + // map of height -> (hash, is_match) + blocks: BTreeMap, + // initial height + start: Height, // best height counter height: Height, // stop height @@ -47,6 +49,7 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { spks: vec![], cp: None, blocks: BTreeMap::new(), + start: height, height, stop: 0, } @@ -69,22 +72,14 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { self.spks.push(spk); } - /// Get the next filter and increment the current best height. - /// - /// Returns `Ok(None)` when the stop height is exceeded. - fn next_filter(&mut self) -> Result, Error> { - if self.height > self.stop { - return Ok(None); - } - let height = self.height; - let hash = match self.blocks.get(&height) { - Some(h) => *h, - None => self.client.get_block_hash(height as u64)?, - }; - let filter_bytes = self.client.get_block_filter(&hash)?.filter; - let filter = BlockFilter::new(&filter_bytes); - self.height += 1; - Ok(Some((BlockId { height, hash }, filter))) + /// Get the block hash by `height` if it is found in the blocks map. + fn get_block_hash(&self, height: &Height) -> Option { + Some(self.blocks.get(height)?.0) + } + + /// Insert a (non-matching) block height and hash into the blocks map. + fn insert_block(&mut self, height: Height, hash: BlockHash) { + self.blocks.insert(height, (hash, false)); } /// Get the remote tip. @@ -93,33 +88,17 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { /// [`FilterIter`]. pub fn get_tip(&mut self) -> Result, Error> { let tip_hash = self.client.get_best_block_hash()?; - let mut header = self.client.get_block_header_info(&tip_hash)?; + let header = self.client.get_block_header_info(&tip_hash)?; let tip_height = header.height as u32; if self.height >= tip_height { // nothing to do return Ok(None); } - self.blocks.insert(tip_height, tip_hash); - // if we have a checkpoint we use a lookback of ten blocks - // to ensure consistency of the local chain + // start scanning from point of agreement + 1 if let Some(cp) = self.cp.as_ref() { - // adjust start height to point of agreement + 1 let base = self.find_base_with(cp.clone())?; - self.height = base.height + 1; - - for _ in 0..9 { - let hash = match header.previous_block_hash { - Some(hash) => hash, - None => break, - }; - header = self.client.get_block_header_info(&hash)?; - let height = header.height as u32; - if height < self.height { - break; - } - self.blocks.insert(height, hash); - } + self.height = base.height.saturating_add(1); } self.stop = tip_height; @@ -131,9 +110,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> { } } -/// Alias for a compact filter and associated block id. -type NextFilter = (BlockId, BlockFilter); - /// Event inner type #[derive(Debug, Clone)] pub struct EventInner { @@ -171,27 +147,53 @@ impl Iterator for FilterIter<'_, C> { type Item = Result; fn next(&mut self) -> Option { - (|| -> Result<_, Error> { - // if the next filter matches any of our watched spks, get the block - // and return it, inserting relevant block ids along the way - self.next_filter()?.map_or(Ok(None), |(block, filter)| { - let height = block.height; - let hash = block.hash; - - if self.spks.is_empty() { - Err(Error::NoScripts) - } else if filter - .match_any(&hash, self.spks.iter().map(|script| script.as_bytes())) - .map_err(Error::Bip158)? - { - let block = self.client.get_block(&hash)?; - self.blocks.insert(height, hash); - let inner = EventInner { height, block }; - Ok(Some(Event::Block(inner))) - } else { - Ok(Some(Event::NoMatch(height))) + (|| -> Result, Error> { + if self.height > self.stop { + return Ok(None); + } + // Fetch next filter + let mut height = self.height; + let mut hash = self.client.get_block_hash(height as _)?; + loop { + let header = self.client.get_block_header(&hash)?; + let prev_height = height.saturating_sub(1); + let prev_hash = match self.get_block_hash(&prev_height) { + Some(hash) => hash, + None => break, + }; + if header.prev_blockhash == prev_hash { + break; } - }) + // reorg detected, keep backtracking + height = height.saturating_sub(1); + hash = self.client.get_block_hash(height as _)?; + } + let filter_bytes = self.client.get_block_filter(&hash)?.filter; + let filter = BlockFilter::new(&filter_bytes); + + // record the scanned block + self.insert_block(height, hash); + // increment best height + self.height = height.saturating_add(1); + + // If the filter matches any of our watched SPKs, fetch the full + // block, and record the matching block entry. + if self.spks.is_empty() { + Err(Error::NoScripts) + } else if filter + .match_any(&hash, self.spks.iter().map(|s| s.as_bytes())) + .map_err(Error::Bip158)? + { + let block = self.client.get_block(&hash)?; + // update for matched block + self.blocks.entry(height).and_modify(|(_, is_match)| { + *is_match = true; + }); + let inner = EventInner { height, block }; + Ok(Some(Event::Block(inner))) + } else { + Ok(Some(Event::NoMatch(height))) + } })() .transpose() } @@ -202,18 +204,18 @@ impl FilterIter<'_, C> { fn find_base_with(&mut self, mut cp: CheckPoint) -> Result { loop { let height = cp.height(); - let fetched_hash = match self.blocks.get(&height) { - Some(hash) => *hash, + let fetched_hash = match self.get_block_hash(&height) { + Some(hash) => hash, None if height == 0 => cp.hash(), _ => self.client.get_block_hash(height as _)?, }; if cp.hash() == fetched_hash { // ensure this block also exists in self - self.blocks.insert(height, cp.hash()); + self.insert_block(height, cp.hash()); return Ok(cp.block_id()); } // remember conflicts - self.blocks.insert(height, fetched_hash); + self.insert_block(height, fetched_hash); cp = cp.prev().expect("must break before genesis"); } } @@ -221,17 +223,26 @@ impl FilterIter<'_, C> { /// Returns a chain update from the newly scanned blocks. /// /// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or - /// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip). + /// if not all events have been emitted (by calling `next`). pub fn chain_update(&mut self) -> Option { - if self.cp.is_none() || self.blocks.is_empty() { + if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop { return None; } - // note: to connect with the local chain we must guarantee that `self.blocks.first()` - // is also the point of agreement with `self.cp`. + // We return blocks up to and including the initial height, all of the matching blocks, + // and blocks in the terminal range. + let tail_range = self.stop.saturating_sub(9)..=self.stop; Some( - CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from)) - .expect("blocks must be in order"), + CheckPoint::from_block_ids(self.blocks.iter().filter_map( + |(&height, &(hash, is_match))| { + if height <= self.start || is_match || tail_range.contains(&height) { + Some(BlockId { height, hash }) + } else { + None + } + }, + )) + .expect("blocks must be in order"), ) } } diff --git a/crates/bitcoind_rpc/tests/test_filter_iter.rs b/crates/bitcoind_rpc/tests/test_filter_iter.rs index c8d3335a2..b7831b43e 100644 --- a/crates/bitcoind_rpc/tests/test_filter_iter.rs +++ b/crates/bitcoind_rpc/tests/test_filter_iter.rs @@ -1,8 +1,7 @@ -use bitcoin::{constants, Address, Amount, Network, ScriptBuf}; - -use bdk_bitcoind_rpc::bip158::FilterIter; +use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter}; use bdk_core::{BlockId, CheckPoint}; use bdk_testenv::{anyhow, bitcoind, block_id, TestEnv}; +use bitcoin::{constants, Address, Amount, Network, ScriptBuf}; use bitcoincore_rpc::RpcApi; fn testenv() -> anyhow::Result { @@ -100,6 +99,7 @@ fn get_tip_and_chain_update() -> anyhow::Result<()> { let cp = CheckPoint::from_block_ids(test.chain).unwrap(); let mut iter = FilterIter::new_with_checkpoint(env.rpc_client(), cp); assert_eq!(iter.get_tip().unwrap(), Some(new_tip)); + for _res in iter.by_ref() {} let update_cp = iter.chain_update().unwrap(); let mut update_blocks: Vec<_> = update_cp.iter().map(|cp| cp.block_id()).collect(); update_blocks.reverse(); @@ -111,7 +111,6 @@ fn get_tip_and_chain_update() -> anyhow::Result<()> { #[test] fn filter_iter_returns_matched_blocks() -> anyhow::Result<()> { - use bdk_bitcoind_rpc::bip158::{Event, EventInner}; let env = testenv()?; let rpc = env.rpc_client(); while rpc.get_block_count()? < 101 { @@ -163,3 +162,277 @@ fn filter_iter_error_no_scripts() -> anyhow::Result<()> { Ok(()) } + +#[test] +#[allow(clippy::print_stdout)] +fn filter_iter_handles_reorg() -> anyhow::Result<()> { + let env = testenv()?; + let client = env.rpc_client(); + + // 1. Initial setup & mining + println!("STEP: Initial mining (target height 102 for maturity)"); + + let expected_initial_height = 102; + while env.rpc_client().get_block_count()? < expected_initial_height { + let _ = env.mine_blocks(1, None)?; + } + // ***************************** + // Check the expected initial height + assert_eq!( + client.get_block_count()?, + expected_initial_height, + "Block count should be {} after initial mine", + expected_initial_height + ); + + // 2. Create watched script + println!("STEP: Creating watched script"); + // Ensure address and spk_to_watch are defined here ***** + // ****************************************************************** + let spk_to_watch = ScriptBuf::from_hex("0014446906a6560d8ad760db3156706e72e171f3a2aa")?; + let address = Address::from_script(&spk_to_watch, Network::Regtest)?; + println!("Watching SPK: {}", spk_to_watch.to_hex_string()); + + // Create 2 txs to be confirmed at consecutive heights. + // We have to choose our UTXOs now to make sure one doesn't get invalidated + // later by a reorg. + let unspent = client.list_unspent(None, None, None, None, None)?; + assert!(unspent.len() >= 2); + use bdk_testenv::bitcoincore_rpc::bitcoincore_rpc_json::CreateRawTransactionInput; + let unspent_1 = &unspent[0]; + let unspent_2 = &unspent[1]; + let utxo_1 = CreateRawTransactionInput { + txid: unspent_1.txid, + vout: unspent_1.vout, + sequence: None, + }; + let utxo_2 = CreateRawTransactionInput { + txid: unspent_2.txid, + vout: unspent_2.vout, + sequence: None, + }; + + // create tx 1 + println!("STEP: Creating transactions to send"); + let to_send = Amount::ONE_BTC; + let fee = Amount::from_sat(1_000); + let change_addr = client.get_new_address(None, None)?.assume_checked(); + let change_amt = unspent_1.amount - to_send - fee; + let out = [ + (address.to_string(), to_send), + (change_addr.to_string(), change_amt), + ] + .into(); + let to_send = Amount::ONE_BTC * 2; + let tx = client.create_raw_transaction(&[utxo_1], &out, None, None)?; + let res = client.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx_1 = res.transaction()?; + // create tx 2 + let change_addr = client.get_new_address(None, None)?.assume_checked(); + let change_amt = unspent_2.amount - to_send - fee; + let out = [ + (address.to_string(), to_send), + (change_addr.to_string(), change_amt), + ] + .into(); + let tx = client.create_raw_transaction(&[utxo_2], &out, None, None)?; + let res = client.sign_raw_transaction_with_wallet(&tx, None, None)?; + let tx_2 = res.transaction()?; + + // let mine_to: u32 = 103; + + println!("STEP: Mining to height {}", 103); + while env.rpc_client().get_block_count()? < 103 { + let _ = env.mine_blocks(1, None)?; + } + + // 3. Mine block A WITH relevant tx + println!("STEP: Sending tx for original block A"); + let txid_a = client.send_raw_transaction(&tx_1)?; + println!("STEP: Mining original block A"); + let hash_104 = env.mine_blocks(1, None)?[0]; + + // 4. Mine block B WITH relevant tx 2 + println!("STEP: Sending tx 2 for original block B"); + let txid_b = client.send_raw_transaction(&tx_2)?; + println!("STEP: Mining original block B"); + let hash_105 = env.mine_blocks(1, None)?[0]; + + assert_eq!( + client.get_block_count()?, + 105, + "Block count should be 105 after mining block B" + ); + + // 5. Instantiate FilterIter at start height 104 + println!("STEP: Instantiating FilterIter"); + // Start processing from height 104 + let start_height = 104; + let mut iter = FilterIter::new_with_height(client, start_height); + iter.add_spk(spk_to_watch.clone()); + let initial_tip = iter.get_tip()?.expect("Should get initial tip"); + assert_eq!(initial_tip.height, 105); + assert_eq!(initial_tip.hash, hash_105); + + // 6. Iterate once processing block A + println!("STEP: Iterating once (original block A)"); + let event_a = iter.next().expect("Iterator should have item A")?; + // println!("First event: {:?}", event_a); + match event_a { + Event::Block(EventInner { height, block }) => { + assert_eq!(height, 104); + assert_eq!(block.block_hash(), hash_104); + assert!(block.txdata.iter().any(|tx| tx.compute_txid() == txid_a)); + } + _ => panic!("Expected relevant tx at block A 102"), + } + + // 7. Simulate Reorg (Invalidate blocks B and A) + println!("STEP: Invalidating original blocks B and A"); + println!("Invalidating blocks B ({}) and A ({})", hash_105, hash_104); + client.invalidate_block(&hash_105)?; + client.invalidate_block(&hash_104)?; + // We should see 2 unconfirmed txs in mempool + let raw_mempool = client.get_raw_mempool()?; + assert_eq!(raw_mempool.len(), 2); + println!( + "{} txs in mempool at height {}", + raw_mempool.len(), + client.get_block_count()? + ); + + // 8. Mine Replacement Blocks WITH relevant txs + // First mine Block A' + println!("STEP: Mining replacement block A' (with send tx x2)"); + let hash_104_prime = env.mine_blocks(1, None)?[0]; + let height = client.get_block_count()?; + println!("Block {} (A') hash: {}", height, hash_104_prime); + assert_eq!(height, 104); + assert_ne!(hash_104, hash_104_prime); + + // Mine Block B' - empty or unrelated txs + println!("STEP: Mining replacement block B' (no send tx)"); + let hash_105_prime = env.mine_blocks(1, None)?[0]; + let height = client.get_block_count()?; + println!("Block {} (B') hash: {}", height, hash_105_prime); + assert_eq!(height, 105); + assert_ne!(hash_105, hash_105_prime); + + // 9. Continue Iterating & Collect Events AFTER reorg + // Iterator should now process heights 109 (A') and 110 (B'). + let mut post_reorg_events: Vec = vec![]; + + println!("STEP: Starting post-reorg iteration loop"); + println!("Continuing iteration after reorg..."); + for event_result in iter.by_ref() { + let event = event_result?; + println!( + "Post-reorg event height: {}, matched: {}", + event.height(), + event.is_match(), + ); + post_reorg_events.push(event); + } + + // 10. Assertions + println!("STEP: Checking post-reorg assertions"); + + // Check for event post-reorg (Block A') + let event_104_post = post_reorg_events.iter().find(|e| e.height() == 104); + assert!( + event_104_post.is_some(), + "Should have yielded an event for post-reorg (Block A')" + ); + match event_104_post.unwrap() { + Event::Block(inner) => { + assert_eq!( + inner.block.block_hash(), + hash_104_prime, + "BUG: Iterator yielded wrong block for height 104! Expected A'" + ); + assert!( + inner + .block + .txdata + .iter() + .any(|tx| tx.compute_txid() == txid_a), + "Expected relevant tx A" + ); + assert!( + inner + .block + .txdata + .iter() + .any(|tx| tx.compute_txid() == txid_b), + "Expected relevant tx B" + ); + } + Event::NoMatch(..) => { + panic!("Expected to match height 104"); + } + } + + // Check for event post-reorg (Block B') + let event_105_post = post_reorg_events.iter().find(|e| e.height() == 105); + assert!( + event_105_post.is_some(), + "Should have yielded an event for post-reorg (Block B')" + ); + match event_105_post.unwrap() { + Event::NoMatch(h) => { + assert_eq!(*h, 105, "Should be NoMatch for block B'"); + } + Event::Block(..) => { + panic!("Expected NoMatch for block B'"); + } + } + + // Check chain update tip + // println!("STEP: Checking chain_update"); + let final_update = iter.chain_update(); + assert!( + final_update.is_none(), + "We didn't instantiate FilterIter with a checkpoint" + ); + + Ok(()) +} + +// Test that while a reorg is detected we delay incrementing the best height +#[test] +fn repeat_reorgs() -> anyhow::Result<()> { + const MINE_TO: u32 = 11; + + let env = testenv()?; + let rpc = env.rpc_client(); + while rpc.get_block_count()? < MINE_TO as u64 { + let _ = env.mine_blocks(1, None)?; + } + + let spk = ScriptBuf::from_hex("0014446906a6560d8ad760db3156706e72e171f3a2aa")?; + + let mut iter = FilterIter::new_with_height(env.rpc_client(), 1); + iter.add_spk(spk); + assert_eq!(iter.get_tip()?.unwrap().height, MINE_TO); + + // Process events to height (MINE_TO - 1) + loop { + if iter.next().unwrap()?.height() == MINE_TO - 1 { + break; + } + } + + for _ in 0..3 { + // Invalidate 2 blocks and remine to height = MINE_TO + let _ = env.reorg(2)?; + + // Call next. If we detect a reorg, we'll see no change in the event height + assert_eq!(iter.next().unwrap()?.height(), MINE_TO - 1); + } + + // If no reorg, then height should increment normally from here on + assert_eq!(iter.next().unwrap()?.height(), MINE_TO); + assert!(iter.next().is_none()); + + Ok(()) +}