Skip to content

fix(cubestore): Delete old metastore snapshots in batches, after updating metastore-current #9647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6544,6 +6544,101 @@ mod tests {
let _ = fs::remove_dir_all(remote_store_path.clone());
}

#[tokio::test]
async fn delete_old_snapshots() {
let config = Config::test("delete_old_snapshots").update_config(|mut obj| {
obj.metastore_snapshots_lifetime = 1;
obj.minimum_metastore_snapshots_count = 2;
obj
});
let store_path = env::current_dir()
.unwrap()
.join("delete_old_snapshots-local");
let remote_store_path = env::current_dir()
.unwrap()
.join("delete_old_snapshots-remote");
let _ = fs::remove_dir_all(&store_path);
let _ = fs::remove_dir_all(&remote_store_path);
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
{
let meta_store = RocksMetaStore::new(
store_path.join("metastore").as_path(),
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
config.config_obj(),
)
.unwrap();

// let list = remote_fs.list("metastore-".to_owned()).await.unwrap();
// assert_eq!(0, list.len(), "remote fs list: {:?}", list);

let uploaded =
BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
.await
.unwrap();
assert_eq!(uploaded.len(), 0);

meta_store
.create_schema("foo1".to_string(), false)
.await
.unwrap();

meta_store.upload_check_point().await.unwrap();
let uploaded1 =
BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
.await
.unwrap();

assert_eq!(uploaded1.len(), 1);

meta_store
.create_schema("foo2".to_string(), false)
.await
.unwrap();

meta_store.upload_check_point().await.unwrap();

let uploaded2 =
BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
.await
.unwrap();

assert_eq!(uploaded2.len(), 2);

meta_store
.create_schema("foo3".to_string(), false)
.await
.unwrap();

meta_store.upload_check_point().await.unwrap();

let uploaded3 =
BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
.await
.unwrap();

assert_eq!(uploaded3.len(), 3);

meta_store
.create_schema("foo4".to_string(), false)
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(1100)).await;
meta_store.upload_check_point().await.unwrap();

let uploaded4 =
BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore")
.await
.unwrap();

// Should have 2 remaining snapshots because 2 is the minimum.
assert_eq!(uploaded4.len(), 2);
}

let _ = fs::remove_dir_all(&store_path);
let _ = fs::remove_dir_all(&remote_store_path);
}

