Skip to content

Commit f52b2fd

Browse files
committed
feat: snapshot GC
1 parent d5e7f79 commit f52b2fd

File tree

14 files changed

+317
-32
lines changed

14 files changed

+317
-32
lines changed

f3-sidecar/ec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ type ForestEC struct {
1717
closer jsonrpc.ClientCloser
1818
}
1919

20-
func NewForestEC(rpcEndpoint, jwt string) (ForestEC, error) {
20+
func NewForestEC(ctx context.Context, rpcEndpoint, jwt string) (ForestEC, error) {
2121
f3api := F3Api{}
2222
headers := make(http.Header)
2323
isJwtProvided := len(jwt) > 0
2424
if isJwtProvided {
2525
headers.Add("Authorization", fmt.Sprintf("Bearer %s", jwt))
2626
}
27-
closer, err := jsonrpc.NewClient(context.Background(), rpcEndpoint, "F3", &f3api, headers)
27+
closer, err := jsonrpc.NewClient(ctx, rpcEndpoint, "F3", &f3api, headers)
2828
if err != nil {
2929
return ForestEC{}, err
3030
}

f3-sidecar/ec_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var (
1616
)
1717

1818
func init() {
19-
ec, err := NewForestEC("http://127.0.0.1:2345/rpc/v1", "")
19+
ec, err := NewForestEC(ctx, "http://127.0.0.1:2345/rpc/v1", "")
2020
if err != nil {
2121
panic(err)
2222
}

f3-sidecar/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint string, initialPowerTable string, bootstrapEpoch int64, finality int64, f3Root string, contract_manifest_poll_interval_seconds uint64) error {
2222
api := FilecoinApi{}
2323
isJwtProvided := len(jwt) > 0
24-
closer, err := jsonrpc.NewClient(context.Background(), rpcEndpoint, "Filecoin", &api, nil)
24+
closer, err := jsonrpc.NewClient(ctx, rpcEndpoint, "Filecoin", &api, nil)
2525
if err != nil {
2626
return err
2727
}
@@ -46,7 +46,7 @@ func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint stri
4646
if err != nil {
4747
return err
4848
}
49-
ec, err := NewForestEC(rpcEndpoint, jwt)
49+
ec, err := NewForestEC(ctx, rpcEndpoint, jwt)
5050
if err != nil {
5151
return err
5252
}

src/chain/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
// SPDX-License-Identifier: Apache-2.0, MIT
33
pub mod store;
44
mod weight;
5-
use crate::blocks::Tipset;
5+
use crate::blocks::{Tipset, TipsetKey};
66
use crate::cid_collections::CidHashSet;
77
use crate::db::car::forest;
8+
use crate::db::{SettingsStore, SettingsStoreExt};
89
use crate::ipld::stream_chain;
910
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
1011
use crate::utils::stream::par_buffer;
@@ -16,6 +17,20 @@ use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
1617

1718
pub use self::{store::*, weight::*};
1819

20+
pub async fn export_from_head<D: Digest>(
21+
db: Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
22+
lookup_depth: ChainEpochDelta,
23+
writer: impl AsyncWrite + Unpin,
24+
seen: CidHashSet,
25+
skip_checksum: bool,
26+
) -> anyhow::Result<(Tipset, Option<digest::Output<D>>), Error> {
27+
let head_key = SettingsStoreExt::read_obj::<TipsetKey>(&db, crate::db::setting_keys::HEAD_KEY)?
28+
.context("chain head key not found")?;
29+
let head_ts = Tipset::load(&db, &head_key)?.context("chain head not found")?;
30+
let digest = export::<D>(db, &head_ts, lookup_depth, writer, seen, skip_checksum).await?;
31+
Ok((head_ts, digest))
32+
}
33+
1934
pub async fn export<D: Digest>(
2035
db: Arc<impl Blockstore + Send + Sync + 'static>,
2136
tipset: &Tipset,

src/cli/subcommands/chain_cmd.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// Copyright 2019-2025 ChainSafe Systems
22
// SPDX-License-Identifier: Apache-2.0, MIT
33

4+
mod prune;
5+
use prune::ChainPruneCommands;
6+
47
use crate::blocks::{Tipset, TipsetKey};
58
use crate::lotus_json::HasLotusJson;
69
use crate::message::ChainMessage;
@@ -68,6 +71,8 @@ pub enum ChainCommands {
6871
#[arg(short, long, aliases = ["yes", "no-confirm"], short_alias = 'y')]
6972
force: bool,
7073
},
74+
#[command(subcommand)]
75+
Prune(ChainPruneCommands),
7176
}
7277

7378
impl ChainCommands {
@@ -123,6 +128,7 @@ impl ChainCommands {
123128
.await?;
124129
Ok(())
125130
}
131+
Self::Prune(cmd) => cmd.run(client).await,
126132
}
127133
}
128134
}

src/cli/subcommands/chain_cmd/prune.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2019-2025 ChainSafe Systems
2+
// SPDX-License-Identifier: Apache-2.0, MIT
3+
4+
use crate::rpc::{self, RpcMethodExt, chain::ChainPruneSnapshot};
5+
use clap::Subcommand;
6+
use std::time::Duration;
7+
8+
#[derive(Debug, Subcommand)]
9+
pub enum ChainPruneCommands {
10+
/// Run snapshot GC
11+
Snap,
12+
}
13+
14+
impl ChainPruneCommands {
15+
pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
16+
match self {
17+
Self::Snap => {
18+
client
19+
.call(ChainPruneSnapshot::request(())?.with_timeout(Duration::MAX))
20+
.await?;
21+
}
22+
}
23+
24+
Ok(())
25+
}
26+
}

