Skip to content

Commit 13f8e4f

Browse files
committed
rellocate code and add tests
1 parent afe1dc7 commit 13f8e4f

File tree

5 files changed

+121
-67
lines changed

5 files changed

+121
-67
lines changed

docs/docs/users/reference/env_variables.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ process.
4747
| `FOREST_TRACE_FILTER_MAX_RESULT` | positive integer | 500 | 1000 | Sets the maximum results returned per request by `trace_filter` |
4848
| `FOREST_CHAIN_INDEXER_ENABLED` | 1 or true | false | 1 | Whether or not to index the chain to support the Ethereum RPC API |
4949
| `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE` | positive integer | 100 | 42 | The size of an internal cache of tipsets to messages |
50-
| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of db write buffer for state migration (~10MB memory per 10k buffer) |
50+
| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of db write buffer for state migration (`~10MB` RAM per `10k` buffer) |
5151

5252
### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`
5353

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2019-2025 ChainSafe Systems
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
4+
use ahash::{HashMap, HashMapExt};
5+
use cid::Cid;
6+
use fvm_ipld_blockstore::Blockstore;
7+
use itertools::Itertools;
8+
use parking_lot::RwLock;
9+
10+
pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
11+
inner: DB,
12+
buffer: RwLock<HashMap<Cid, Vec<u8>>>,
13+
buffer_capacity: usize,
14+
}
15+
16+
impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
17+
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
18+
if let Some(v) = self.buffer.read().get(k) {
19+
return Ok(Some(v.clone()));
20+
}
21+
self.inner.get(k)
22+
}
23+
24+
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
25+
Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
26+
}
27+
28+
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
29+
{
30+
let mut buffer = self.buffer.write();
31+
buffer.insert(*k, block.to_vec());
32+
}
33+
self.flush_buffer_if_needed()
34+
}
35+
}
36+
37+
impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
38+
pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
39+
Self {
40+
inner,
41+
buffer_capacity,
42+
buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
43+
}
44+
}
45+
46+
fn flush_buffer(&self) -> anyhow::Result<()> {
47+
let records = {
48+
let mut buffer = self.buffer.write();
49+
buffer.drain().collect_vec()
50+
};
51+
self.inner.put_many_keyed(records)
52+
}
53+
54+
fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
55+
if self.buffer.read().len() >= self.buffer_capacity {
56+
self.flush_buffer()
57+
} else {
58+
Ok(())
59+
}
60+
}
61+
}
62+
63+
impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
64+
fn drop(&mut self) {
65+
if let Err(e) = self.flush_buffer() {
66+
tracing::warn!("{e}");
67+
}
68+
}
69+
}
70+
71+
#[cfg(test)]
72+
mod tests {
73+
use super::*;
74+
use crate::{db::MemoryDB, utils::rand::forest_rng};
75+
use fvm_ipld_encoding::DAG_CBOR;
76+
use multihash_codetable::Code::Blake2b256;
77+
use multihash_codetable::MultihashDigest as _;
78+
use rand::Rng as _;
79+
use std::sync::Arc;
80+
81+
#[test]
82+
fn test_buffer_flush() {
83+
const BUFFER_SIZE: usize = 10;
84+
const N_RECORDS: usize = 15;
85+
let mem_db = Arc::new(MemoryDB::default());
86+
let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE);
87+
let mut records = Vec::with_capacity(N_RECORDS);
88+
for _ in 0..N_RECORDS {
89+
let mut record = [0; 1024];
90+
forest_rng().fill(&mut record);
91+
let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
92+
records.push((key, record));
93+
}
94+
95+
buf_db.put_many_keyed(records.clone()).unwrap();
96+
97+
for (i, (k, v)) in records.iter().enumerate() {
98+
assert!(buf_db.has(k).unwrap());
99+
assert_eq!(buf_db.get(k).unwrap().unwrap().as_slice(), v);
100+
if i < BUFFER_SIZE {
101+
assert!(mem_db.has(k).unwrap());
102+
assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
103+
} else {
104+
assert!(!mem_db.has(k).unwrap());
105+
}
106+
}
107+
108+
drop(buf_db);
109+
110+
for (k, v) in records.iter() {
111+
assert!(mem_db.has(k).unwrap());
112+
assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
113+
}
114+
}
115+
}

