Skip to content

Commit bca2362

Browse files
authored
perf(cubestore) Delete old metastore and cachestore snapshots with constant memory usage on S3 (#9668)
* Adds a metastore deletion batch size parameter using CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE * Makes internal S3 and GCS file listing operations use less intermediate memory. * With a new ExtendedRemoteFs interface, makes us use constant memory when deleting outdated metastore and cachestore snapshots on S3.
1 parent 3f71e56 commit bca2362

File tree

11 files changed

+443
-139
lines changed

11 files changed

+443
-139
lines changed

rust/cubestore/Cargo.lock

Lines changed: 24 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ chrono-tz = "0.8.2"
5252
lazy_static = "1.4.0"
5353
mockall = "0.8.1"
5454
async-std = "0.99"
55+
async-stream = "0.3.6"
5556
itertools = "0.11.0"
5657
bigdecimal = { version = "0.2.0", features = ["serde"] }
5758
# Right now, it's not possible to use the 0.33 release because it has bugs

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::remotefs::gcs::GCSRemoteFs;
2828
use crate::remotefs::minio::MINIORemoteFs;
2929
use crate::remotefs::queue::QueueRemoteFs;
3030
use crate::remotefs::s3::S3RemoteFs;
31-
use crate::remotefs::{LocalDirRemoteFs, RemoteFs};
31+
use crate::remotefs::{ExtendedRemoteFs, LocalDirRemoteFs, RemoteFs};
3232
use crate::scheduler::SchedulerImpl;
3333
use crate::sql::cache::SqlResultCache;
3434
use crate::sql::{SqlService, SqlServiceImpl};
@@ -518,6 +518,8 @@ pub trait ConfigObj: DIService {
518518

519519
fn dump_dir(&self) -> &Option<PathBuf>;
520520

521+
fn snapshots_deletion_batch_size(&self) -> u64;
522+
521523
fn minimum_metastore_snapshots_count(&self) -> u64;
522524

523525
fn metastore_snapshots_lifetime(&self) -> u64;
@@ -630,6 +632,7 @@ pub struct ConfigObjImpl {
630632
pub drop_ws_processing_messages_after_secs: u64,
631633
pub drop_ws_complete_messages_after_secs: u64,
632634
pub skip_kafka_parsing_errors: bool,
635+
pub snapshots_deletion_batch_size: u64,
633636
pub minimum_metastore_snapshots_count: u64,
634637
pub metastore_snapshots_lifetime: u64,
635638
pub minimum_cachestore_snapshots_count: u64,
@@ -953,6 +956,10 @@ impl ConfigObj for ConfigObjImpl {
953956
&self.dump_dir
954957
}
955958

959+
fn snapshots_deletion_batch_size(&self) -> u64 {
960+
self.snapshots_deletion_batch_size
961+
}
962+
956963
fn minimum_metastore_snapshots_count(&self) -> u64 {
957964
self.minimum_metastore_snapshots_count
958965
}
@@ -1486,6 +1493,11 @@ impl Config {
14861493
10 * 60,
14871494
),
14881495
skip_kafka_parsing_errors: env_parse("CUBESTORE_SKIP_KAFKA_PARSING_ERRORS", false),
1496+
// Presently, not useful to make more than upload_concurrency times constant
1497+
snapshots_deletion_batch_size: env_parse(
1498+
"CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE",
1499+
80,
1500+
),
14891501
minimum_metastore_snapshots_count: env_parse(
14901502
"CUBESTORE_MINIMUM_METASTORE_SNAPSHOTS_COUNT",
14911503
5,
@@ -1652,6 +1664,7 @@ impl Config {
16521664
drop_ws_processing_messages_after_secs: 60,
16531665
drop_ws_complete_messages_after_secs: 10,
16541666
skip_kafka_parsing_errors: false,
1667+
snapshots_deletion_batch_size: 80,
16551668
minimum_metastore_snapshots_count: 3,
16561669
metastore_snapshots_lifetime: 24 * 3600,
16571670
minimum_cachestore_snapshots_count: 3,
@@ -1894,7 +1907,8 @@ impl Config {
18941907
self.injector
18951908
.register("cachestore_fs", async move |i| {
18961909
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
1897-
let original_remote_fs = i.get_service("original_remote_fs").await;
1910+
let original_remote_fs: Arc<dyn ExtendedRemoteFs> =
1911+
i.get_service("original_remote_fs").await;
18981912
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_cachestore(
18991913
original_remote_fs,
19001914
i.get_service_typed().await,
@@ -1969,7 +1983,8 @@ impl Config {
19691983
self.injector
19701984
.register("metastore_fs", async move |i| {
19711985
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
1972-
let original_remote_fs = i.get_service("original_remote_fs").await;
1986+
let original_remote_fs: Arc<dyn ExtendedRemoteFs> =
1987+
i.get_service("original_remote_fs").await;
19731988
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_metastore(
19741989
original_remote_fs,
19751990
i.get_service_typed().await,

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6546,8 +6546,9 @@ mod tests {
65466546

65476547
#[tokio::test]
65486548
async fn delete_old_snapshots() {
6549+
let metastore_snapshots_lifetime_secs = 1;
65496550
let config = Config::test("delete_old_snapshots").update_config(|mut obj| {
6550-
obj.metastore_snapshots_lifetime = 1;
6551+
obj.metastore_snapshots_lifetime = metastore_snapshots_lifetime_secs;
65516552
obj.minimum_metastore_snapshots_count = 2;
65526553
obj
65536554
});
@@ -6616,14 +6617,22 @@ mod tests {
66166617
.await
66176618
.unwrap();
66186619

6619-
assert_eq!(uploaded3.len(), 3);
6620+
assert_eq!(
6621+
uploaded3.len(),
6622+
3,
6623+
"uploaded3 keys: {}",
6624+
uploaded3.keys().join(", ")
6625+
);
66206626

66216627
meta_store
66226628
.create_schema("foo4".to_string(), false)
66236629
.await
66246630
.unwrap();
66256631

6626-
tokio::time::sleep(Duration::from_millis(1100)).await;
6632+
tokio::time::sleep(Duration::from_millis(
6633+
metastore_snapshots_lifetime_secs * 1000 + 100,
6634+
))
6635+
.await;
66276636
meta_store.upload_check_point().await.unwrap();
66286637

66296638
let uploaded4 =

0 commit comments

Comments
 (0)