Skip to content

Commit 8d91a73

Browse files
committed
fix
1 parent e3eb005 commit 8d91a73

File tree

5 files changed

+78
-61
lines changed

5 files changed

+78
-61
lines changed

scripts/tests/calibnet_other_check.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fi
2727
forest_check_db_stats
2828
echo "Run snapshot GC"
2929
forest_run_snap_gc
30-
sleep 5
30+
forest_wait_api
3131
echo "Wait the node to sync"
3232
forest_wait_for_sync
3333
forest_check_db_stats

src/cli/subcommands/chain_cmd/prune.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@ use std::time::Duration;
88
#[derive(Debug, Subcommand)]
99
pub enum ChainPruneCommands {
1010
/// Run snapshot GC
11-
Snap,
11+
Snap {
12+
#[arg(long)]
13+
no_wait: bool,
14+
},
1215
}
1316

1417
impl ChainPruneCommands {
1518
pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
1619
match self {
17-
Self::Snap => {
20+
Self::Snap { no_wait } => {
1821
client
19-
.call(ChainPruneSnapshot::request(())?.with_timeout(Duration::MAX))
22+
.call(ChainPruneSnapshot::request((!no_wait,))?.with_timeout(Duration::MAX))
2023
.await?;
2124
}
2225
}

src/daemon/mod.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use raw_sync_2::events::{Event, EventInit as _, EventState};
4646
use shared_memory::ShmemConf;
4747
use std::path::Path;
4848
use std::sync::OnceLock;
49+
use std::sync::atomic::{AtomicBool, Ordering};
4950
use std::time::Duration;
5051
use std::{cmp, sync::Arc};
5152
use tempfile::{Builder, TempPath};
@@ -82,13 +83,19 @@ pub fn ipc_shmem_conf() -> ShmemConf {
8283
}
8384

8485
fn unblock_parent_process() -> anyhow::Result<()> {
85-
let shmem = ipc_shmem_conf().open()?;
86-
let (event, _) =
87-
unsafe { Event::from_existing(shmem.as_ptr()).map_err(|err| anyhow::anyhow!("{err}")) }?;
88-
89-
event
90-
.set(EventState::Signaled)
91-
.map_err(|err| anyhow::anyhow!("{err}"))
86+
static UNBLOCKED: AtomicBool = AtomicBool::new(false);
87+
if !UNBLOCKED.load(Ordering::Relaxed) {
88+
let shmem = ipc_shmem_conf().open()?;
89+
let (event, _) = unsafe {
90+
Event::from_existing(shmem.as_ptr()).map_err(|err| anyhow::anyhow!("{err}"))
91+
}?;
92+
93+
event
94+
.set(EventState::Signaled)
95+
.map_err(|err| anyhow::anyhow!("{err}"))?;
96+
UNBLOCKED.store(true, Ordering::Relaxed);
97+
}
98+
Ok(())
9299
}
93100

94101
/// Increase the file descriptor limit to a reasonable number.

src/db/gc/snapshot.rs

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,18 @@ where
8787
}
8888
}
8989

