diff --git a/Cargo.lock b/Cargo.lock index 7b6e404aa3..6e32f5edf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3120,7 +3120,7 @@ dependencies = [ [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/tikv/rust-rocksdb.git#224bed6ffa29ba3bbe9a91ef6bda7186200c59a8" +source = "git+https://github.com/tikv/rust-rocksdb.git#c92c467a3ab0b60484a0db83fcf89366791716cd" dependencies = [ "bindgen 0.65.1", "bzip2-sys", @@ -3139,7 +3139,7 @@ dependencies = [ [[package]] name = "libtitan_sys" version = "0.0.1" -source = "git+https://github.com/tikv/rust-rocksdb.git#224bed6ffa29ba3bbe9a91ef6bda7186200c59a8" +source = "git+https://github.com/tikv/rust-rocksdb.git#c92c467a3ab0b60484a0db83fcf89366791716cd" dependencies = [ "bzip2-sys", "cc", @@ -5384,7 +5384,7 @@ dependencies = [ [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/tikv/rust-rocksdb.git#224bed6ffa29ba3bbe9a91ef6bda7186200c59a8" +source = "git+https://github.com/tikv/rust-rocksdb.git#c92c467a3ab0b60484a0db83fcf89366791716cd" dependencies = [ "libc 0.2.151", "librocksdb_sys", @@ -5857,7 +5857,6 @@ dependencies = [ "error_code", "fail", "file_system", - "fs2", "futures 0.3.15", "grpcio", "grpcio-health", @@ -7014,7 +7013,7 @@ dependencies = [ [[package]] name = "tikv" -version = "8.1.1" +version = "8.1.2" dependencies = [ "anyhow", "api_version", @@ -7248,6 +7247,7 @@ dependencies = [ "derive_more", "error_code", "fail", + "fs2", "futures 0.3.15", "futures-util", "gag", diff --git a/Cargo.toml b/Cargo.toml index 9ed2109cfa..7597f4f3d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tikv" -version = "8.1.1" +version = "8.1.2" authors = ["The TiKV Authors"] description = "A distributed transactional key-value database powered by Rust and Raft" license = "Apache-2.0" diff --git a/components/encryption/src/master_key/kms.rs b/components/encryption/src/master_key/kms.rs index db3c62194f..7c3f56ca0c 100644 --- a/components/encryption/src/master_key/kms.rs +++ b/components/encryption/src/master_key/kms.rs @@ -141,7 +141,11 @@ impl KmsBackend { self.kms_provider.decrypt_data_key(&ciphertext_key), ) })) - .map_err(cloud_convert_error("decrypt encrypted key failed".into()))?; + .map_err(|e| { + Error::WrongMasterKey(box_err!(cloud_convert_error( + "decrypt encrypted key failed".into(), + )(e))) + })?; let data_key = DataKeyPair { encrypted: ciphertext_key, plaintext: PlainKey::new(plaintext, CryptographyType::AesGcm256) @@ -154,6 +158,12 @@ impl KmsBackend { } } } + + #[cfg(test)] + fn clear_state(&mut self) { + let mut opt_state = self.state.lock().unwrap(); + *opt_state = None; + } } impl Backend for KmsBackend { @@ -173,7 +183,10 @@ impl Backend for KmsBackend { #[cfg(test)] mod fake { use async_trait::async_trait; - use cloud::{error::Result, kms::KmsProvider}; + use cloud::{ + error::{Error as CloudError, KmsError, Result}, + kms::KmsProvider, + }; use super::*; @@ -183,12 +196,14 @@ mod fake { #[derive(Debug)] pub struct FakeKms { plaintext_key: PlainKey, + should_decrypt_data_key_fail: bool, } impl FakeKms { - pub fn new(plaintext_key: Vec) -> Self { + pub fn new(plaintext_key: Vec, should_decrypt_data_key_fail: bool) -> Self { Self { plaintext_key: PlainKey::new(plaintext_key, CryptographyType::AesGcm256).unwrap(), + should_decrypt_data_key_fail, } } } @@ -204,7 +219,13 @@ mod fake { } async fn decrypt_data_key(&self, _ciphertext: &EncryptedKey) -> Result> { - Ok(vec![1u8, 32]) + if self.should_decrypt_data_key_fail { + Err(CloudError::KmsError(KmsError::WrongMasterKey(box_err!( + "wrong master key" + )))) + } else { + Ok(vec![1u8, 32]) + } } fn name(&self) -> &str { @@ -241,21 +262,36 @@ mod tests { assert_eq!(state2.cached(&encrypted2), true); } + const PLAIN_TEXT_HEX: &str = "25431587e9ecffc7c37f8d6d52a9bc3310651d46fb0e3bad2726c8f2db653749"; + const CIPHER_TEXT_HEX: &str = + "84e5f23f95648fa247cb28eef53abec947dbf05ac953734618111583840bd980"; + const PLAINKEY_HEX: &str = "c3d99825f2181f4808acd2068eac7441a65bd428f14d2aab43fefc0129091139"; + const IV_HEX: &str = "cafabd9672ca6c79a2fbdc22"; + + #[cfg(test)] + fn prepare_data_for_encrypt() -> (Iv, Vec, Vec, Vec) { + let iv = Vec::from_hex(IV_HEX).unwrap(); + let iv = Iv::from_slice(iv.as_slice()).unwrap(); + let pt = Vec::from_hex(PLAIN_TEXT_HEX).unwrap(); + let plainkey = Vec::from_hex(PLAINKEY_HEX).unwrap(); + let ct = Vec::from_hex(CIPHER_TEXT_HEX).unwrap(); + (iv, pt, plainkey, ct) + } + + #[cfg(test)] + fn prepare_kms_backend(plainkey: Vec, should_decrypt_data_key_fail: bool) -> KmsBackend { + KmsBackend::new(Box::new(FakeKms::new( + plainkey, + should_decrypt_data_key_fail, + ))) + .unwrap() + } + #[test] fn test_kms_backend() { - // See more http://csrc.nist.gov/groups/STM/cavp/documents/mac/gcmtestvectors.zip - let pt = Vec::from_hex("25431587e9ecffc7c37f8d6d52a9bc3310651d46fb0e3bad2726c8f2db653749") - .unwrap(); - let ct = Vec::from_hex("84e5f23f95648fa247cb28eef53abec947dbf05ac953734618111583840bd980") - .unwrap(); - let plainkey = - Vec::from_hex("c3d99825f2181f4808acd2068eac7441a65bd428f14d2aab43fefc0129091139") - .unwrap(); - - let iv = Vec::from_hex("cafabd9672ca6c79a2fbdc22").unwrap(); - - let backend = KmsBackend::new(Box::new(FakeKms::new(plainkey))).unwrap(); - let iv = Iv::from_slice(iv.as_slice()).unwrap(); + let (iv, pt, plainkey, ct) = prepare_data_for_encrypt(); + let backend = prepare_kms_backend(plainkey, false); + let encrypted_content = backend.encrypt_content(&pt, iv).unwrap(); assert_eq!(encrypted_content.get_content(), ct.as_slice()); let plaintext = backend.decrypt_content(&encrypted_content).unwrap(); @@ -293,4 +329,19 @@ mod tests { Error::Other(_) ); } + + #[test] + fn test_kms_backend_wrong_key() { + let (iv, pt, plainkey, ..) = prepare_data_for_encrypt(); + let mut backend = prepare_kms_backend(plainkey, true); + + let encrypted_content = backend.encrypt_content(&pt, iv).unwrap(); + // Clear the cached state to ensure that the subsequent + // backend.decrypt_content() invocation bypasses the cache and triggers the + // mocked FakeKMS::decrypt_data_key() function. + backend.clear_state(); + + let err = backend.decrypt_content(&encrypted_content).unwrap_err(); + assert_matches!(err, Error::WrongMasterKey(_)); + } } diff --git a/components/engine_rocks/src/properties.rs b/components/engine_rocks/src/properties.rs index b9032e53f8..01ca0447a0 100644 --- a/components/engine_rocks/src/properties.rs +++ b/components/engine_rocks/src/properties.rs @@ -567,6 +567,7 @@ pub fn get_range_stats( num_entries, num_versions: props.num_versions, num_rows: props.num_rows, + num_deletes: props.num_deletes, }) } diff --git a/components/engine_traits/src/misc.rs b/components/engine_traits/src/misc.rs index e08e7cad13..fa3efe43c4 100644 --- a/components/engine_traits/src/misc.rs +++ b/components/engine_traits/src/misc.rs @@ -57,12 +57,25 @@ pub trait StatisticsReporter { #[derive(Default)] pub struct RangeStats { - // The number of entries + // The number of entries in write cf. pub num_entries: u64, // The number of MVCC versions of all rows (num_entries - tombstones). pub num_versions: u64, // The number of rows. pub num_rows: u64, + // The number of MVCC deletes of all rows. + pub num_deletes: u64, +} + +impl RangeStats { + /// The number of redundant keys in the range. + /// It's calculated by `num_entries - num_versions + num_deleted`. + pub fn redundant_keys(&self) -> u64 { + // Consider the number of `mvcc_deletes` as the number of redundant keys. + self.num_entries + .saturating_sub(self.num_rows) + .saturating_add(self.num_deletes) + } } pub trait MiscExt: CfNamesExt + FlowControlFactorsExt + WriteBatchExt { diff --git a/components/raftstore-v2/src/worker/pd/store.rs b/components/raftstore-v2/src/worker/pd/store.rs index 926ad307cf..226fef08d1 100644 --- a/components/raftstore-v2/src/worker/pd/store.rs +++ b/components/raftstore-v2/src/worker/pd/store.rs @@ -23,6 +23,7 @@ use slog::{error, info, warn}; use tikv_util::{ metrics::RecordPairVec, store::QueryStats, + sys::disk::get_disk_space_stats, time::{Duration, Instant as TiInstant, UnixSecs}, topn::TopN, }; @@ -442,7 +443,8 @@ where /// Returns (capacity, used, available). fn collect_engine_size(&self) -> Option<(u64, u64, u64)> { - let disk_stats = match fs2::statvfs(self.tablet_registry.tablet_root()) { + let (disk_cap, disk_avail) = match get_disk_space_stats(self.tablet_registry.tablet_root()) + { Err(e) => { error!( self.logger, @@ -452,9 +454,8 @@ where ); return None; } - Ok(stats) => stats, + Ok((total_size, available_size)) => (total_size, available_size), }; - let disk_cap = disk_stats.total_space(); let capacity = if self.cfg.value().capacity.0 == 0 { disk_cap } else { @@ -481,7 +482,7 @@ where let mut available = capacity.checked_sub(used_size).unwrap_or_default(); // We only care about rocksdb SST file size, so we should check disk available // here. - available = cmp::min(available, disk_stats.available_space()); + available = cmp::min(available, disk_avail); Some((capacity, used_size, available)) } } diff --git a/components/raftstore/src/store/worker/compact.rs b/components/raftstore/src/store/worker/compact.rs index 4dfe180ceb..0c1fc74dda 100644 --- a/components/raftstore/src/store/worker/compact.rs +++ b/components/raftstore/src/store/worker/compact.rs @@ -445,7 +445,7 @@ pub fn need_compact(range_stats: &RangeStats, compact_threshold: &CompactThresho // We trigger region compaction when their are to many tombstones as well as // redundant keys, both of which can severly impact scan operation: let estimate_num_del = range_stats.num_entries - range_stats.num_versions; - let redundant_keys = range_stats.num_entries - range_stats.num_rows; + let redundant_keys = range_stats.redundant_keys(); (redundant_keys >= compact_threshold.redundant_rows_threshold && redundant_keys * 100 >= compact_threshold.redundant_rows_percent_threshold * range_stats.num_entries) diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index d6f644ee00..0c20be4847 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -44,7 +44,7 @@ use tikv_util::{ box_err, debug, error, info, metrics::ThreadInfoStatistics, store::QueryStats, - sys::{thread::StdThreadBuildWrapper, SysQuota}, + sys::{disk::get_disk_space_stats, thread::StdThreadBuildWrapper, SysQuota}, thd_name, time::{Instant as TiInstant, UnixSecs}, timer::GLOBAL_TIMER_HANDLE, @@ -2433,7 +2433,7 @@ fn collect_engine_size( return Some((engine_size.capacity, engine_size.used, engine_size.avail)); } let store_info = store_info.unwrap(); - let disk_stats = match fs2::statvfs(store_info.kv_engine.path()) { + let (disk_cap, disk_avail) = match get_disk_space_stats(store_info.kv_engine.path()) { Err(e) => { error!( "get disk stat for rocksdb failed"; @@ -2442,9 +2442,8 @@ fn collect_engine_size( ); return None; } - Ok(stats) => stats, + Ok((total_size, available_size)) => (total_size, available_size), }; - let disk_cap = disk_stats.total_space(); let capacity = if store_info.capacity == 0 || disk_cap < store_info.capacity { disk_cap } else { @@ -2468,7 +2467,7 @@ fn collect_engine_size( let mut available = capacity.checked_sub(used_size).unwrap_or_default(); // We only care about rocksdb SST file size, so we should check disk available // here. - available = cmp::min(available, disk_stats.available_space()); + available = cmp::min(available, disk_avail); Some((capacity, used_size, available)) } diff --git a/components/server/Cargo.toml b/components/server/Cargo.toml index ea1e26bc4d..f5cf38f1ac 100644 --- a/components/server/Cargo.toml +++ b/components/server/Cargo.toml @@ -15,18 +15,10 @@ sse = ["tikv/sse"] memory-engine = [] mem-profiling = ["tikv/mem-profiling"] failpoints = ["tikv/failpoints"] -test-engine-kv-rocksdb = [ - "tikv/test-engine-kv-rocksdb" -] -test-engine-raft-raft-engine = [ - "tikv/test-engine-raft-raft-engine" -] -test-engines-rocksdb = [ - "tikv/test-engines-rocksdb", -] -test-engines-panic = [ - "tikv/test-engines-panic", -] +test-engine-kv-rocksdb = ["tikv/test-engine-kv-rocksdb"] +test-engine-raft-raft-engine = ["tikv/test-engine-raft-raft-engine"] +test-engines-rocksdb = ["tikv/test-engines-rocksdb"] +test-engines-panic = ["tikv/test-engines-panic"] nortcheck = ["engine_rocks/nortcheck"] backup-stream-debug = ["backup-stream/backup-stream-debug"] @@ -49,7 +41,6 @@ engine_traits = { workspace = true } error_code = { workspace = true } fail = "0.5" file_system = { workspace = true } -fs2 = "0.4" futures = "0.3" grpcio = { workspace = true } grpcio-health = { workspace = true } @@ -59,7 +50,10 @@ hybrid_engine = { workspace = true } keys = { workspace = true } kvproto = { workspace = true } libc = "0.2" -log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] } +log = { version = "0.4", features = [ + "max_level_trace", + "release_max_level_debug", +] } log_wrappers = { workspace = true } pd_client = { workspace = true } prometheus = { version = "0.13", features = ["nightly"] } diff --git a/components/server/src/common.rs b/components/server/src/common.rs index 06c1770498..5902b37c1e 100644 --- a/components/server/src/common.rs +++ b/components/server/src/common.rs @@ -216,8 +216,9 @@ impl TikvServerCore { } } - let disk_stats = fs2::statvfs(&self.config.storage.data_dir).unwrap(); - let mut capacity = disk_stats.total_space(); + let (disk_cap, disk_avail) = + disk::get_disk_space_stats(&self.config.storage.data_dir).unwrap(); + let mut capacity = disk_cap; if self.config.raft_store.capacity.0 > 0 { capacity = cmp::min(capacity, self.config.raft_store.capacity.0); } @@ -225,11 +226,7 @@ impl TikvServerCore { let kv_reserved_size = calculate_reserved_space(capacity, self.config.storage.reserve_space.0); disk::set_disk_reserved_space(kv_reserved_size); - reserve_physical_space( - &self.config.storage.data_dir, - disk_stats.available_space(), - kv_reserved_size, - ); + reserve_physical_space(&self.config.storage.data_dir, disk_avail, kv_reserved_size); let raft_data_dir = if self.config.raft_engine.enable { self.config.raft_engine.config().dir @@ -240,18 +237,13 @@ impl TikvServerCore { let separated_raft_mount_path = path_in_diff_mount_point(&self.config.storage.data_dir, &raft_data_dir); if separated_raft_mount_path { - let raft_disk_stats = fs2::statvfs(&raft_data_dir).unwrap(); + let (raft_disk_cap, raft_disk_avail) = + disk::get_disk_space_stats(&raft_data_dir).unwrap(); // reserve space for raft engine if raft engine is deployed separately - let raft_reserved_size = calculate_reserved_space( - raft_disk_stats.total_space(), - self.config.storage.reserve_raft_space.0, - ); + let raft_reserved_size = + calculate_reserved_space(raft_disk_cap, self.config.storage.reserve_raft_space.0); disk::set_raft_disk_reserved_space(raft_reserved_size); - reserve_physical_space( - &raft_data_dir, - raft_disk_stats.available_space(), - raft_reserved_size, - ); + reserve_physical_space(&raft_data_dir, raft_disk_avail, raft_reserved_size); } } @@ -896,3 +888,340 @@ impl EngineMetricsManager { } } } + +fn calculate_disk_usage(a: disk::DiskUsage, b: disk::DiskUsage) -> disk::DiskUsage { + match (a, b) { + (disk::DiskUsage::AlreadyFull, _) => disk::DiskUsage::AlreadyFull, + (_, disk::DiskUsage::AlreadyFull) => disk::DiskUsage::AlreadyFull, + (disk::DiskUsage::AlmostFull, _) => disk::DiskUsage::AlmostFull, + (_, disk::DiskUsage::AlmostFull) => disk::DiskUsage::AlmostFull, + (disk::DiskUsage::Normal, disk::DiskUsage::Normal) => disk::DiskUsage::Normal, + } +} + +/// A checker to inspect the disk usage of kv engine and raft engine. +/// The caller should call `inspect` periodically to get the disk usage status +/// manually. +#[derive(Clone)] +pub struct DiskUsageChecker { + /// The path of kv engine. + kvdb_path: String, + /// The path of raft engine. + raft_path: String, + /// The path of auxiliary directory of raft engine if specified. + raft_auxiliary_path: Option, + /// Whether the main directory of raft engine is separated from kv engine. + separated_raft_mount_path: bool, + /// Whether the auxiliary directory of raft engine is separated from kv + /// engine. + separated_raft_auxiliary_mount_path: bool, + /// Whether the auxiliary directory of raft engine is both separated from + /// the main directory of raft engine and kv engine. + separated_raft_auxiliary_and_kvdb_mount_path: bool, + /// The threshold of disk usage of kv engine to trigger the almost full + /// status. + kvdb_almost_full_thd: u64, + /// The threshold of disk usage of raft engine to trigger the almost full + /// status. + raft_almost_full_thd: u64, + /// The specified disk capacity for the whole disk. + config_disk_capacity: u64, +} + +impl DiskUsageChecker { + pub fn new( + kvdb_path: String, + raft_path: String, + raft_auxiliary_path: Option, + separated_raft_mount_path: bool, + separated_raft_auxiliary_mount_path: bool, + separated_raft_auxiliary_and_kvdb_mount_path: bool, + kvdb_almost_full_thd: u64, + raft_almost_full_thd: u64, + config_disk_capacity: u64, + ) -> Self { + DiskUsageChecker { + kvdb_path, + raft_path, + raft_auxiliary_path, + separated_raft_mount_path, + separated_raft_auxiliary_mount_path, + separated_raft_auxiliary_and_kvdb_mount_path, + kvdb_almost_full_thd, + raft_almost_full_thd, + config_disk_capacity, + } + } + + /// Inspect the disk usage of kv engine and raft engine. + /// The `kvdb_used_size` is the used size of kv engine, and the + /// `raft_used_size` is the used size of raft engine. + /// + /// Returns the disk usage status of the whole disk, kv engine and raft + /// engine, the whole disk capacity and available size. + pub fn inspect( + &self, + kvdb_used_size: u64, + raft_used_size: u64, + ) -> ( + disk::DiskUsage, // whole disk status + disk::DiskUsage, // kvdb disk status + disk::DiskUsage, // raft disk status + u64, // whole capacity + u64, // whole available + ) { + // By default, the almost full threshold of kv engine is half of the + // configured value. + let kvdb_already_full_thd = self.kvdb_almost_full_thd / 2; + let raft_already_full_thd = self.raft_almost_full_thd / 2; + // Check the disk space of raft engine. + let raft_disk_status = { + if !self.separated_raft_mount_path || self.raft_almost_full_thd == 0 { + disk::DiskUsage::Normal + } else { + let (raft_disk_cap, raft_disk_avail) = match disk::get_disk_space_stats( + &self.raft_path, + ) { + Err(e) => { + error!( + "get disk stat for raft engine failed"; + "raft_engine_path" => &self.raft_path, + "err" => ?e + ); + return ( + disk::DiskUsage::Normal, + disk::DiskUsage::Normal, + disk::DiskUsage::Normal, + 0, + 0, + ); + } + Ok((cap, avail)) => { + if !self.separated_raft_auxiliary_mount_path { + // If the auxiliary directory of raft engine is not separated from + // kv engine, returns u64::MAX to indicate that the disk space of + // the raft engine should not be checked. + (std::u64::MAX, std::u64::MAX) + } else if self.separated_raft_auxiliary_and_kvdb_mount_path { + // If the auxiliary directory of raft engine is separated from kv + // engine and the main directory of + // raft engine, the disk space of + // the auxiliary directory should be + // checked. + assert!(self.raft_auxiliary_path.is_some()); + let (auxiliary_disk_cap, auxiliary_disk_avail) = + match disk::get_disk_space_stats( + self.raft_auxiliary_path.as_ref().unwrap(), + ) { + Err(e) => { + error!( + "get auxiliary disk stat for raft engine failed"; + "raft_engine_path" => self.raft_auxiliary_path.as_ref().unwrap(), + "err" => ?e + ); + (0_u64, 0_u64) + } + Ok((total, avail)) => (total, avail), + }; + (cap + auxiliary_disk_cap, avail + auxiliary_disk_avail) + } else { + (cap, avail) + } + } + }; + let raft_disk_available = cmp::min( + raft_disk_cap + .checked_sub(raft_used_size) + .unwrap_or_default(), + raft_disk_avail, + ); + if raft_disk_available <= raft_already_full_thd { + disk::DiskUsage::AlreadyFull + } else if raft_disk_available <= self.raft_almost_full_thd { + disk::DiskUsage::AlmostFull + } else { + disk::DiskUsage::Normal + } + } + }; + // Check the disk space of kv engine. + let (disk_cap, disk_avail) = match disk::get_disk_space_stats(&self.kvdb_path) { + Err(e) => { + error!( + "get disk stat for kv store failed"; + "kv_path" => &self.kvdb_path, + "err" => ?e + ); + return ( + disk::DiskUsage::Normal, + disk::DiskUsage::Normal, + disk::DiskUsage::Normal, + 0, + 0, + ); + } + Ok((total, avail)) => (total, avail), + }; + let capacity = if self.config_disk_capacity == 0 || disk_cap < self.config_disk_capacity { + disk_cap + } else { + self.config_disk_capacity + }; + let available = cmp::min( + capacity.checked_sub(kvdb_used_size).unwrap_or_default(), + disk_avail, + ); + let cur_kv_disk_status = if available <= kvdb_already_full_thd { + disk::DiskUsage::AlreadyFull + } else if available <= self.kvdb_almost_full_thd { + disk::DiskUsage::AlmostFull + } else { + disk::DiskUsage::Normal + }; + let cur_disk_status = calculate_disk_usage(raft_disk_status, cur_kv_disk_status); + ( + cur_disk_status, + cur_kv_disk_status, + raft_disk_status, + capacity, + available, + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_disk_usage_checker() { + let kvdb_path = "/tmp/tikv-kvdb".to_owned(); + let raft_path = "/tmp/tikv-raft".to_owned(); + let raft_spill_path = "/tmp/tikv-raft/spill".to_owned(); + + // Case 1: mock the kvdb and raft engine are not separated. + fail::cfg("mock_disk_space_stats", "return(10000,5000)").unwrap(); + let disk_usage_checker = DiskUsageChecker::new( + kvdb_path.clone(), + raft_path.clone(), + Some(raft_spill_path.clone()), + false, + true, + false, + 100, + 100, + 1000, + ); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 1000); + assert_eq!(disk_status, disk::DiskUsage::AlreadyFull); + assert_eq!(kvdb_status, disk::DiskUsage::AlreadyFull); + assert_eq!(raft_status, disk::DiskUsage::Normal); + + let disk_usage_checker = DiskUsageChecker::new( + kvdb_path.clone(), + raft_path.clone(), + Some(raft_spill_path.clone()), + false, + true, + false, + 100, + 100, + 4100, + ); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 1000); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::AlmostFull); + assert_eq!(disk_status, disk::DiskUsage::AlmostFull); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(3999, 1000); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::Normal); + fail::remove("mock_disk_space_stats"); + + // Case 2: mock the kvdb and raft engine are separated. + fail::cfg( + "mock_disk_space_stats", + "1*return(500,200)->1*return(5000,2000)->1*return(500,200)->1*return(5000,2000)->1*return(500,200)->1*return(5000,2000)", + ) + .unwrap(); + let disk_usage_checker = DiskUsageChecker::new( + kvdb_path.clone(), + raft_path.clone(), + Some(raft_spill_path.clone()), + true, + true, + false, + 100, + 100, + 6000, + ); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 450); + assert_eq!(raft_status, disk::DiskUsage::AlreadyFull); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::AlreadyFull); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 400); + assert_eq!(raft_status, disk::DiskUsage::AlmostFull); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::AlmostFull); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 399); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::Normal); + fail::remove("mock_disk_space_stats"); + + fail::cfg( + "mock_disk_space_stats", + "1*return(500,200)->1*return(5000,2000)->1*return(500,200)->1*return(5000,2000)->1*return(500,200)->1*return(5000,2000)", + ) + .unwrap(); + let disk_usage_checker = DiskUsageChecker::new( + kvdb_path.clone(), + raft_path.clone(), + Some(raft_spill_path.clone()), + true, + false, + false, + 100, + 100, + 6000, + ); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 450); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::Normal); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 500); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::Normal); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4900, 500); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::AlmostFull); + assert_eq!(disk_status, disk::DiskUsage::AlmostFull); + fail::remove("mock_disk_space_stats"); + + // Case 3: mock the kvdb and raft engine are separated and the auxiliary + // directory of raft engine is separated from the main directory of + // raft. + fail::cfg( + "mock_disk_space_stats", + "1*return(500,200)->1*return(100,20)->1*return(5000,2000)", + ) + .unwrap(); + let disk_usage_checker = DiskUsageChecker::new( + kvdb_path.clone(), + raft_path.clone(), + Some(raft_spill_path.clone()), + true, + true, + true, + 100, + 100, + 6000, + ); + let (disk_status, kvdb_status, raft_status, ..) = disk_usage_checker.inspect(4000, 450); + assert_eq!(raft_status, disk::DiskUsage::Normal); + assert_eq!(kvdb_status, disk::DiskUsage::Normal); + assert_eq!(disk_status, disk::DiskUsage::Normal); + fail::remove("mock_disk_space_stats"); + } +} diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 5e27e86ae9..a5f9d1e781 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -12,7 +12,6 @@ //! explicitly stopped. We keep these components in the `TikvServer` struct. use std::{ - cmp, collections::HashMap, convert::TryFrom, path::{Path, PathBuf}, @@ -131,8 +130,8 @@ use tokio::runtime::Builder; use crate::{ common::{ - ConfiguredRaftEngine, EngineMetricsManager, EnginesResourceInfo, KvEngineBuilder, - TikvServerCore, + ConfiguredRaftEngine, DiskUsageChecker, EngineMetricsManager, EnginesResourceInfo, + KvEngineBuilder, TikvServerCore, }, memory::*, setup::*, @@ -1377,77 +1376,53 @@ where let raft_path = engines.raft.get_engine_path().to_string(); let separated_raft_mount_path = path_in_diff_mount_point(raft_path.as_str(), engines.kv.path()); - let raft_almost_full_threshold = reserve_raft_space; - let raft_already_full_threshold = reserve_raft_space / 2; - - let almost_full_threshold = reserve_space; - let already_full_threshold = reserve_space / 2; - fn calculate_disk_usage(a: disk::DiskUsage, b: disk::DiskUsage) -> disk::DiskUsage { - match (a, b) { - (disk::DiskUsage::AlreadyFull, _) => disk::DiskUsage::AlreadyFull, - (_, disk::DiskUsage::AlreadyFull) => disk::DiskUsage::AlreadyFull, - (disk::DiskUsage::AlmostFull, _) => disk::DiskUsage::AlmostFull, - (_, disk::DiskUsage::AlmostFull) => disk::DiskUsage::AlmostFull, - (disk::DiskUsage::Normal, disk::DiskUsage::Normal) => disk::DiskUsage::Normal, - } - } + // If the auxiliary directory of raft engine is specified, it's needed to be + // checked. Otherwise, it's not needed to be checked. And as the configuration + // is static, it's safe to check it only once. + let raft_auxiliay_path = if self.core.config.raft_engine.enable { + self.core.config.raft_engine.config().spill_dir.clone() + } else { + None + }; + let (separated_raft_auxillay_mount_path, separated_raft_auxiliary_with_kvdb) = + raft_auxiliay_path + .as_ref() + .map(|path| { + let seperated_with_kvdb = + path_in_diff_mount_point(path.as_str(), engines.kv.path()); + let seperated_with_raft = + path_in_diff_mount_point(path.as_str(), raft_path.as_str()); + ( + seperated_with_kvdb && seperated_with_raft, + seperated_with_kvdb, + ) + }) + .unwrap_or((false, false)); + let disk_usage_checker = DiskUsageChecker::new( + store_path.as_path().to_str().unwrap().to_string(), + raft_path, + raft_auxiliay_path, + separated_raft_mount_path, + separated_raft_auxillay_mount_path, + separated_raft_auxiliary_with_kvdb, + reserve_space, + reserve_raft_space, + config_disk_capacity, + ); self.core.background_worker .spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || { - let disk_stats = match fs2::statvfs(&store_path) { - Err(e) => { - error!( - "get disk stat for kv store failed"; - "kv_path" => store_path.to_str(), - "err" => ?e - ); - return; - } - Ok(stats) => stats, - }; - let disk_cap = disk_stats.total_space(); let snap_size = snap_mgr.get_total_snap_size().unwrap(); - let kv_size = engines .kv .get_engine_used_size() .expect("get kv engine size"); - let raft_size = engines .raft .get_engine_size() .expect("get raft engine size"); - - let mut raft_disk_status = disk::DiskUsage::Normal; - if separated_raft_mount_path && reserve_raft_space != 0 { - let raft_disk_stats = match fs2::statvfs(&raft_path) { - Err(e) => { - error!( - "get disk stat for raft engine failed"; - "raft_engine_path" => raft_path.clone(), - "err" => ?e - ); - return; - } - Ok(stats) => stats, - }; - let raft_disk_cap = raft_disk_stats.total_space(); - let mut raft_disk_available = - raft_disk_cap.checked_sub(raft_size).unwrap_or_default(); - raft_disk_available = cmp::min(raft_disk_available, raft_disk_stats.available_space()); - raft_disk_status = if raft_disk_available <= raft_already_full_threshold - { - disk::DiskUsage::AlreadyFull - } else if raft_disk_available <= raft_almost_full_threshold - { - disk::DiskUsage::AlmostFull - } else { - disk::DiskUsage::Normal - }; - } let placeholer_file_path = PathBuf::from_str(&data_dir) .unwrap() .join(Path::new(file_system::SPACE_PLACEHOLDER_FILE)); - let placeholder_size: u64 = file_system::get_file_size(placeholer_file_path).unwrap_or(0); @@ -1456,24 +1431,9 @@ where } else { snap_size + kv_size + placeholder_size }; - let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity { - disk_cap - } else { - config_disk_capacity - }; - - let mut available = capacity.checked_sub(used_size).unwrap_or_default(); - available = cmp::min(available, disk_stats.available_space()); - + // Check the disk usage and update the disk usage status. + let (cur_disk_status, cur_kv_disk_status, raft_disk_status, capacity, available) = disk_usage_checker.inspect(used_size, raft_size); let prev_disk_status = disk::get_disk_status(0); //0 no need care about failpoint. - let cur_kv_disk_status = if available <= already_full_threshold { - disk::DiskUsage::AlreadyFull - } else if available <= almost_full_threshold { - disk::DiskUsage::AlmostFull - } else { - disk::DiskUsage::Normal - }; - let cur_disk_status = calculate_disk_usage(raft_disk_status, cur_kv_disk_status); if prev_disk_status != cur_disk_status { warn!( "disk usage {:?}->{:?} (raft engine usage: {:?}, kv engine usage: {:?}), seperated raft mount={}, kv available={}, snap={}, kv={}, raft={}, capacity={}", diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs index a642d465f8..cceed5efd9 100644 --- a/components/server/src/server2.rs +++ b/components/server/src/server2.rs @@ -12,7 +12,6 @@ //! explicitly stopped. We keep these components in the `TikvServer` struct. use std::{ - cmp, collections::HashMap, marker::PhantomData, path::{Path, PathBuf}, @@ -122,7 +121,10 @@ use tikv_util::{ use tokio::runtime::Builder; use crate::{ - common::{ConfiguredRaftEngine, EngineMetricsManager, EnginesResourceInfo, TikvServerCore}, + common::{ + ConfiguredRaftEngine, DiskUsageChecker, EngineMetricsManager, EnginesResourceInfo, + TikvServerCore, + }, memory::*, setup::*, signal_handler, @@ -1149,36 +1151,42 @@ where let raft_path = raft_engine.get_engine_path().to_string(); let separated_raft_mount_path = path_in_diff_mount_point(raft_path.as_str(), tablet_registry.tablet_root()); - let raft_almost_full_threshold = reserve_raft_space; - let raft_already_full_threshold = reserve_raft_space / 2; - - let almost_full_threshold = reserve_space; - let already_full_threshold = reserve_space / 2; - fn calculate_disk_usage(a: disk::DiskUsage, b: disk::DiskUsage) -> disk::DiskUsage { - match (a, b) { - (disk::DiskUsage::AlreadyFull, _) => disk::DiskUsage::AlreadyFull, - (_, disk::DiskUsage::AlreadyFull) => disk::DiskUsage::AlreadyFull, - (disk::DiskUsage::AlmostFull, _) => disk::DiskUsage::AlmostFull, - (_, disk::DiskUsage::AlmostFull) => disk::DiskUsage::AlmostFull, - (disk::DiskUsage::Normal, disk::DiskUsage::Normal) => disk::DiskUsage::Normal, - } - } + // If the auxiliary directory of raft engine is specified, it's needed to be + // checked. Otherwise, it's not needed to be checked. And as the configuration + // is static, it's safe to check it only once. + let raft_auxiliay_path = if self.core.config.raft_engine.enable { + self.core.config.raft_engine.config().spill_dir.clone() + } else { + None + }; + let (separated_raft_auxillay_mount_path, separated_raft_auxiliary_with_kvdb) = + raft_auxiliay_path + .as_ref() + .map(|path| { + let seperated_with_kvdb = + path_in_diff_mount_point(path.as_str(), tablet_registry.tablet_root()); + let seperated_with_raft = + path_in_diff_mount_point(path.as_str(), raft_path.as_str()); + ( + seperated_with_kvdb && seperated_with_raft, + seperated_with_kvdb, + ) + }) + .unwrap_or((false, false)); + let disk_usage_checker = DiskUsageChecker::new( + store_path.as_path().to_str().unwrap().to_string(), + raft_path, + raft_auxiliay_path, + separated_raft_mount_path, + separated_raft_auxillay_mount_path, + separated_raft_auxiliary_with_kvdb, + reserve_space, + reserve_raft_space, + config_disk_capacity, + ); self.core.background_worker .spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || { - let disk_stats = match fs2::statvfs(&store_path) { - Err(e) => { - error!( - "get disk stat for kv store failed"; - "kv_path" => store_path.to_str(), - "err" => ?e - ); - return; - } - Ok(stats) => stats, - }; - let disk_cap = disk_stats.total_space(); let snap_size = snap_mgr.total_snap_size().unwrap(); - let mut kv_size = 0; tablet_registry.for_each_opened_tablet(|_, cached| { if let Some(tablet) = cached.latest() { @@ -1186,42 +1194,12 @@ where } true }); - let raft_size = raft_engine .get_engine_size() .expect("get raft engine size"); - - let mut raft_disk_status = disk::DiskUsage::Normal; - if separated_raft_mount_path && reserve_raft_space != 0 { - let raft_disk_stats = match fs2::statvfs(&raft_path) { - Err(e) => { - error!( - "get disk stat for raft engine failed"; - "raft_engine_path" => raft_path.clone(), - "err" => ?e - ); - return; - } - Ok(stats) => stats, - }; - let raft_disk_cap = raft_disk_stats.total_space(); - let mut raft_disk_available = - raft_disk_cap.checked_sub(raft_size).unwrap_or_default(); - raft_disk_available = cmp::min(raft_disk_available, raft_disk_stats.available_space()); - raft_disk_status = if raft_disk_available <= raft_already_full_threshold - { - disk::DiskUsage::AlreadyFull - } else if raft_disk_available <= raft_almost_full_threshold - { - disk::DiskUsage::AlmostFull - } else { - disk::DiskUsage::Normal - }; - } let placeholer_file_path = PathBuf::from_str(&data_dir) .unwrap() .join(Path::new(file_system::SPACE_PLACEHOLDER_FILE)); - let placeholder_size: u64 = file_system::get_file_size(placeholer_file_path).unwrap_or(0); @@ -1230,24 +1208,9 @@ where } else { snap_size + kv_size + placeholder_size }; - let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity { - disk_cap - } else { - config_disk_capacity - }; - - let mut available = capacity.checked_sub(used_size).unwrap_or_default(); - available = cmp::min(available, disk_stats.available_space()); - + // Check the disk usage and update the disk usage status. + let (cur_disk_status, cur_kv_disk_status, raft_disk_status, capacity, available) = disk_usage_checker.inspect(used_size, raft_size); let prev_disk_status = disk::get_disk_status(0); //0 no need care about failpoint. - let cur_kv_disk_status = if available <= already_full_threshold { - disk::DiskUsage::AlreadyFull - } else if available <= almost_full_threshold { - disk::DiskUsage::AlmostFull - } else { - disk::DiskUsage::Normal - }; - let cur_disk_status = calculate_disk_usage(raft_disk_status, cur_kv_disk_status); if prev_disk_status != cur_disk_status { warn!( "disk usage {:?}->{:?} (raft engine usage: {:?}, kv engine usage: {:?}), seperated raft mount={}, kv available={}, snap={}, kv={}, raft={}, capacity={}", diff --git a/components/tikv_util/Cargo.toml b/components/tikv_util/Cargo.toml index b99a722a02..9f5e9ad2c2 100644 --- a/components/tikv_util/Cargo.toml +++ b/components/tikv_util/Cargo.toml @@ -24,6 +24,7 @@ crossbeam-skiplist = "0.1" derive_more = "0.99.3" error_code = { workspace = true } fail = "0.5" +fs2 = "0.4" futures = { version = "0.3", features = ["compat", "thread-pool"] } futures-util = { version = "0.3", default-features = false, features = ["io"] } grpcio = { workspace = true } @@ -31,7 +32,10 @@ http = "0.2.0" kvproto = { workspace = true } lazy_static = "1.3" libc = "0.2" -log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] } +log = { version = "0.4", features = [ + "max_level_trace", + "release_max_level_debug", +] } log_wrappers = { workspace = true } nix = "0.24" num-traits = "0.2" diff --git a/components/tikv_util/src/sys/disk.rs b/components/tikv_util/src/sys/disk.rs index c8fe87a56b..5918bdd8e3 100644 --- a/components/tikv_util/src/sys/disk.rs +++ b/components/tikv_util/src/sys/disk.rs @@ -1,5 +1,8 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -use std::sync::atomic::{AtomicI32, AtomicU64, Ordering}; +use std::{ + path::Path, + sync::atomic::{AtomicI32, AtomicU64, Ordering}, +}; use fail::fail_point; pub use kvproto::disk_usage::DiskUsage; @@ -78,3 +81,16 @@ pub fn get_disk_status(_store_id: u64) -> DiskUsage { _ => panic!("Disk Status Value not meet expectations"), } } + +pub fn get_disk_space_stats>(path: P) -> std::io::Result<(u64, u64)> { + fail_point!("mock_disk_space_stats", |stats| { + let stats = stats.unwrap(); + let values = stats.split(',').collect::>(); + Ok(( + values[0].parse::().unwrap(), + values[1].parse::().unwrap(), + )) + }); + let disk_stats = fs2::statvfs(path)?; + Ok((disk_stats.total_space(), disk_stats.available_space())) +} diff --git a/components/tikv_util/src/sys/mod.rs b/components/tikv_util/src/sys/mod.rs index 8e0334fb17..028a70ce18 100644 --- a/components/tikv_util/src/sys/mod.rs +++ b/components/tikv_util/src/sys/mod.rs @@ -281,10 +281,11 @@ pub fn path_in_diff_mount_point(_path1: impl AsRef, _path2: impl AsRef 0); + assert!(available > 0); + assert!(capacity >= available); + + disk::get_disk_space_stats("/non-exist-path").unwrap_err(); + } }