Skip to content

Commit e839f42

Browse files
Blocking tipset validation (#6898)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent be0d0db commit e839f42

File tree

3 files changed

+116
-55
lines changed

3 files changed

+116
-55
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555

5656
- [#6856](https://github.com/ChainSafe/forest/pull/6856): Return ethereum compatible error `BlockRangeExceeded` with code `-32005` when block range exceeds in the eth filter and logs API.
5757

58+
- [`#6893`](https://github.com/ChainSafe/forest/issues/6893): Fixed occasional lock contention during tipset validation.
59+
5860
## Forest v0.32.4 "Mild Inconvenience"
5961

6062
This is a non-mandatory release for all node operators. It enables F3 finality resolution on ETH v1 RPC methods.

src/daemon/mod.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -175,21 +175,48 @@ async fn maybe_import_snapshot(
175175
.client
176176
.snapshot_head
177177
.unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
178-
assert!(current_height.is_positive());
179-
match validate_from.is_negative() {
180-
// allow --height=-1000 to scroll back from the current head
181-
true => ctx
182-
.state_manager
183-
.validate_range((current_height + validate_from)..=current_height)?,
184-
false => ctx
185-
.state_manager
186-
.validate_range(validate_from..=current_height)?,
187-
}
178+
179+
let validation_range = validation_range(current_height, validate_from)?;
180+
// `validate_range` is CPU-bound (drives rayon-parallel VM execution) and
181+
// can run for minutes. Safer to spawn it on a blocking thread.
182+
let state_manager = ctx.state_manager.clone();
183+
tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
184+
.await??;
188185
}
189186

190187
Ok(())
191188
}
192189

190+
/// Returns the range of epochs to validate. This includes special handling for negative `from`
191+
/// values, which are interpreted as offsets from the current epoch.
192+
fn validation_range(
193+
current: ChainEpoch,
194+
from: ChainEpoch,
195+
) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
196+
anyhow::ensure!(
197+
current.is_positive(),
198+
"current head epoch {current} is invalid"
199+
);
200+
201+
// Negative values scroll back from the current head (e.g. --height=-1000).
202+
// `saturating_add` + `.max(0)` keeps extreme negatives from underflowing or
203+
// wrapping to a huge positive (which would silently produce an empty range).
204+
let start = if from.is_negative() {
205+
current.saturating_add(from).max(0)
206+
} else {
207+
from
208+
};
209+
210+
// An absolute `--height` past the head would otherwise produce an empty
211+
// range and silently succeed without validating anything.
212+
anyhow::ensure!(
213+
start <= current,
214+
"requested validation start epoch {start} is beyond the current head at epoch {current}",
215+
);
216+
217+
Ok(start..=current)
218+
}
219+
193220
async fn maybe_start_metrics_service(
194221
services: &mut JoinSet<anyhow::Result<()>>,
195222
config: &Config,
@@ -820,3 +847,40 @@ where
820847
{
821848
tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") })
822849
}
850+
851+
#[cfg(test)]
852+
mod tests {
853+
use rstest::rstest;
854+
855+
use super::*;
856+
857+
#[rstest]
858+
#[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!(
859+
"current head epoch 0 is invalid"
860+
)))]
861+
#[case::current_non_positive(-1, 1, anyhow::Result::Err(anyhow::anyhow!(
862+
"current head epoch 0 is invalid"
863+
)))]
864+
#[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!(
865+
"requested validation start epoch 11 is beyond the current head at epoch 10"
866+
)))]
867+
#[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))]
868+
#[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))]
869+
#[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))]
870+
#[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))]
871+
fn test_validation_range(
872+
#[case] current: ChainEpoch,
873+
#[case] from: ChainEpoch,
874+
#[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>,
875+
) {
876+
let result = validation_range(current, from);
877+
match expected {
878+
Ok(expected_range) => {
879+
assert_eq!(result.unwrap(), expected_range);
880+
}
881+
Err(_) => {
882+
assert!(result.is_err());
883+
}
884+
}
885+
}
886+
}

src/state_manager/mod.rs

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ use itertools::Itertools as _;
7575
use nonzero_ext::nonzero;
7676
use num::BigInt;
7777
use num_traits::identities::Zero;
78-
use rayon::prelude::ParallelBridge;
7978
use schemars::JsonSchema;
8079
use serde::{Deserialize, Serialize};
8180
use std::ops::RangeInclusive;
@@ -1645,8 +1644,10 @@ where
16451644

