Skip to content

Commit be6ce4c

Browse files
authored
Merge branch 'main' into elmattic/fix-chain-get-events
2 parents 3b526b3 + 286c214 commit be6ce4c

File tree

6 files changed

+217
-5
lines changed

6 files changed

+217
-5
lines changed

docs/docs/users/reference/env_variables.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ process.
4747
| `FOREST_TRACE_FILTER_MAX_RESULT` | positive integer | 500 | 1000 | Sets the maximum results returned per request by `trace_filter` |
4848
| `FOREST_CHAIN_INDEXER_ENABLED` | 1 or true | false | 1 | Whether or not to index the chain to support the Ethereum RPC API |
4949
| `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE` | positive integer | 100 | 42 | The size of an internal cache of tipsets to messages |
50+
| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of db write buffer for state migration (`~10MB` RAM per `10k` buffer) |
5051

5152
### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`
5253

docs/yarn.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10271,8 +10271,8 @@ __metadata:
1027110271
linkType: hard
1027210272

1027310273
"http-proxy-middleware@npm:^2.0.3":
10274-
version: 2.0.7
10275-
resolution: "http-proxy-middleware@npm:2.0.7"
10274+
version: 2.0.9
10275+
resolution: "http-proxy-middleware@npm:2.0.9"
1027610276
dependencies:
1027710277
"@types/http-proxy": "npm:^1.17.8"
1027810278
http-proxy: "npm:^1.18.1"
@@ -10284,7 +10284,7 @@ __metadata:
1028410284
peerDependenciesMeta:
1028510285
"@types/express":
1028610286
optional: true
10287-
checksum: 10c0/8d00a61eb215b83826460b07489d8bb095368ec16e02a9d63e228dcf7524e7c20d61561e5476de1391aecd4ec32ea093279cdc972115b311f8e0a95a24c9e47e
10287+
checksum: 10c0/8e9032af625f7c9f2f0d318f6cdb14eb725cc16ffe7b4ccccea25cf591fa819bb7c3bb579e0b543e0ae9c73059b505a6d728290c757bff27bae526a6ed11c05e
1028810288
languageName: node
1028910289
linkType: hard
1029010290

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2019-2025 ChainSafe Systems
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
4+
use ahash::{HashMap, HashMapExt};
5+
use cid::Cid;
6+
use fvm_ipld_blockstore::Blockstore;
7+
use itertools::Itertools;
8+
use parking_lot::RwLock;
9+
10+
pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
11+
inner: DB,
12+
buffer: RwLock<HashMap<Cid, Vec<u8>>>,
13+
buffer_capacity: usize,
14+
}
15+
16+
impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
17+
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
18+
if let Some(v) = self.buffer.read().get(k) {
19+
return Ok(Some(v.clone()));
20+
}
21+
self.inner.get(k)
22+
}
23+
24+
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
25+
Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
26+
}
27+
28+
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
29+
{
30+
let mut buffer = self.buffer.write();
31+
buffer.insert(*k, block.to_vec());
32+
}
33+
self.flush_buffer_if_needed()
34+
}
35+
}
36+
37+
impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
38+
pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
39+
Self {
40+
inner,
41+
buffer_capacity,
42+
buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
43+
}
44+
}
45+
46+
fn flush_buffer(&self) -> anyhow::Result<()> {
47+
let records = {
48+
let mut buffer = self.buffer.write();
49+
buffer.drain().collect_vec()
50+
};
51+
self.inner.put_many_keyed(records)
52+
}
53+
54+
fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
55+
if self.buffer.read().len() >= self.buffer_capacity {
56+
self.flush_buffer()
57+
} else {
58+
Ok(())
59+
}
60+
}
61+
}
62+
63+
impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
64+
fn drop(&mut self) {
65+
if let Err(e) = self.flush_buffer() {
66+
tracing::warn!("{e}");
67+
}
68+
}
69+
}
70+
71+
#[cfg(test)]
72+
mod tests {
73+
use super::*;
74+
use crate::{db::MemoryDB, utils::rand::forest_rng};
75+
use fvm_ipld_encoding::DAG_CBOR;
76+
use multihash_codetable::Code::Blake2b256;
77+
use multihash_codetable::MultihashDigest as _;
78+
use rand::Rng as _;
79+
use std::sync::Arc;
80+
81+
#[test]
82+
fn test_buffer_flush() {
83+
const BUFFER_SIZE: usize = 10;
84+
const N_RECORDS: usize = 15;
85+
let mem_db = Arc::new(MemoryDB::default());
86+
let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE);
87+
let mut records = Vec::with_capacity(N_RECORDS);
88+
for _ in 0..N_RECORDS {
89+
let mut record = [0; 1024];
90+
forest_rng().fill(&mut record);
91+
let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
92+
records.push((key, record));
93+
}
94+
95+
buf_db.put_many_keyed(records.clone()).unwrap();
96+
97+
for (i, (k, v)) in records.iter().enumerate() {
98+
assert!(buf_db.has(k).unwrap());
99+
assert_eq!(buf_db.get(k).unwrap().unwrap().as_slice(), v);
100+
if i < BUFFER_SIZE {
101+
assert!(mem_db.has(k).unwrap());
102+
assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
103+
} else {
104+
assert!(!mem_db.has(k).unwrap());
105+
}
106+
}
107+
108+
drop(buf_db);
109+
110+
for (k, v) in records.iter() {
111+
assert!(mem_db.has(k).unwrap());
112+
assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
113+
}
114+
}
115+
}