90-
pub fn trigger(&self) -> flume::Receiver<()> {
91-
let (progress_tx, progress_rx) = flume::unbounded();
92-
*self.progress_tx.write() = Some(progress_tx);
90+
pub fn trigger(&self) -> anyhow::Result<flume::Receiver<()>> {
91+
if self.running.load(Ordering::Relaxed) {
92+
anyhow::bail!("snap gc has already been running");
93+
}
94+
9395
if self.trigger_tx.try_send(()).is_err() {
94-
tracing::warn!("snap gc has already been triggered");
96+
anyhow::bail!("snap gc has already been triggered");
9597
}
96-
progress_rx
98+
99+
let (progress_tx, progress_rx) = flume::unbounded();
100+
*self.progress_tx.write() = Some(progress_tx);
101+
Ok(progress_rx)
97102
}
98103

99104
async fn export_snapshot(&self) -> anyhow::Result<()> {
@@ -146,52 +151,54 @@ where
146151
}
147152
}
148153
if let Some(blessed_lite_snapshot) = { self.blessed_lite_snapshot.read().clone() } {
149-
let mut opts = ParityDb::to_options(self.db_root_dir.clone(), &self.db_config);
150-
for col in [
151-
DbColumn::GraphDagCborBlake2b256 as u8,
152-
DbColumn::GraphFull as u8,
153-
] {
154-
let start = Instant::now();
155-
tracing::info!("pruning parity-db column {col}...");
156-
loop {
157-
match parity_db::Db::reset_column(&mut opts, col, None) {
158-
Ok(_) => break,
159-
Err(_) => {
160-
tokio::time::sleep(Duration::from_secs(1)).await;
154+
if blessed_lite_snapshot.is_file() {
155+
let mut opts = ParityDb::to_options(self.db_root_dir.clone(), &self.db_config);
156+
for col in [
157+
DbColumn::GraphDagCborBlake2b256 as u8,
158+
DbColumn::GraphFull as u8,
159+
] {
160+
let start = Instant::now();
161+
tracing::info!("pruning parity-db column {col}...");
162+
loop {
163+
match parity_db::Db::reset_column(&mut opts, col, None) {
164+
Ok(_) => break,
165+
Err(_) => {
166+
tokio::time::sleep(Duration::from_secs(1)).await;
167+
}
161168
}
162169
}
170+
tracing::info!(
171+
"pruned parity-db column {col}, took {}",
172+
humantime::format_duration(start.elapsed())
173+
);
163174
}
164-
tracing::info!(
165-
"pruned parity-db column {col}, took {}",
166-
humantime::format_duration(start.elapsed())
167-
);
168-
}
169175

170-
for car_to_remove in walkdir::WalkDir::new(&self.car_db_dir)
171-
.max_depth(1)
172-
.into_iter()
173-
.filter_map(|entry| {
174-
if let Ok(entry) = entry {
175-
if entry.path() != blessed_lite_snapshot.as_path() {
176-
if let Some(filename) = entry.file_name().to_str() {
177-
if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
178-
return Some(entry.into_path());
176+
for car_to_remove in walkdir::WalkDir::new(&self.car_db_dir)
177+
.max_depth(1)
178+
.into_iter()
179+
.filter_map(|entry| {
180+
if let Ok(entry) = entry {
181+
if entry.path() != blessed_lite_snapshot.as_path() {
182+
if let Some(filename) = entry.file_name().to_str() {
183+
if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
184+
return Some(entry.into_path());
185+
}
179186
}
180187
}
181188
}
182-
}
183-
None
184-
})
185-
{
186-
match std::fs::remove_file(&car_to_remove) {
187-
Ok(_) => {
188-
tracing::info!("deleted car db at {}", car_to_remove.display());
189-
}
190-
Err(e) => {
191-
tracing::warn!(
192-
"failed to delete car db at {}: {e}",
193-
car_to_remove.display()
194-
);
189+
None
190+
})
191+
{
192+
match std::fs::remove_file(&car_to_remove) {
193+
Ok(_) => {
194+
tracing::info!("deleted car db at {}", car_to_remove.display());
195+
}
196+
Err(e) => {
197+
tracing::warn!(
198+
"failed to delete car db at {}: {e}",
199+
car_to_remove.display()
200+
);
201+
}
195202
}
196203
}
197204
}

src/rpc/methods/chain.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,22 +196,22 @@ impl RpcMethod<1> for ChainGetMessagesInTipset {
196196
}
197197

198198
pub enum ChainPruneSnapshot {}
199-
impl RpcMethod<0> for ChainPruneSnapshot {
199+
impl RpcMethod<1> for ChainPruneSnapshot {
200200
const NAME: &'static str = "Forest.SnapshotGC";
201-
const PARAM_NAMES: [&'static str; 0] = [];
201+
const PARAM_NAMES: [&'static str; 1] = ["blocking"];
202202
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
203203
const PERMISSION: Permission = Permission::Admin;
204204

205-
type Params = ();
205+
type Params = (bool,);
206206
type Ok = ();
207207

208208
async fn handle(
209209
_ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
210-
(): Self::Params,
210+
(blocking,): Self::Params,
211211
) -> Result<Self::Ok, ServerError> {
212212
if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
213-
let progress_rx = gc.trigger();
214-
while progress_rx.recv_async().await.is_ok() {}
213+
let progress_rx = gc.trigger()?;
214+
while blocking && progress_rx.recv_async().await.is_ok() {}
215215
Ok(())
216216
} else {
217217
Err(anyhow::anyhow!("snapshot gc is not enabled").into())

0 commit comments

Comments
 (0)