src/daemon/mod.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use crate::daemon::db_util::{
2121
};
2222
use crate::db::SettingsStore;
2323
use crate::db::car::ManyCar;
24-
use crate::db::{MarkAndSweep, MemoryDB, SettingsExt, ttl::EthMappingCollector};
24+
use crate::db::gc::{MarkAndSweep, SnapshotGarbageCollector};
25+
use crate::db::{MemoryDB, SettingsExt, ttl::EthMappingCollector};
2526
use crate::libp2p::{Libp2pService, PeerManager};
2627
use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider};
2728
use crate::networks::{self, ChainConfig};
@@ -44,6 +45,7 @@ use once_cell::sync::Lazy;
4445
use raw_sync_2::events::{Event, EventInit as _, EventState};
4546
use shared_memory::ShmemConf;
4647
use std::path::Path;
48+
use std::sync::OnceLock;
4749
use std::time::Duration;
4850
use std::{cmp, sync::Arc};
4951
use tempfile::{Builder, TempPath};
@@ -66,6 +68,8 @@ static IPC_PATH: Lazy<TempPath> = Lazy::new(|| {
6668
.into_temp_path()
6769
});
6870

71+
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
72+
6973
// The parent process and the daemonized child communicate through an Event in
7074
// shared memory. The identity of the shared memory object is written to a
7175
// temporary file. The parent process is responsible for cleaning up the file
@@ -467,12 +471,12 @@ fn maybe_start_rpc_service(
467471
Ok(())
468472
}
469473