#[tokio::test]
async fn swap_active_partitions() {
let config = Config::test("swap_active_partitions");
Expand Down
143 changes: 85 additions & 58 deletions rust/cubestore/cubestore/src/metastore/rocks_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ use crate::CubeError;
use async_trait::async_trait;
use datafusion::cube_ext;
use futures::future::join_all;
use itertools::Itertools;
use log::{error, info};
use regex::Regex;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::collections::{BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -57,6 +55,7 @@ pub struct BaseRocksStoreFs {
name: &'static str,
minimum_snapshots_count: u64,
snapshots_lifetime: u64,
remote_files_cleanup_batch_size: u64,
}

impl BaseRocksStoreFs {
Expand All @@ -66,11 +65,13 @@ impl BaseRocksStoreFs {
) -> Arc<Self> {
let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
let snapshots_lifetime = config.metastore_snapshots_lifetime();
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
Arc::new(Self {
remote_fs,
name: "metastore",
minimum_snapshots_count,
snapshots_lifetime,
remote_files_cleanup_batch_size,
})
}
pub fn new_for_cachestore(
Expand All @@ -79,11 +80,13 @@ impl BaseRocksStoreFs {
) -> Arc<Self> {
let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
let snapshots_lifetime = config.cachestore_snapshots_lifetime();
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
Arc::new(Self {
remote_fs,
name: "cachestore",
minimum_snapshots_count,
snapshots_lifetime,
remote_files_cleanup_batch_size,
})
}

Expand Down Expand Up @@ -135,63 +138,89 @@ impl BaseRocksStoreFs {

Ok(upload_results)
}

// Exposed for tests.
//
// Groups the store's remote files by the snapshot they belong to.
//
// Remote paths start with a `{name}-{millis}` segment (optionally suffixed
// with `-logs` or `-index-logs`, which belong to the same snapshot), where
// `millis` is the snapshot timestamp. Returns a map from snapshot timestamp
// to every remote file path of that snapshot; entries whose first path
// segment does not parse as a timestamp are ignored.
pub async fn list_files_by_snapshot(
    remote_fs: &dyn RemoteFs,
    name: &str,
) -> Result<HashMap<u128, Vec<String>>, CubeError> {
    let existing_files = remote_fs.list(format!("{}-", name)).await?;
    // Log a debug statement so that we can rule out the filename list itself being too large for memory.
    log::debug!(
        "Listed existing {} files, count = {}",
        name,
        existing_files.len()
    );
    let prefix = format!("{}-", name);
    let mut snapshot_map = HashMap::<u128, Vec<String>>::new();
    for existing in existing_files {
        // Strip only a leading `{name}-` and a trailing `-index-logs`/`-logs`;
        // unlike `replace`, this cannot accidentally match mid-string.
        let millis = existing.split('/').next().and_then(|dir| {
            let stamp = dir.strip_prefix(&prefix)?;
            let stamp = stamp
                .strip_suffix("-index-logs")
                .or_else(|| stamp.strip_suffix("-logs"))
                .unwrap_or(stamp);
            u128::from_str(stamp).ok()
        });
        if let Some(millis) = millis {
            snapshot_map.entry(millis).or_default().push(existing);
        }
    }
    Ok(snapshot_map)
}

pub async fn delete_old_snapshots(&self) -> Result<Vec<String>, CubeError> {
let existing_metastore_files = self.remote_fs.list(format!("{}-", self.name)).await?;
let candidates = existing_metastore_files
.iter()
.filter_map(|existing| {
let path = existing.split("/").nth(0).map(|p| {
u128::from_str(
&p.replace(&format!("{}-", self.name), "")
.replace("-index-logs", "")
.replace("-logs", ""),
)
});
if let Some(Ok(millis)) = path {
Some((existing, millis))
} else {
None
}
})
.collect::<Vec<_>>();
let candidates_map =
Self::list_files_by_snapshot(self.remote_fs.as_ref(), &self.name).await?;

let lifetime_ms = (self.snapshots_lifetime as u128) * 1000;
let min_snapshots_count = self.minimum_snapshots_count as usize;

let mut snapshots_list = candidates
.iter()
.map(|(_, ms)| ms.to_owned())
.unique()
.collect::<Vec<_>>();
snapshots_list.sort_unstable_by(|a, b| b.cmp(a));
// snapshots_list sorted by oldest first.
let mut snapshots_list: Vec<u128> = candidates_map.keys().cloned().collect::<Vec<_>>();
snapshots_list.sort_unstable();

let snapshots_to_delete = snapshots_list
.into_iter()
.skip(min_snapshots_count)
.filter(|ms| {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
- ms
> lifetime_ms
})
.collect::<HashSet<_>>();
if snapshots_list.len() <= min_snapshots_count {
return Ok(vec![]);
}
snapshots_list.truncate(snapshots_list.len() - min_snapshots_count);

if !snapshots_to_delete.is_empty() {
let to_delete = candidates
.into_iter()
.filter_map(|(path, ms)| {
if snapshots_to_delete.contains(&ms) {
Some(path.to_owned())
} else {
None
}
})
.unique()
.collect::<Vec<_>>();
let cutoff_time_ms: u128 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
- lifetime_ms;

while !snapshots_list.is_empty() && *snapshots_list.last().unwrap() >= cutoff_time_ms {
snapshots_list.pop();
}

let snapshots_list = snapshots_list;

if snapshots_list.is_empty() {
// Avoid empty join_all, iteration, etc.
return Ok(vec![]);
}

let mut to_delete: Vec<String> = Vec::new();

let mut candidates_map = candidates_map;
for ms in snapshots_list {
to_delete.append(
candidates_map
.get_mut(&ms)
.expect("delete_old_snapshots candidates_map lookup should succeed"),
);
}

for batch in to_delete.chunks(
self.remote_files_cleanup_batch_size
.try_into()
.unwrap_or(usize::MAX),
) {
for v in join_all(
to_delete
batch
.iter()
.map(|f| self.remote_fs.delete_file(f.to_string()))
.collect::<Vec<_>>(),
Expand All @@ -201,11 +230,9 @@ impl BaseRocksStoreFs {
{
v?;
}

Ok(to_delete)
} else {
Ok(vec![])
}

Ok(to_delete)
}

pub async fn is_remote_metadata_exists(&self) -> Result<bool, CubeError> {
Expand Down Expand Up @@ -367,10 +394,10 @@ impl MetaStoreFs for BaseRocksStoreFs {
self.upload_snapsots_files(&remote_path, &checkpoint_path)
.await?;

self.delete_old_snapshots().await?;

self.write_metastore_current(&remote_path).await?;

self.delete_old_snapshots().await?;

Ok(())
}
async fn load_metastore_logs(
Expand Down
Loading