src/db/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
// Copyright 2019-2025 ChainSafe Systems
22
// SPDX-License-Identifier: Apache-2.0, MIT
33

4+
mod blockstore_with_write_buffer;
45
pub mod car;
56
mod memory;
67
pub mod parity_db;
78
pub mod parity_db_config;
89

910
mod gc;
1011
pub mod ttl;
12+
pub use blockstore_with_write_buffer::BlockstoreWithWriteBuffer;
1113
pub use gc::MarkAndSweep;
1214
pub use memory::MemoryDB;
1315
use setting_keys::ETH_MAPPING_UP_TO_DATE_KEY;

src/state_migration/mod.rs

Lines changed: 2 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ use std::sync::{
66
atomic::{self, AtomicBool},
77
};
88

9+
use crate::db::BlockstoreWithWriteBuffer;
910
use crate::networks::{ChainConfig, Height, NetworkChain};
1011
use crate::shim::clock::ChainEpoch;
1112
use crate::shim::state_tree::StateRoot;
12-
use ahash::{HashMap, HashMapExt};
1313
use cid::Cid;
1414
use fvm_ipld_blockstore::Blockstore;
1515
use fvm_ipld_encoding::CborStore;
16-
use itertools::Itertools;
17-
use parking_lot::RwLock;
1816

1917
pub(in crate::state_migration) mod common;
2018
mod nv17;
@@ -95,7 +93,7 @@ pub fn run_state_migrations<DB>(
9593
where
9694
DB: Blockstore + Send + Sync,
9795
{
98-
// ~10MB memory per 10k buffer
96+
// ~10MB RAM per 10k buffer
9997
let db_write_buffer = match std::env::var("FOREST_STATE_MIGRATION_DB_WRITE_BUFFER") {
10098
Ok(v) => v.parse().ok(),
10199
_ => None,
@@ -157,66 +155,5 @@ where
157155
Ok(None)
158156
}
159157

160-
pub(crate) struct BlockstoreWithWriteBuffer<DB: Blockstore> {
161-
inner: DB,
162-
buffer: RwLock<HashMap<Cid, Vec<u8>>>,
163-
buffer_capacity: usize,
164-
}
165-
166-
impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
167-
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
168-
if let Some(v) = self.buffer.read().get(k) {
169-
return Ok(Some(v.clone()));
170-
}
171-
self.inner.get(k)
172-
}
173-
174-
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
175-
Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
176-
}
177-
178-
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
179-
{
180-
let mut buffer = self.buffer.write();
181-
buffer.insert(*k, block.to_vec());
182-
}
183-
self.flush_buffer_if_needed()
184-
}
185-
}
186-
187-
impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
188-
pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
189-
Self {
190-
inner,
191-
buffer_capacity,
192-
buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
193-
}
194-
}
195-
196-
fn flush_buffer(&self) -> anyhow::Result<()> {
197-
let records = {
198-
let mut buffer = self.buffer.write();
199-
buffer.drain().collect_vec()
200-
};
201-
self.inner.put_many_keyed(records)
202-
}
203-
204-
fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
205-
if self.buffer.read().len() >= self.buffer_capacity {
206-
self.flush_buffer()
207-
} else {
208-
Ok(())
209-
}
210-
}
211-
}
212-
213-
impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
214-
fn drop(&mut self) {
215-
if let Err(e) = self.flush_buffer() {
216-
tracing::warn!("{e}");
217-
}
218-
}
219-
}
220-
221158
#[cfg(test)]
222159
mod tests;

src/tool/subcommands/shed_cmd/migration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use clap::Args;
1010
use fvm_ipld_blockstore::Blockstore;
1111
use itertools::Itertools;
1212

13-
use crate::state_migration::BlockstoreWithWriteBuffer;
13+
use crate::db::BlockstoreWithWriteBuffer;
1414
use crate::utils::db::CborStoreExt;
1515
use crate::{
1616
blocks::CachingBlockHeader,

0 commit comments

Comments
 (0)