Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion src/state_migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use std::sync::{
use crate::networks::{ChainConfig, Height, NetworkChain};
use crate::shim::clock::ChainEpoch;
use crate::shim::state_tree::StateRoot;
use ahash::{HashMap, HashMapExt};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use parking_lot::RwLock;

pub(in crate::state_migration) mod common;
mod nv17;
Expand Down Expand Up @@ -115,7 +118,8 @@ where
if epoch == chain_config.epoch(height) {
tracing::info!("Running {height} migration at epoch {epoch}");
let start_time = std::time::Instant::now();
let new_state = migrate(chain_config, db, parent_state, epoch)?;
let db = Arc::new(BlockstoreWithWriteBuffer::new(db.clone()));
let new_state = migrate(chain_config, &db, parent_state, epoch)?;
let elapsed = start_time.elapsed();
// `new_state_actors` is the Go state migration output, log for comparision
let new_state_actors = db
Expand Down Expand Up @@ -144,5 +148,70 @@ where
Ok(None)
}

pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is straightforward, but it'd still be great to have some coverage there, especially given that locking mechanisms are involved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

inner: DB,
buffer: RwLock<HashMap<Cid, Vec<u8>>>,
buffer_capacity: usize,
}

impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
if let Some(v) = self.buffer.read().get(k) {
return Ok(Some(v.clone()));
}
self.inner.get(k)
}

fn has(&self, k: &Cid) -> anyhow::Result<bool> {
Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
}

fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
{
let mut buffer = self.buffer.write();
buffer.insert(*k, block.to_vec());
}
self.flush_buffer_if_needed()
Comment thread
elmattic marked this conversation as resolved.
Outdated
}
}

impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
pub fn new(inner: DB) -> Self {
Self::new_with_capacity(inner, 10000)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have it configurable so that operators with more memory in their box can potentially increase this. It'd be great to note how much memory increase should one expect with every 10k buffer increment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
Self {
inner,
buffer_capacity,
buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
}
}

fn flush_buffer(&self) -> anyhow::Result<()> {
let records = {
let mut buffer = self.buffer.write();
buffer.drain().collect_vec()
};
self.inner.put_many_keyed(records)
}

fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
if self.buffer.read().len() >= self.buffer_capacity {
self.flush_buffer()
} else {
Ok(())
}
}
}

impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
fn drop(&mut self) {
if let Err(e) = self.flush_buffer() {
tracing::warn!("{e}");
}
}
}

#[cfg(test)]
mod tests;
85 changes: 84 additions & 1 deletion src/tool/subcommands/shed_cmd/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::time::Instant;

use cid::Cid;
use clap::Args;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;

use crate::state_migration::BlockstoreWithWriteBuffer;
use crate::utils::db::CborStoreExt;
use crate::{
blocks::CachingBlockHeader,
Expand Down Expand Up @@ -36,6 +38,9 @@ pub struct MigrateStateCommand {
/// Filecoin network chain
#[arg(long, required = true)]
chain: NetworkChain,
/// Size of database write buffer, use 0 to disable write buffer
#[arg(long, default_value_t = 10000)]
db_write_buffer: usize,
}

impl MigrateStateCommand {
Expand All @@ -45,6 +50,7 @@ impl MigrateStateCommand {
block_to_look_back,
db,
chain,
db_write_buffer,
} = self;
let db = {
let db = if let Some(db) = db {
Expand All @@ -53,7 +59,15 @@ impl MigrateStateCommand {
let (_, config) = read_config(None, Some(chain.clone()))?;
db_root(&chain_path(&config))?
};
load_db(&db)?
let db = load_db(&db)?;
Arc::new(if db_write_buffer > 0 {
Either::Left(BlockstoreWithWriteBuffer::new_with_capacity(
db,
db_write_buffer,
))
} else {
Either::Right(db)
})
};
let block: CachingBlockHeader = db.get_cbor_required(&block_to_look_back)?;
let chain_config = Arc::new(ChainConfig::from_chain(&chain));
Expand Down Expand Up @@ -91,3 +105,72 @@ pub(super) fn load_db(db_root: &Path) -> anyhow::Result<Arc<ManyCar<ParityDb>>>
load_all_forest_cars(&db, &forest_car_db_dir)?;
Ok(Arc::new(db))
}

enum Either<A: Blockstore, B: Blockstore> {
Left(A),
Right(B),
}

impl<A: Blockstore, B: Blockstore> Blockstore for Either<A, B> {
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
match self {
Self::Left(v) => v.has(k),
Self::Right(v) => v.has(k),
}
}

#[allow(clippy::disallowed_types)]
fn put<D>(
&self,
mh_code: multihash_codetable::Code,
block: &fvm_ipld_blockstore::Block<D>,
) -> anyhow::Result<Cid>
where
Self: Sized,
D: AsRef<[u8]>,
{
match self {
Self::Left(v) => v.put(mh_code, block),
Self::Right(v) => v.put(mh_code, block),
}
}

#[allow(clippy::disallowed_types)]
fn put_many<D, I>(&self, blocks: I) -> anyhow::Result<()>
where
Self: Sized,
D: AsRef<[u8]>,
I: IntoIterator<Item = (multihash_codetable::Code, fvm_ipld_blockstore::Block<D>)>,
{
match self {
Self::Left(v) => v.put_many(blocks),
Self::Right(v) => v.put_many(blocks),
}
}

fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
where
Self: Sized,
D: AsRef<[u8]>,
I: IntoIterator<Item = (Cid, D)>,
{
match self {
Self::Left(v) => v.put_many_keyed(blocks),
Self::Right(v) => v.put_many_keyed(blocks),
}
}

fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
match self {
Self::Left(v) => v.get(k),
Self::Right(v) => v.get(k),
}
}

fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
match self {
Self::Left(v) => v.put_keyed(k, block),
Self::Right(v) => v.put_keyed(k, block),
}
}
}
Loading