Skip to content

fix(bitcoind_rpc): fix filter iter may not handle reorgs properly #1909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 81 additions & 70 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ pub struct FilterIter<'c, C> {
spks: Vec<ScriptBuf>,
// local cp
cp: Option<CheckPoint>,
// blocks map
blocks: BTreeMap<Height, BlockHash>,
// map of height -> (hash, is_match)
blocks: BTreeMap<Height, (BlockHash, bool)>,
// initial height
start: Height,
// best height counter
height: Height,
// stop height
Expand All @@ -47,6 +49,7 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
spks: vec![],
cp: None,
blocks: BTreeMap::new(),
start: height,
height,
stop: 0,
}
Expand All @@ -69,22 +72,14 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
self.spks.push(spk);
}

/// Get the next filter and increment the current best height.
///
/// Returns `Ok(None)` when the stop height is exceeded.
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
if self.height > self.stop {
return Ok(None);
}
let height = self.height;
let hash = match self.blocks.get(&height) {
Some(h) => *h,
None => self.client.get_block_hash(height as u64)?,
};
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);
self.height += 1;
Ok(Some((BlockId { height, hash }, filter)))
/// Get the block hash by `height` if it is found in the blocks map.
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> {
Some(self.blocks.get(height)?.0)
}

/// Insert a (non-matching) block height and hash into the blocks map.
fn insert_block(&mut self, height: Height, hash: BlockHash) {
self.blocks.insert(height, (hash, false));
}

/// Get the remote tip.
Expand All @@ -93,33 +88,17 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
/// [`FilterIter`].
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
let tip_hash = self.client.get_best_block_hash()?;
let mut header = self.client.get_block_header_info(&tip_hash)?;
let header = self.client.get_block_header_info(&tip_hash)?;
let tip_height = header.height as u32;
if self.height >= tip_height {
// nothing to do
return Ok(None);
}
self.blocks.insert(tip_height, tip_hash);

// if we have a checkpoint we use a lookback of ten blocks
// to ensure consistency of the local chain
// start scanning from point of agreement + 1
if let Some(cp) = self.cp.as_ref() {
// adjust start height to point of agreement + 1
let base = self.find_base_with(cp.clone())?;
self.height = base.height + 1;

for _ in 0..9 {
let hash = match header.previous_block_hash {
Some(hash) => hash,
None => break,
};
header = self.client.get_block_header_info(&hash)?;
let height = header.height as u32;
if height < self.height {
break;
}
self.blocks.insert(height, hash);
}
self.height = base.height.saturating_add(1);
}

self.stop = tip_height;
Expand All @@ -131,9 +110,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
}
}

/// Alias for a compact filter and associated block id.
type NextFilter = (BlockId, BlockFilter);

/// Event inner type
#[derive(Debug, Clone)]
pub struct EventInner {
Expand Down Expand Up @@ -171,27 +147,53 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
type Item = Result<Event, Error>;

fn next(&mut self) -> Option<Self::Item> {
(|| -> Result<_, Error> {
// if the next filter matches any of our watched spks, get the block
// and return it, inserting relevant block ids along the way
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
let height = block.height;
let hash = block.hash;

if self.spks.is_empty() {
Err(Error::NoScripts)
} else if filter
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
self.blocks.insert(height, hash);
let inner = EventInner { height, block };
Ok(Some(Event::Block(inner)))
} else {
Ok(Some(Event::NoMatch(height)))
(|| -> Result<Option<_>, Error> {
if self.height > self.stop {
return Ok(None);
}
// Fetch next filter
let mut height = self.height;
let mut hash = self.client.get_block_hash(height as _)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to compare this with the previous header returned? Otherwise how can we detect reorgs between separate calls to next?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @evanlinjin, Thanks for the review. I think the code addresses your concern about detecting reorgs between calls to next.

When next() finishes, it updates self.height on line 177 to the next height it expects to process. It also stores the hash and match status for the block it just processed in self.blocks on line 175 self.insert_block(height, hash);). And it persists both of self.height and self.blocks until the next call to next().

What was done on line 155 and line 156 is that the height and hash variables were initialized with the values of self.height (which is the height we expect to process next) and self.client.get_block_hash(height as _) (which is its hash)

Then, the code from lines 157 to 166 gets the header of the current block let header = self.client.get_block_header(&hash)?;, calculates the height of the assumed parent's block let prev_height = height.saturating_sub(1);, retrieves the hash of the assumed parent through its height, and compares it to the prev_blockhash field from the current block's header. If they match, then the chain is consistent, and if they don't, a reorg has occured.

loop {
let header = self.client.get_block_header(&hash)?;
let prev_height = height.saturating_sub(1);
let prev_hash = match self.get_block_hash(&prev_height) {
Some(hash) => hash,
None => break,
};
if header.prev_blockhash == prev_hash {
break;
}
})
// reorg detected, keep backtracking
height = height.saturating_sub(1);
hash = self.client.get_block_hash(height as _)?;
}
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);

// record the scanned block
self.insert_block(height, hash);
// increment best height
self.height = height.saturating_add(1);

// If the filter matches any of our watched SPKs, fetch the full
// block, and record the matching block entry.
if self.spks.is_empty() {
Err(Error::NoScripts)
} else if filter
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
// update for matched block
self.blocks.entry(height).and_modify(|(_, is_match)| {
*is_match = true;
});
let inner = EventInner { height, block };
Ok(Some(Event::Block(inner)))
} else {
Ok(Some(Event::NoMatch(height)))
}
})()
.transpose()
}
Expand All @@ -202,36 +204,45 @@ impl<C: RpcApi> FilterIter<'_, C> {
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
loop {
let height = cp.height();
let fetched_hash = match self.blocks.get(&height) {
Some(hash) => *hash,
let fetched_hash = match self.get_block_hash(&height) {
Some(hash) => hash,
None if height == 0 => cp.hash(),
_ => self.client.get_block_hash(height as _)?,
};
if cp.hash() == fetched_hash {
// ensure this block also exists in self
self.blocks.insert(height, cp.hash());
self.insert_block(height, cp.hash());
return Ok(cp.block_id());
}
// remember conflicts
self.blocks.insert(height, fetched_hash);
self.insert_block(height, fetched_hash);
cp = cp.prev().expect("must break before genesis");
}
}

/// Returns a chain update from the newly scanned blocks.
///
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
/// if not all events have been emitted (by calling `next`).
pub fn chain_update(&mut self) -> Option<CheckPoint> {
if self.cp.is_none() || self.blocks.is_empty() {
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
return None;
}

// note: to connect with the local chain we must guarantee that `self.blocks.first()`
// is also the point of agreement with `self.cp`.
// We return blocks up to and including the initial height, all of the matching blocks,
// and blocks in the terminal range.
let tail_range = self.stop.saturating_sub(9)..=self.stop;
Some(
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
.expect("blocks must be in order"),
CheckPoint::from_block_ids(self.blocks.iter().filter_map(
|(&height, &(hash, is_match))| {
if height <= self.start || is_match || tail_range.contains(&height) {
Some(BlockId { height, hash })
} else {
None
}
},
))
.expect("blocks must be in order"),
)
}
}
Expand Down
Loading
Loading