470-
fn maybe_start_f3_service(
471-
services: &mut JoinSet<anyhow::Result<()>>,
472-
opts: &CliOpts,
473-
config: &Config,
474-
ctx: &AppContext,
475-
) {
474+
fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) {
475+
// already running
476+
if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
477+
return;
478+
}
479+
476480
if !config.client.enable_rpc {
477481
if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
478482
tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
@@ -485,7 +489,7 @@ fn maybe_start_f3_service(
485489
let state_manager = &ctx.state_manager;
486490
let p2p_peer_id = ctx.p2p_peer_id;
487491
let admin_jwt = ctx.admin_jwt.clone();
488-
services.spawn_blocking({
492+
tokio::task::spawn_blocking({
489493
crate::rpc::f3::F3_LEASE_MANAGER
490494
.set(crate::rpc::f3::F3LeaseManager::new(
491495
state_manager.chain_config().network.clone(),
@@ -516,7 +520,6 @@ fn maybe_start_f3_service(
516520
std::env::var("FOREST_F3_ROOT")
517521
.unwrap_or(default_f3_root.display().to_string()),
518522
);
519-
Ok(())
520523
}
521524
});
522525
}
@@ -595,13 +598,41 @@ fn maybe_start_indexer_service(
595598
pub(super) async fn start(
596599
start_time: chrono::DateTime<chrono::Utc>,
597600
opts: CliOpts,
598-
mut config: Config,
601+
config: Config,
599602
shutdown_send: mpsc::Sender<()>,
600603
) -> anyhow::Result<()> {
601604
startup_init(&opts, &config)?;
605+
let (snap_gc, snap_gc_reboot_rx) = SnapshotGarbageCollector::new(&config)?;
606+
let snap_gc = Arc::new(snap_gc);
607+
GLOBAL_SNAPSHOT_GC
608+
.set(snap_gc.clone())
609+
.ok()
610+
.context("failed to set GLOBAL_SNAPSHOT_GC")?;
611+
loop {
612+
tokio::select! {
613+
_ = snap_gc_reboot_rx.recv_async() => {
614+
snap_gc.cleanup_before_reboot().await;
615+
}
616+
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx| {
617+
snap_gc.set_db(ctx.db.clone());
618+
}) => {
619+
break result
620+
}
621+
}
622+
}
623+
}
624+
625+
pub(super) async fn start_services(
626+
start_time: chrono::DateTime<chrono::Utc>,
627+
opts: &CliOpts,
628+
mut config: Config,
629+
shutdown_send: mpsc::Sender<()>,
630+
on_app_context_initialized: impl Fn(&AppContext),
631+
) -> anyhow::Result<()> {
602632
let mut services = JoinSet::new();
603-
maybe_start_track_peak_rss_service(&mut services, &opts);
604-
let ctx = AppContext::init(&opts, &config).await?;
633+
maybe_start_track_peak_rss_service(&mut services, opts);
634+
let ctx = AppContext::init(opts, &config).await?;
635+
on_app_context_initialized(&ctx);
605636
info!(
606637
"Using network :: {}",
607638
get_actual_chain_name(&ctx.network_name)
@@ -611,10 +642,8 @@ pub(super) async fn start(
611642
return Ok(());
612643
}
613644
let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
614-
615645
let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
616-
617-
let chain_follower = create_chain_follower(&opts, &p2p_service, mpool.clone(), &ctx)?;
646+
let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
618647

619648
info!(
620649
"Starting network:: {}",
@@ -631,20 +660,20 @@ pub(super) async fn start(
631660
&ctx,
632661
)?;
633662

634-
maybe_import_snapshot(&opts, &mut config, &ctx).await?;
663+
maybe_import_snapshot(opts, &mut config, &ctx).await?;
635664
if opts.halt_after_import {
636665
// Cancel all async services
637666
services.shutdown().await;
638667
return Ok(());
639668
}
640669
ctx.state_manager.populate_cache();
641670
maybe_start_metrics_service(&mut services, &config, &ctx).await?;
642-
maybe_start_gc_service(&mut services, &opts, &config, &ctx);
643-
maybe_start_f3_service(&mut services, &opts, &config, &ctx);
671+
maybe_start_gc_service(&mut services, opts, &config, &ctx);
672+
maybe_start_f3_service(opts, &config, &ctx);
644673
maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
645674
.await?;
646-
maybe_populate_eth_mappings_in_background(&mut services, &opts, config.clone(), &ctx);
647-
maybe_start_indexer_service(&mut services, &opts, &config, &ctx);
675+
maybe_populate_eth_mappings_in_background(&mut services, opts, config.clone(), &ctx);
676+
maybe_start_indexer_service(&mut services, opts, &config, &ctx);
648677
if !opts.stateless {
649678
ensure_proof_params_downloaded().await?;
650679
}

src/db/gc/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
//! depth-first search algorithm, with `O(V+E)` complexity, where V is the number of vertices and E
7070
//! is the number of edges.
7171
72+
mod snapshot;
73+
pub use snapshot::SnapshotGarbageCollector;
74+
7275
use crate::blocks::Tipset;
7376
use crate::chain::ChainEpochDelta;
7477

@@ -236,7 +239,7 @@ impl<DB: Blockstore + SettingsStore + GarbageCollectable<CidHashSet> + Sync + Se
236239
mod test {
237240
use crate::blocks::{CachingBlockHeader, Tipset};
238241
use crate::chain::{ChainEpochDelta, ChainStore};
239-
use crate::db::{GarbageCollectable, MarkAndSweep, MemoryDB, PersistentStore};
242+
use crate::db::{GarbageCollectable, MemoryDB, PersistentStore, gc::MarkAndSweep};
240243
use crate::message_pool::test_provider::{mock_block, mock_block_with_parents};
241244
use crate::networks::ChainConfig;
242245
use crate::shim::clock::ChainEpoch;

0 commit comments

Comments
 (0)