Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

### Added

- [#6161](https://github.com/ChainSafe/forest/pull/6161) Added `forest-tool db import` subcommand.

- [#6166](https://github.com/ChainSafe/forest/pull/6166) Gate `JWT` expiration validation behind environment variable `FOREST_JWT_DISABLE_EXP_VALIDATION`.

### Changed
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ human_bytes = "0.4"
humantime = "2"
indexmap = { version = "2", features = ["serde"] }
indicatif = { version = "0.18", features = ["tokio"] }
integer-encoding = "4.0"
integer-encoding = { version = "4.0", features = ["tokio_async"] }
ipld-core = { version = "0.4", features = ["serde", "arb"] }
is-terminal = "0.4"
itertools = "0.14"
Expand Down
1 change: 1 addition & 0 deletions docs/docs/users/reference/cli.sh
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ generate_markdown_section "forest-tool" "archive sync-bucket"
generate_markdown_section "forest-tool" "db"
generate_markdown_section "forest-tool" "db stats"
generate_markdown_section "forest-tool" "db destroy"
generate_markdown_section "forest-tool" "db import"

generate_markdown_section "forest-tool" "car"
generate_markdown_section "forest-tool" "car concat"
Expand Down
9 changes: 4 additions & 5 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::DAG_CBOR;
use multihash_derive::MultihashDigest as _;
use nunny::Vec as NonEmpty;
use std::fs::File;
use std::io::{Seek as _, SeekFrom};
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

Expand Down Expand Up @@ -65,9 +64,9 @@ pub async fn export<D: Digest>(

/// Exports a Filecoin snapshot in v2 format
/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
pub async fn export_v2<D: Digest>(
pub async fn export_v2<D: Digest, F: Seek + Read>(
db: &Arc<impl Blockstore + Send + Sync + 'static>,
mut f3: Option<(Cid, File)>,
mut f3: Option<(Cid, F)>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
Expand Down Expand Up @@ -109,7 +108,7 @@ pub async fn export_v2<D: Digest>(
f3_data.seek(SeekFrom::Start(0))?;
prefix_data_frames.push({
let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?;
encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
anyhow::Ok((
vec![f3_cid],
finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
Expand Down
3 changes: 2 additions & 1 deletion src/chain/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
utils::db::CborStoreExt,
};
use sha2::{Digest as _, Sha256};
use std::fs::File;

#[test]
fn test_snapshot_version_cbor_serde() {
Expand Down Expand Up @@ -62,7 +63,7 @@ async fn test_export_inner(version: FilecoinSnapshotVersion) -> anyhow::Result<(
export::<Sha256>(&db, &head, 0, &mut car_bytes, None).await?
}
FilecoinSnapshotVersion::V2 => {
export_v2::<Sha256>(&db, None, &head, 0, &mut car_bytes, None).await?
export_v2::<Sha256, File>(&db, None, &head, 0, &mut car_bytes, None).await?
}
};

Expand Down
5 changes: 1 addition & 4 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ pub async fn load_actor_bundles_from_path(
"Bundle file not found at {}",
bundle_path.as_ref().display()
);
let mut car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(bundle_path.as_ref()).await?,
))
.await?;
let mut car_stream = CarStream::new_from_path(bundle_path.as_ref()).await?;

// Validate the bundle
let roots = HashSet::from_iter(car_stream.header_v1.roots.iter());
Expand Down
5 changes: 1 addition & 4 deletions src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,7 @@ async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()>
to = %to.display(),
"transcoding into forest car"
);
let car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(from).await?,
))
.await?;
let car_stream = CarStream::new_from_path(from).await?;
let roots = car_stream.header_v1.roots.clone();

let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
Expand Down
2 changes: 1 addition & 1 deletion src/db/car/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub type CacheKey = u64;
type FrameOffset = u64;

/// According to FRC-0108, v2 snapshots have exactly one root pointing to metadata
const V2_SNAPSHOT_ROOT_COUNT: usize = 1;
pub const V2_SNAPSHOT_ROOT_COUNT: usize = 1;

pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
Expand Down
29 changes: 15 additions & 14 deletions src/db/car/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
let mut cursor = positioned_io::Cursor::new(&reader);
let position = cursor.position();
let header_v2 = read_v2_header(&mut cursor)?;
let (limit_position, version) = if let Some(header_v2) = &header_v2 {
cursor.set_position(position.saturating_add(header_v2.data_offset as u64));
(
Some(
cursor
.stream_position()?
.saturating_add(header_v2.data_size as u64),
),
2,
)
} else {
cursor.set_position(position);
(None, 1)
};
let (limit_position, version) =
if let Some(header_v2) = &header_v2 {
cursor.set_position(position.saturating_add(
u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
));
(
Some(cursor.stream_position()?.saturating_add(
u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
)),
2,
)
} else {
cursor.set_position(position);
(None, 1)
};
Comment thread
coderabbitai[bot] marked this conversation as resolved.

let header_v1 = read_v1_header(&mut cursor)?;
// When indexing, we perform small reads of the length and CID before seeking
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl RpcMethod<1> for ForestChainExport {
}
}
};
crate::chain::export_v2::<Sha256>(
crate::chain::export_v2::<Sha256, _>(
&ctx.store_owned(),
f3_snap,
&start_ts,
Expand Down
8 changes: 2 additions & 6 deletions src/tool/subcommands/archive_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,11 +678,7 @@ async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> a
let mut f3_data = File::open(f3)?;
let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?;

let car_stream = CarStream::new(tokio::io::BufReader::new(
tokio::fs::File::open(&filecoin).await?,
))
.await?;

let car_stream = CarStream::new_from_path(&filecoin).await?;
let chain_head = car_stream.header_v1.roots.clone();

println!("f3 snapshot cid: {f3_cid}");
Expand Down Expand Up @@ -719,7 +715,7 @@ async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> a
crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
f3_data.seek(SeekFrom::Start(0))?;
encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?;
encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
anyhow::Ok((
vec![f3_cid],
crate::db::car::forest::finalize_frame(
Expand Down
69 changes: 66 additions & 3 deletions src/tool/subcommands/db_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ use std::path::PathBuf;

use crate::cli::subcommands::prompt_confirm;
use crate::cli_shared::{chain_path, read_config};
use crate::db::db_engine::db_root;
use crate::db::BlockstoreWithWriteBuffer;
use crate::db::db_engine::{db_root, open_db};
use crate::networks::NetworkChain;
use crate::utils::db::car_stream::CarStream;
use clap::Subcommand;
use fvm_ipld_blockstore::Blockstore;
use indicatif::{ProgressBar, ProgressStyle};
use tokio_stream::StreamExt;
use tracing::error;

#[derive(Debug, Subcommand)]
Expand All @@ -33,13 +38,28 @@ pub enum DBCommands {
#[arg(long)]
chain: Option<NetworkChain>,
},
/// Import CAR files into the key-value store
Import {
/// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
#[arg(num_args = 1.., required = true)]
snapshot_files: Vec<PathBuf>,
/// Filecoin network chain
#[arg(long, required = true)]
chain: NetworkChain,
/// Optional path to the database folder that powers a Forest node
#[arg(long)]
db: Option<PathBuf>,
/// Skip block validation
#[arg(long)]
skip_validation: bool,
},
}

impl DBCommands {
pub async fn run(&self) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<()> {
match self {
Self::Stats { config, chain } => {
use human_repr::HumanCount;
use human_repr::HumanCount as _;

let (_, config) = read_config(config.as_ref(), chain.clone())?;

Expand Down Expand Up @@ -80,6 +100,49 @@ impl DBCommands {
}
}
}
Self::Import {
snapshot_files,
chain,
db,
skip_validation: no_validation,
} => {
const DB_WRITE_BUFFER_CAPACITY: usize = 10000;

let db_root_path = if let Some(db) = db {
db
} else {
let (_, config) = read_config(None, Some(chain.clone()))?;
db_root(&chain_path(&config))?
};
println!("Opening parity-db at {}", db_root_path.display());
let db_writer = BlockstoreWithWriteBuffer::new_with_capacity(
open_db(db_root_path, &Default::default())?,
DB_WRITE_BUFFER_CAPACITY,
);

let pb = ProgressBar::new_spinner().with_style(
ProgressStyle::with_template("{spinner} {msg}")
.expect("indicatif template must be valid"),
);
pb.enable_steady_tick(std::time::Duration::from_millis(100));

let mut total = 0;
for snap in snapshot_files {
let mut car = CarStream::new_from_path(&snap).await?;
while let Some(b) = car.try_next().await? {
if !no_validation {
b.validate()?;
}
db_writer.put_keyed(&b.cid, &b.data)?;
total += 1;
let text = format!("{total} blocks imported");
pb.set_message(text);
}
}
drop(db_writer);
pb.finish();
Ok(())
Comment thread
hanabi1224 marked this conversation as resolved.
}
}
}
}
Loading
Loading