src/db/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
// Copyright 2019-2025 ChainSafe Systems
22
// SPDX-License-Identifier: Apache-2.0, MIT
33

4+
mod blockstore_with_write_buffer;
45
pub mod car;
56
mod memory;
67
pub mod parity_db;
78
pub mod parity_db_config;
89

910
mod gc;
1011
pub mod ttl;
12+
pub use blockstore_with_write_buffer::BlockstoreWithWriteBuffer;
1113
pub use gc::MarkAndSweep;
1214
pub use memory::MemoryDB;
1315
use setting_keys::ETH_MAPPING_UP_TO_DATE_KEY;

src/state_migration/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::{
66
atomic::{self, AtomicBool},
77
};
88

9+
use crate::db::BlockstoreWithWriteBuffer;
910
use crate::networks::{ChainConfig, Height, NetworkChain};
1011
use crate::shim::clock::ChainEpoch;
1112
use crate::shim::state_tree::StateRoot;
@@ -92,6 +93,12 @@ pub fn run_state_migrations<DB>(
9293
where
9394
DB: Blockstore + Send + Sync,
9495
{
96+
// ~10MB RAM per 10k buffer
97+
let db_write_buffer = match std::env::var("FOREST_STATE_MIGRATION_DB_WRITE_BUFFER") {
98+
Ok(v) => v.parse().ok(),
99+
_ => None,
100+
}
101+
.unwrap_or(10000);
95102
let mappings = get_migrations(&chain_config.network);
96103

97104
// Make sure bundle is defined.
@@ -115,7 +122,11 @@ where
115122
if epoch == chain_config.epoch(height) {
116123
tracing::info!("Running {height} migration at epoch {epoch}");
117124
let start_time = std::time::Instant::now();
118-
let new_state = migrate(chain_config, db, parent_state, epoch)?;
125+
let db = Arc::new(BlockstoreWithWriteBuffer::new_with_capacity(
126+
db.clone(),
127+
db_write_buffer,
128+
));
129+
let new_state = migrate(chain_config, &db, parent_state, epoch)?;
119130
let elapsed = start_time.elapsed();
120131
// `new_state_actors` is the Go state migration output, log for comparision
121132
let new_state_actors = db

src/tool/subcommands/shed_cmd/migration.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use std::time::Instant;
77

88
use cid::Cid;
99
use clap::Args;
10+
use fvm_ipld_blockstore::Blockstore;
1011
use itertools::Itertools;
1112

13+
use crate::db::BlockstoreWithWriteBuffer;
1214
use crate::utils::db::CborStoreExt;
1315
use crate::{
1416
blocks::CachingBlockHeader,
@@ -36,6 +38,9 @@ pub struct MigrateStateCommand {
3638
/// Filecoin network chain
3739
#[arg(long, required = true)]
3840
chain: NetworkChain,
41+
/// Size of database write buffer, use 0 to disable write buffer
42+
#[arg(long, default_value_t = 10000)]
43+
db_write_buffer: usize,
3944
}
4045

4146
impl MigrateStateCommand {
@@ -45,6 +50,7 @@ impl MigrateStateCommand {
4550
block_to_look_back,
4651
db,
4752
chain,
53+
db_write_buffer,
4854
} = self;
4955
let db = {
5056
let db = if let Some(db) = db {
@@ -53,7 +59,15 @@ impl MigrateStateCommand {
5359
let (_, config) = read_config(None, Some(chain.clone()))?;
5460
db_root(&chain_path(&config))?
5561
};
56-
load_db(&db)?
62+
let db = load_db(&db)?;
63+
Arc::new(if db_write_buffer > 0 {
64+
Either::Left(BlockstoreWithWriteBuffer::new_with_capacity(
65+
db,
66+
db_write_buffer,
67+
))
68+
} else {
69+
Either::Right(db)
70+
})
5771
};
5872
let block: CachingBlockHeader = db.get_cbor_required(&block_to_look_back)?;
5973
let chain_config = Arc::new(ChainConfig::from_chain(&chain));
@@ -91,3 +105,72 @@ pub(super) fn load_db(db_root: &Path) -> anyhow::Result<Arc<ManyCar<ParityDb>>>
91105
load_all_forest_cars(&db, &forest_car_db_dir)?;
92106
Ok(Arc::new(db))
93107
}
108+
109+
enum Either<A: Blockstore, B: Blockstore> {
110+
Left(A),
111+
Right(B),
112+
}
113+
114+
impl<A: Blockstore, B: Blockstore> Blockstore for Either<A, B> {
115+
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
116+
match self {
117+
Self::Left(v) => v.has(k),
118+
Self::Right(v) => v.has(k),
119+
}
120+
}
121+
122+
#[allow(clippy::disallowed_types)]
123+
fn put<D>(
124+
&self,
125+
mh_code: multihash_codetable::Code,
126+
block: &fvm_ipld_blockstore::Block<D>,
127+
) -> anyhow::Result<Cid>
128+
where
129+
Self: Sized,
130+
D: AsRef<[u8]>,
131+
{
132+
match self {
133+
Self::Left(v) => v.put(mh_code, block),
134+
Self::Right(v) => v.put(mh_code, block),
135+
}
136+
}
137+
138+
#[allow(clippy::disallowed_types)]
139+
fn put_many<D, I>(&self, blocks: I) -> anyhow::Result<()>
140+
where
141+
Self: Sized,
142+
D: AsRef<[u8]>,
143+
I: IntoIterator<Item = (multihash_codetable::Code, fvm_ipld_blockstore::Block<D>)>,
144+
{
145+
match self {
146+
Self::Left(v) => v.put_many(blocks),
147+
Self::Right(v) => v.put_many(blocks),
148+
}
149+
}
150+
151+
fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
152+
where
153+
Self: Sized,
154+
D: AsRef<[u8]>,
155+
I: IntoIterator<Item = (Cid, D)>,
156+
{
157+
match self {
158+
Self::Left(v) => v.put_many_keyed(blocks),
159+
Self::Right(v) => v.put_many_keyed(blocks),
160+
}
161+
}
162+
163+
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
164+
match self {
165+
Self::Left(v) => v.get(k),
166+
Self::Right(v) => v.get(k),
167+
}
168+
}
169+
170+
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
171+
match self {
172+
Self::Left(v) => v.put_keyed(k, block),
173+
Self::Right(v) => v.put_keyed(k, block),
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)