diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index 6f78704d593e..78348264bff0 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -47,6 +47,7 @@ process. | `FOREST_TRACE_FILTER_MAX_RESULT` | positive integer | 500 | 1000 | Sets the maximum results returned per request by `trace_filter` | | `FOREST_CHAIN_INDEXER_ENABLED` | 1 or true | false | 1 | Whether or not to index the chain to support the Ethereum RPC API | | `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE` | positive integer | 100 | 42 | The size of an internal cache of tipsets to messages | +| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of the db write buffer for state migration (`~10MB` RAM per `10k` buffer) | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/db/blockstore_with_write_buffer.rs b/src/db/blockstore_with_write_buffer.rs new file mode 100644 index 000000000000..90880e0917ac --- /dev/null +++ b/src/db/blockstore_with_write_buffer.rs @@ -0,0 +1,115 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use ahash::{HashMap, HashMapExt}; +use cid::Cid; +use fvm_ipld_blockstore::Blockstore; +use itertools::Itertools; +use parking_lot::RwLock; + +pub struct BlockstoreWithWriteBuffer<DB: Blockstore> { + inner: DB, + buffer: RwLock<HashMap<Cid, Vec<u8>>>, + buffer_capacity: usize, +} + +impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> { + fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> { + if let Some(v) = self.buffer.read().get(k) { + return Ok(Some(v.clone())); + } + self.inner.get(k) + } + + fn has(&self, k: &Cid) -> anyhow::Result<bool> { + Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
+ } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { + { + let mut buffer = self.buffer.write(); + buffer.insert(*k, block.to_vec()); + } + self.flush_buffer_if_needed() + } +} + +impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> { + pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self { + Self { + inner, + buffer_capacity, + buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)), + } + } + + fn flush_buffer(&self) -> anyhow::Result<()> { + let records = { + let mut buffer = self.buffer.write(); + buffer.drain().collect_vec() + }; + self.inner.put_many_keyed(records) + } + + fn flush_buffer_if_needed(&self) -> anyhow::Result<()> { + if self.buffer.read().len() >= self.buffer_capacity { + self.flush_buffer() + } else { + Ok(()) + } + } +} + +impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> { + fn drop(&mut self) { + if let Err(e) = self.flush_buffer() { + tracing::warn!("{e}"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{db::MemoryDB, utils::rand::forest_rng}; + use fvm_ipld_encoding::DAG_CBOR; + use multihash_codetable::Code::Blake2b256; + use multihash_codetable::MultihashDigest as _; + use rand::Rng as _; + use std::sync::Arc; + + #[test] + fn test_buffer_flush() { + const BUFFER_SIZE: usize = 10; + const N_RECORDS: usize = 15; + let mem_db = Arc::new(MemoryDB::default()); + let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE); + let mut records = Vec::with_capacity(N_RECORDS); + for _ in 0..N_RECORDS { + let mut record = [0; 1024]; + forest_rng().fill(&mut record); + let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice())); + records.push((key, record)); + } + + buf_db.put_many_keyed(records.clone()).unwrap(); + + for (i, (k, v)) in records.iter().enumerate() { + assert!(buf_db.has(k).unwrap()); + assert_eq!(buf_db.get(k).unwrap().unwrap().as_slice(), v); + if i < BUFFER_SIZE { + assert!(mem_db.has(k).unwrap()); + assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v); + }
else { + assert!(!mem_db.has(k).unwrap()); + } + } + + drop(buf_db); + + for (k, v) in records.iter() { + assert!(mem_db.has(k).unwrap()); + assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v); + } + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 5423df7d770c..f2a90c618186 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,6 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +mod blockstore_with_write_buffer; pub mod car; mod memory; pub mod parity_db; @@ -8,6 +9,7 @@ pub mod parity_db_config; mod gc; pub mod ttl; +pub use blockstore_with_write_buffer::BlockstoreWithWriteBuffer; pub use gc::MarkAndSweep; pub use memory::MemoryDB; use setting_keys::ETH_MAPPING_UP_TO_DATE_KEY; diff --git a/src/state_migration/mod.rs b/src/state_migration/mod.rs index dac851a2d6be..10f9d43e3656 100644 --- a/src/state_migration/mod.rs +++ b/src/state_migration/mod.rs @@ -6,6 +6,7 @@ use std::sync::{ atomic::{self, AtomicBool}, }; +use crate::db::BlockstoreWithWriteBuffer; use crate::networks::{ChainConfig, Height, NetworkChain}; use crate::shim::clock::ChainEpoch; use crate::shim::state_tree::StateRoot; @@ -92,6 +93,12 @@ pub fn run_state_migrations<DB>( where DB: Blockstore + Send + Sync, { + // ~10MB RAM per 10k buffer + let db_write_buffer = match std::env::var("FOREST_STATE_MIGRATION_DB_WRITE_BUFFER") { + Ok(v) => v.parse().ok(), + _ => None, + } + .unwrap_or(10000); let mappings = get_migrations(&chain_config.network); // Make sure bundle is defined.
@@ -115,7 +122,11 @@ where if epoch == chain_config.epoch(height) { tracing::info!("Running {height} migration at epoch {epoch}"); let start_time = std::time::Instant::now(); - let new_state = migrate(chain_config, db, parent_state, epoch)?; + let db = Arc::new(BlockstoreWithWriteBuffer::new_with_capacity( + db.clone(), + db_write_buffer, + )); + let new_state = migrate(chain_config, &db, parent_state, epoch)?; let elapsed = start_time.elapsed(); // `new_state_actors` is the Go state migration output, log for comparision let new_state_actors = db diff --git a/src/tool/subcommands/shed_cmd/migration.rs b/src/tool/subcommands/shed_cmd/migration.rs index 3058cfb430b9..2630c79b33de 100644 --- a/src/tool/subcommands/shed_cmd/migration.rs +++ b/src/tool/subcommands/shed_cmd/migration.rs @@ -7,8 +7,10 @@ use std::time::Instant; use cid::Cid; use clap::Args; +use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; +use crate::db::BlockstoreWithWriteBuffer; use crate::utils::db::CborStoreExt; use crate::{ blocks::CachingBlockHeader, @@ -36,6 +38,9 @@ pub struct MigrateStateCommand { /// Filecoin network chain #[arg(long, required = true)] chain: NetworkChain, + /// Size of database write buffer, use 0 to disable write buffer + #[arg(long, default_value_t = 10000)] + db_write_buffer: usize, } impl MigrateStateCommand { @@ -45,6 +50,7 @@ impl MigrateStateCommand { block_to_look_back, db, chain, + db_write_buffer, } = self; let db = { let db = if let Some(db) = db { @@ -53,7 +59,15 @@ impl MigrateStateCommand { let (_, config) = read_config(None, Some(chain.clone()))?; db_root(&chain_path(&config))? }; - load_db(&db)? 
+ let db = load_db(&db)?; + Arc::new(if db_write_buffer > 0 { + Either::Left(BlockstoreWithWriteBuffer::new_with_capacity( + db, + db_write_buffer, + )) + } else { + Either::Right(db) + }) }; let block: CachingBlockHeader = db.get_cbor_required(&block_to_look_back)?; let chain_config = Arc::new(ChainConfig::from_chain(&chain)); @@ -91,3 +105,72 @@ pub(super) fn load_db(db_root: &Path) -> anyhow::Result>> load_all_forest_cars(&db, &forest_car_db_dir)?; Ok(Arc::new(db)) } + +enum Either<A, B> { + Left(A), + Right(B), +} + +impl<A: Blockstore, B: Blockstore> Blockstore for Either<A, B> { + fn has(&self, k: &Cid) -> anyhow::Result<bool> { + match self { + Self::Left(v) => v.has(k), + Self::Right(v) => v.has(k), + } + } + + #[allow(clippy::disallowed_types)] + fn put<D>( + &self, + mh_code: multihash_codetable::Code, + block: &fvm_ipld_blockstore::Block<D>, + ) -> anyhow::Result<Cid> + where + Self: Sized, + D: AsRef<[u8]>, + { + match self { + Self::Left(v) => v.put(mh_code, block), + Self::Right(v) => v.put(mh_code, block), + } + } + + #[allow(clippy::disallowed_types)] + fn put_many<D, I>(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator<Item = (multihash_codetable::Code, fvm_ipld_blockstore::Block<D>)>, + { + match self { + Self::Left(v) => v.put_many(blocks), + Self::Right(v) => v.put_many(blocks), + } + } + + fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator<Item = (Cid, D)>, + { + match self { + Self::Left(v) => v.put_many_keyed(blocks), + Self::Right(v) => v.put_many_keyed(blocks), + } + } + + fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> { + match self { + Self::Left(v) => v.get(k), + Self::Right(v) => v.get(k), + } + } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { + match self { + Self::Left(v) => v.put_keyed(k, block), + Self::Right(v) => v.put_keyed(k, block), + } + } +}