16461645
/// Validates all tipsets at epoch `start..=end` behind the heaviest tipset.
16471646
///
1648-
/// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work
1649-
/// of tipset validation.
1647+
/// Tipsets are processed sequentially. The compute-intensive work inside each
1648+
/// tipset (`bellperson` proof verification, FVM batch seal verification, etc.)
1649+
/// is already heavily rayon-parallelized. Parallelizing the outer loop actually introduces
1650+
/// some issues due to locks in the aforementioned crates. So don't do it.
16501651
///
16511652
/// # What is validation?
16521653
/// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots.
@@ -1662,10 +1663,6 @@ where
16621663
/// - assert that they match
16631664
///
16641665
/// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions.
1665-
///
1666-
/// # Known issues
1667-
/// This function is blocking, but we do observe threads waiting and synchronizing.
1668-
/// This is suspected to be due something in the VM or its `WASM` runtime.
16691666
#[tracing::instrument(skip(self))]
16701667
pub fn validate_range(&self, epochs: RangeInclusive<i64>) -> anyhow::Result<()> {
16711668
let heaviest = self.heaviest_tipset();
@@ -1852,44 +1849,42 @@ where
18521849
DB: Blockstore + Send + Sync + 'static,
18531850
T: Iterator<Item = Tipset> + Send,
18541851
{
1855-
use rayon::iter::ParallelIterator as _;
1856-
tipsets
1857-
.tuple_windows()
1858-
.par_bridge()
1859-
.try_for_each(|(child, parent)| {
1860-
info!(height = parent.epoch(), "compute parent state");
1861-
let ExecutedTipset {
1862-
state_root: actual_state,
1863-
receipt_root: actual_receipt,
1864-
..
1865-
} = apply_block_messages(
1866-
genesis_timestamp,
1867-
chain_index.shallow_clone(),
1868-
chain_config.shallow_clone(),
1869-
beacon.shallow_clone(),
1870-
engine,
1871-
parent,
1872-
NO_CALLBACK,
1873-
VMTrace::NotTraced,
1874-
)
1875-
.context("couldn't compute tipset state")?;
1876-
let expected_receipt = child.min_ticket_block().message_receipts;
1877-
let expected_state = child.parent_state();
1878-
match (expected_state, expected_receipt) == (&actual_state, actual_receipt) {
1879-
true => Ok(()),
1880-
false => {
1881-
error!(
1882-
height = child.epoch(),
1883-
?expected_state,
1884-
?expected_receipt,
1885-
?actual_state,
1886-
?actual_receipt,
1887-
"state mismatch"
1888-
);
1889-
bail!("state mismatch");
1890-
}
1891-
}
1892-
})
1852+
// Validate one tipset at a time. Parallelizing the outer loop across tipsets
1853+
// might wedge the global rayon pool.
1854+
// Sequential outer iteration leaves the entire rayon pool free for that
1855+
// already-rich inner parallelism.
1856+
for (child, parent) in tipsets.tuple_windows() {
1857+
info!(height = parent.epoch(), "compute parent state");
1858+
let ExecutedTipset {
1859+
state_root: actual_state,
1860+
receipt_root: actual_receipt,
1861+
..
1862+
} = apply_block_messages(
1863+
genesis_timestamp,
1864+
chain_index.shallow_clone(),
1865+
chain_config.shallow_clone(),
1866+
beacon.shallow_clone(),
1867+
engine,
1868+
parent,
1869+
NO_CALLBACK,
1870+
VMTrace::NotTraced,
1871+
)
1872+
.context("couldn't compute tipset state")?;
1873+
let expected_receipt = child.min_ticket_block().message_receipts;
1874+
let expected_state = child.parent_state();
1875+
if (expected_state, expected_receipt) != (&actual_state, actual_receipt) {
1876+
error!(
1877+
height = child.epoch(),
1878+
?expected_state,
1879+
?expected_receipt,
1880+
?actual_state,
1881+
?actual_receipt,
1882+
"state mismatch"
1883+
);
1884+
bail!("state mismatch");
1885+
}
1886+
}
1887+
Ok(())
18931888
}
18941889

18951890
/// Shared context for creating VMs and preparing tipset state.

0 commit comments

Comments
 (0)