diff --git a/.github/workflows/nfstest-factory.yml b/.github/workflows/nfstest-factory.yml index c12c723..38e95ce 100644 --- a/.github/workflows/nfstest-factory.yml +++ b/.github/workflows/nfstest-factory.yml @@ -45,6 +45,27 @@ jobs: sed -i '/imagePullPolicy/d' deploy/k8s/deployment.yaml # Add imagePullPolicy: Never for local image sed -i 's|image: arcticwolf:test|image: arcticwolf:test\n imagePullPolicy: Never|g' deploy/k8s/deployment.yaml + # Enable folder quota with a bootstrap entry so the quota smoke + # test below has something to enforce against. The entry matches + # tests/test_nfs_quota.sh defaults (PVC_NAME=pvc-quota-test, + # QUOTA_BYTES=1048576). + python3 - <<'PY' + from pathlib import Path + p = Path("deploy/k8s/deployment.yaml") + text = p.read_text() + marker = ' [logging]\n level = "debug"' + addition = ( + '\n' + ' [quota]\n' + ' enabled = true\n' + ' db_path = "/tmp/.arcticwolf-quota.db"\n' + '\n' + ' [quota.bootstrap]\n' + ' "pvc-quota-test" = "1MB"\n' + ) + assert marker in text, "deployment.yaml logging block not found" + p.write_text(text.replace(marker, marker + addition)) + PY kubectl apply -f deploy/k8s/deployment.yaml kubectl wait --for=condition=Ready pod -l app=arcticwolf --timeout=120s @@ -68,6 +89,18 @@ jobs: echo "NFS mounted successfully" ls -la /mnt/nfs/ + # Exercise folder quota against the live mount before running the + # broader POSIX suite. The quota directory (pvc-quota-test) is + # isolated from the paths nfstest_posix touches, so the two suites + # do not interfere. 
+ - name: Run folder quota smoke test + run: | + sudo env \ + MOUNT_POINT=/mnt/nfs \ + PVC_NAME=pvc-quota-test \ + QUOTA_BYTES=1048576 \ + bash tests/test_nfs_quota.sh + # skip test open until we start to review the failures # read/write tests are mandatory and must pass - name: Run NFS POSIX tests diff --git a/Cargo.toml b/Cargo.toml index 12df514..b0044ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,9 @@ clap = { version = "4", features = ["derive"] } # XDR serialization (runtime) xdr-codec = "0.4" +# Embedded KV store for quota persistence +redb = "3" + [dev-dependencies] tempfile = "3" diff --git a/arcticwolf.example.toml b/arcticwolf.example.toml index a6ffbbc..e7be8fa 100644 --- a/arcticwolf.example.toml +++ b/arcticwolf.example.toml @@ -19,3 +19,21 @@ export_path = "/tmp/nfs_exports" [logging] # Log level: "error", "warn", "info", "debug", "trace" (default: "info") level = "info" + +[quota] +# Enable folder quota enforcement (default: false) +# When enabled, first-level subdirectories under export_path may be +# assigned a byte limit; writes past the limit return NFS3ERR_DQUOT. +enabled = false +# redb database storing quota limits and tracked usage. +# The parent directory is created at startup if missing. +db_path = "/var/lib/arcticwolf/quota.db" + +# Optional declarative bootstrap. Each entry sets a quota on the first- +# level subdirectory at startup, but only if the directory has no quota +# entry yet. This keeps the bootstrap idempotent: editing it after initial +# rollout will not clobber the tracked usage. +# +# Sizes support suffixes B, KB, MB, GB, TB (1024-based). 
+# [quota.bootstrap] +# "pvc-example" = "10GB" diff --git a/src/config.rs b/src/config.rs index 3c272eb..47a80b9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,9 +6,11 @@ use clap::Parser; use serde::Deserialize; +use std::collections::HashMap; use std::path::PathBuf; const DEFAULT_CONFIG_PATH: &str = "/etc/arcticwolf/config.toml"; +const DEFAULT_QUOTA_DB_PATH: &str = "/var/lib/arcticwolf/quota.db"; #[derive(Parser, Debug)] #[command(name = "arcticwolf")] @@ -25,6 +27,7 @@ pub struct Config { pub server: ServerConfig, pub fsal: FsalConfig, pub logging: LoggingConfig, + pub quota: QuotaConfig, } #[derive(Debug, Clone, Deserialize)] @@ -48,6 +51,71 @@ pub struct LoggingConfig { pub level: Option<String>, } +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct QuotaConfig { + /// Enable quota enforcement + pub enabled: bool, + /// Path to the redb database file storing quota limits and usage + pub db_path: PathBuf, + /// Declarative quota bootstrap: map of first-level subdirectory name + /// to a size string (e.g. "10GB"). Each entry is applied at startup + /// only when the directory has no existing quota entry, so the + /// bootstrap is idempotent and safe across restarts. + pub bootstrap: HashMap<String, String>, +} + +impl Default for QuotaConfig { + fn default() -> Self { + Self { + enabled: false, + db_path: PathBuf::from(DEFAULT_QUOTA_DB_PATH), + bootstrap: HashMap::new(), + } + } +} + +/// Parse a human-readable size string into bytes +/// +/// Supported suffixes (case-insensitive): B, KB, MB, GB, TB +/// Uses 1024-based units (KiB/MiB/GiB/TiB semantics). Plain numbers are treated as bytes. 
+/// +/// # Examples +/// - "1024" -> 1024 +/// - "10KB" -> 10240 +/// - "5MB" -> 5242880 +/// - "2GB" -> 2147483648 +#[allow(dead_code)] +pub fn parse_size(s: &str) -> anyhow::Result { + let trimmed = s.trim(); + if trimmed.is_empty() { + anyhow::bail!("Empty size string"); + } + + let upper = trimmed.to_uppercase(); + let (num_part, multiplier): (&str, u64) = if let Some(prefix) = upper.strip_suffix("TB") { + (prefix, 1024u64.pow(4)) + } else if let Some(prefix) = upper.strip_suffix("GB") { + (prefix, 1024u64.pow(3)) + } else if let Some(prefix) = upper.strip_suffix("MB") { + (prefix, 1024u64.pow(2)) + } else if let Some(prefix) = upper.strip_suffix("KB") { + (prefix, 1024) + } else if let Some(prefix) = upper.strip_suffix('B') { + (prefix, 1) + } else { + (upper.as_str(), 1) + }; + + let num_str = num_part.trim(); + let num: u64 = num_str + .parse() + .map_err(|e| anyhow::anyhow!("Invalid size '{}': {}", s, e))?; + + num.checked_mul(multiplier) + .ok_or_else(|| anyhow::anyhow!("Size overflow: {}", s)) +} + impl Default for ServerConfig { fn default() -> Self { Self { @@ -227,4 +295,107 @@ mod tests { let result: Result = toml::from_str("this is not valid toml [[["); assert!(result.is_err()); } + + #[test] + fn test_quota_config_default() { + let config = QuotaConfig::default(); + assert!(!config.enabled); + assert_eq!(config.db_path, PathBuf::from(DEFAULT_QUOTA_DB_PATH)); + } + + #[test] + fn test_config_default_includes_quota() { + let config = Config::default(); + assert!(!config.quota.enabled); + } + + #[test] + fn test_parse_quota_toml() { + let toml = r#" + [quota] + enabled = true + db_path = "/tmp/quota.db" + "#; + + let config: Config = toml::from_str(toml).expect("Failed to parse TOML"); + assert!(config.quota.enabled); + assert_eq!(config.quota.db_path, PathBuf::from("/tmp/quota.db")); + assert!(config.quota.bootstrap.is_empty()); + } + + #[test] + fn test_parse_quota_bootstrap_toml() { + let toml = r#" + [quota] + enabled = true + db_path = 
"/tmp/quota.db" + + [quota.bootstrap] + "pvc-a" = "10GB" + "pvc-b" = "1MB" + "#; + + let config: Config = toml::from_str(toml).expect("Failed to parse TOML"); + assert_eq!(config.quota.bootstrap.len(), 2); + assert_eq!( + config.quota.bootstrap.get("pvc-a"), + Some(&"10GB".to_string()) + ); + assert_eq!( + config.quota.bootstrap.get("pvc-b"), + Some(&"1MB".to_string()) + ); + } + + #[test] + fn test_parse_size_bytes() { + assert_eq!(parse_size("0").unwrap(), 0); + assert_eq!(parse_size("1").unwrap(), 1); + assert_eq!(parse_size("1024").unwrap(), 1024); + assert_eq!(parse_size("512B").unwrap(), 512); + } + + #[test] + fn test_parse_size_kb() { + assert_eq!(parse_size("1KB").unwrap(), 1024); + assert_eq!(parse_size("10KB").unwrap(), 10 * 1024); + assert_eq!(parse_size("1kb").unwrap(), 1024); // case-insensitive + } + + #[test] + fn test_parse_size_mb() { + assert_eq!(parse_size("1MB").unwrap(), 1024 * 1024); + assert_eq!(parse_size("5MB").unwrap(), 5 * 1024 * 1024); + } + + #[test] + fn test_parse_size_gb() { + assert_eq!(parse_size("1GB").unwrap(), 1024u64.pow(3)); + assert_eq!(parse_size("10GB").unwrap(), 10 * 1024u64.pow(3)); + } + + #[test] + fn test_parse_size_tb() { + assert_eq!(parse_size("1TB").unwrap(), 1024u64.pow(4)); + assert_eq!(parse_size("2TB").unwrap(), 2 * 1024u64.pow(4)); + } + + #[test] + fn test_parse_size_whitespace() { + assert_eq!(parse_size(" 10MB ").unwrap(), 10 * 1024 * 1024); + } + + #[test] + fn test_parse_size_invalid() { + assert!(parse_size("").is_err()); + assert!(parse_size("abc").is_err()); + assert!(parse_size("10XB").is_err()); + assert!(parse_size("-5MB").is_err()); + } + + #[test] + fn test_parse_size_overflow() { + // u64::MAX / 1024^4 is ~16 million TB + assert!(parse_size("99999999999999999TB").is_err()); + } } diff --git a/src/fsal/local/mod.rs b/src/fsal/local/mod.rs index 6f53b6c..9f8e39f 100644 --- a/src/fsal/local/mod.rs +++ b/src/fsal/local/mod.rs @@ -11,8 +11,12 @@ use tokio::fs as tokio_fs; use 
tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tracing::{debug, warn}; +use std::sync::Arc; + use super::handle::{FileHandle, HandleManager}; -use super::{DirEntry, FileAttributes, FileTime, FileType, Filesystem}; +use super::quota::{QuotaManager, allocated_path_size}; +use super::{DirEntry, FileAttributes, FileTime, FileType, Filesystem, FsStats}; +use crate::config::QuotaConfig; /// Local filesystem implementation pub struct LocalFilesystem { @@ -22,6 +26,8 @@ pub struct LocalFilesystem { handle_manager: HandleManager, /// Root file handle root_handle: FileHandle, + /// Optional folder quota manager (present when quota is enabled in config) + quota_manager: Option>, } impl LocalFilesystem { @@ -29,7 +35,9 @@ impl LocalFilesystem { /// /// # Arguments /// * `root_path` - Root directory to export (e.g., "/export") - pub fn new>(root_path: P) -> Result { + /// * `quota` - Optional quota configuration. When present and `enabled` is + /// true, a [`QuotaManager`] is opened against the configured redb file. + pub fn new>(root_path: P, quota: Option<&QuotaConfig>) -> Result { let root_path = root_path.as_ref().canonicalize().context(format!( "Failed to canonicalize root path: {:?}", root_path.as_ref() @@ -48,15 +56,75 @@ impl LocalFilesystem { // Create root handle let root_handle = handle_manager.create_handle(root_path.clone()); + let quota_manager = match quota { + Some(cfg) if cfg.enabled => { + // Refuse to host the quota DB inside the exported tree: + // an NFS client would be able to read, modify or delete + // it via normal lookup/write/remove operations and + // corrupt enforcement state. Compare the canonicalised + // db parent against root_path so symlink shenanigans do + // not slip through. 
+ check_db_path_outside_root(&cfg.db_path, &root_path)?; + + let qm = QuotaManager::new(&cfg.db_path, root_path.clone()).context(format!( + "Failed to initialise quota manager at {:?}", + cfg.db_path + ))?; + debug!("LocalFilesystem: quota enabled, db={:?}", cfg.db_path); + Some(Arc::new(qm)) + } + _ => None, + }; + debug!("LocalFilesystem created with root: {:?}", root_path); Ok(Self { root_path, handle_manager, root_handle, + quota_manager, }) } + /// Access the quota manager, if one is configured. + #[allow(dead_code)] + pub fn quota_manager(&self) -> Option<&QuotaManager> { + self.quota_manager.as_deref() + } + + /// Resolve a path to its owning quota directory, if any. Returns the + /// quota manager together with the directory name so callers can run + /// check/add/sub without repeated plumbing. + /// + /// The path is canonicalised first so a symlink that crosses PVCs + /// (e.g. `pvc-a/link -> pvc-b/data`) is accounted to the PVC where + /// the data actually lives, not the one the link sits in. When the + /// path does not exist yet — the common case for `write()` on a + /// fresh file — canonicalisation fails and we fall back to the + /// literal path, which is also where the new file will land. There + /// is a small TOCTOU window between this lookup and the underlying + /// FS call, but exploiting it requires racing two NFS operations + /// to swap a regular file for a symlink at exactly the right + /// moment, which is bounded and benign for the PVC use case. + async fn quota_target(&self, path: &Path) -> Option<(&QuotaManager, String)> { + let qm = self.quota_manager.as_ref()?; + let canonical = tokio_fs::canonicalize(path).await.ok(); + let lookup = canonical.as_deref().unwrap_or(path); + let dir = qm.resolve_quota_dir(lookup)?; + // Only return a target if a quota is actually configured for this dir. 
+ qm.get_quota_info(&dir).await?; + Some((qm, dir)) + } + + /// Return the first-level quota-directory name for `path`, or `None` + /// when no quota manager is configured or the path sits at the export + /// root. Unlike [`quota_target`] this does not require the directory + /// to have an active quota entry — rename needs to compare both sides + /// even when only one side is tracked. + fn quota_dir_of(&self, path: &Path) -> Option { + self.quota_manager.as_ref()?.resolve_quota_dir(path) + } + /// Resolve a file handle to a full path fn resolve_handle(&self, handle: &FileHandle) -> Result { self.handle_manager @@ -352,6 +420,46 @@ impl Filesystem for LocalFilesystem { async fn write(&self, handle: &FileHandle, offset: u64, data: &[u8]) -> Result { let path = self.resolve_handle(handle)?; + // File handles map to paths and are not invalidated when the + // path is replaced with a symlink. Re-validate the resolved + // path so a stale handle cannot be combined with a symlink + // swap to write outside the export root — `OpenOptions::open` + // below follows symlinks, so without this check a client + // could escape the exported tree via the resolved target. + self.validate_path(&path)?; + + // Quota: charge by allocated bytes (st_blocks * 512), not by logical + // length. A client cannot bypass the quota by extending a file with + // setattr_size and then writing into the existing logical range, + // because hole-filling writes increase the on-disk footprint even + // when the logical size is unchanged. + let quota_target = self.quota_target(&path).await; + let old_alloc = if quota_target.is_some() { + // write() opens with `tokio_fs::OpenOptions` which follows + // symlinks, so account for the resolved target's footprint + // — otherwise a symlink in a quota dir lets the client + // bypass enforcement by writing through the link. + allocated_bytes_following(&path).await? 
+ } else { + 0 + }; + if let Some((qm, dir)) = quota_target.as_ref() { + // Pre-check is conservative: assume the entire write will land + // in freshly allocated blocks. We trade two things to keep + // the hot path simple: + // * Overwrites in a near-full PVC may be rejected with + // EDQUOT even though the actual on-disk usage would not + // grow (e.g. rewriting an existing block with new data). + // A precise check would need fiemap/SEEK_HOLE per + // write, which is Linux-specific and slow. + // * Concurrent writers can each pass this check against + // the same usage snapshot — see the concurrency note + // on `QuotaManager::check_quota`. The post-write + // `add_usage` accounts for the real delta either way, + // so over-shoot is bounded and reconciliation repairs + // it. + qm.check_quota(dir, data.len() as u64).await?; + } let mut file = tokio_fs::OpenOptions::new() .write(true) @@ -372,6 +480,37 @@ impl Filesystem for LocalFilesystem { // Flush to disk file.sync_all().await.context("Failed to sync file")?; + // Quota: apply the real delta in allocated bytes after the write. + // The data has already been written and synced; surfacing a quota + // bookkeeping error here would tell the client the WRITE failed + // even though bytes are on disk, and the QuotaManager rolls back + // its cache on persist failure so future checks would also be + // wrong. Log it as a degraded state instead and let background + // reconciliation repair the drift. 
+ if let Some((qm, dir)) = quota_target.as_ref() { + match allocated_bytes_following(&path).await { + Ok(new_alloc) => { + let delta = new_alloc.saturating_sub(old_alloc); + if delta > 0 + && let Err(err) = qm.add_usage(dir, delta).await + { + warn!( + "WRITE quota accounting failed after data was persisted: \ + path={:?} dir={} delta={} error={:#}", + path, dir, delta, err + ); + } + } + Err(err) => { + warn!( + "WRITE quota accounting skipped (stat failed) after \ + data was persisted: path={:?} dir={} error={:#}", + path, dir, err + ); + } + } + } + debug!( "WRITE: {:?} offset={} count={} -> {} bytes", path, @@ -385,6 +524,24 @@ impl Filesystem for LocalFilesystem { async fn setattr_size(&self, handle: &FileHandle, size: u64) -> Result<()> { let path = self.resolve_handle(handle)?; + // See `write()` for the rationale: validate the resolved path + // again so a stale handle + symlink swap cannot truncate or + // extend a file outside the export root. + self.validate_path(&path)?; + + // Quota: track using allocated bytes, consistent with write(). + // Truncating down releases blocks; sparse extension allocates + // none, so the delta is naturally zero — no special-case needed + // for the "extend" branch. + let quota_target = self.quota_target(&path).await; + let old_alloc = if quota_target.is_some() { + // setattr_size opens through the symlink (set_len follows), + // so quota tracking must look at the same target the kernel + // actually truncates. + allocated_bytes_following(&path).await? + } else { + 0 + }; let file = tokio_fs::OpenOptions::new() .write(true) @@ -396,6 +553,33 @@ impl Filesystem for LocalFilesystem { .await .context("Failed to set file size")?; + // Quota release is best-effort: the truncate has already taken + // effect on disk, so a redb error here must not propagate as a + // SETATTR failure (the client would re-issue and observe the new + // size anyway). See WRITE for the same rationale. 
+ if let Some((qm, dir)) = quota_target { + match allocated_bytes_following(&path).await { + Ok(new_alloc) if new_alloc < old_alloc => { + let freed = old_alloc - new_alloc; + if let Err(err) = qm.sub_usage(&dir, freed).await { + warn!( + "SETATTR quota release failed after truncate: \ + path={:?} dir={} freed={} error={:#}", + path, dir, freed, err + ); + } + } + Ok(_) => {} + Err(err) => { + warn!( + "SETATTR quota release skipped (stat failed) after \ + truncate: path={:?} dir={} error={:#}", + path, dir, err + ); + } + } + } + debug!("SETATTR: {:?} size={}", path, size); Ok(()) @@ -478,12 +662,39 @@ impl Filesystem for LocalFilesystem { // Validate path is within export root self.validate_path(&full_path)?; + // Quota: figure out how many bytes this unlink will actually free. + // `refundable_bytes_on_unlink` accounts for symlinks (refund the + // link, not the target) and hard links (refund nothing while + // `nlink > 1`, otherwise a hard-linked file could be used to + // reclaim quota the data still occupies on disk). + let quota_target = self.quota_target(&full_path).await; + let freed_bytes = if quota_target.is_some() { + refundable_bytes_on_unlink(&full_path).await? + } else { + 0 + }; + // Remove file tokio_fs::remove_file(&full_path) .await .context(format!("Failed to remove file: {:?}", full_path))?; - debug!("REMOVE: {:?}", full_path); + // Quota release is best-effort: the file is already gone, so a + // redb error here must not propagate as a REMOVE failure (the + // client would re-issue and observe NOENT). See WRITE for the + // same rationale. 
+ if let Some((qm, dir)) = quota_target + && freed_bytes > 0 + && let Err(err) = qm.sub_usage(&dir, freed_bytes).await + { + warn!( + "REMOVE quota release failed after unlink: \ + path={:?} dir={} freed={} error={:#}", + full_path, dir, freed_bytes, err + ); + } + + debug!("REMOVE: {:?} (freed {} bytes)", full_path, freed_bytes); Ok(()) } @@ -568,6 +779,49 @@ impl Filesystem for LocalFilesystem { self.validate_path(&from_full_path)?; self.validate_path(&to_full_path)?; + // Quota: only need to do work when the rename crosses a quota + // boundary. Within the same quota directory (or between two + // untracked directories) the usage is unchanged. Even when the + // first-level names differ, skip the (potentially expensive) + // recursive size walk if neither side has an active quota + // entry — a rename between untracked PVCs should not pay for it. + let from_quota = self.quota_dir_of(&from_full_path); + let to_quota = self.quota_dir_of(&to_full_path); + let cross_quota = from_quota != to_quota; + + let need_size = if cross_quota && let Some(ref qm) = self.quota_manager { + let from_tracked = match from_quota.as_ref() { + Some(d) => qm.get_quota_info(d).await.is_some(), + None => false, + }; + let to_tracked = match to_quota.as_ref() { + Some(d) => qm.get_quota_info(d).await.is_some(), + None => false, + }; + from_tracked || to_tracked + } else { + false + }; + + let size_bytes = if need_size { + // Compute the total byte footprint of the source before renaming. + let src = from_full_path.clone(); + tokio::task::spawn_blocking(move || allocated_path_size(&src)) + .await + .context("spawn_blocking failed")?? 
+ } else { + 0 + }; + + if cross_quota + && size_bytes > 0 + && let Some(ref qm) = self.quota_manager + && let Some(ref dir) = to_quota + && qm.get_quota_info(dir).await.is_some() + { + qm.check_quota(dir, size_bytes).await?; + } + // Rename/move the file or directory tokio_fs::rename(&from_full_path, &to_full_path) .await @@ -576,6 +830,34 @@ impl Filesystem for LocalFilesystem { from_full_path, to_full_path ))?; + // Quota transfer is best-effort: the rename has already taken + // place on disk, so a redb error here must not propagate as a + // RENAME failure (the client would re-issue and observe NOENT + // on the source). See WRITE for the same rationale. + if cross_quota + && size_bytes > 0 + && let Some(ref qm) = self.quota_manager + { + if let Some(ref dir) = from_quota + && let Err(err) = qm.sub_usage(dir, size_bytes).await + { + warn!( + "RENAME quota release on source failed after rename: \ + dir={} bytes={} error={:#}", + dir, size_bytes, err + ); + } + if let Some(ref dir) = to_quota + && let Err(err) = qm.add_usage(dir, size_bytes).await + { + warn!( + "RENAME quota charge on target failed after rename: \ + dir={} bytes={} error={:#}", + dir, size_bytes, err + ); + } + } + debug!("RENAME: {:?} -> {:?}", from_full_path, to_full_path); Ok(()) @@ -796,6 +1078,271 @@ impl Filesystem for LocalFilesystem { let handle = self.handle_manager.create_handle(file_path.clone()); Ok(handle) } + + async fn statvfs(&self, handle: &FileHandle) -> Result { + let path = self.resolve_handle(handle)?; + let path_owned = path.clone(); + + // Always fetch the real filesystem stats: we reuse its inode fields + // unconditionally, and its byte fields when no quota applies. 
+ let real_stats = tokio::task::spawn_blocking(move || statvfs_on_path(&path_owned)) + .await + .context("spawn_blocking failed")??; + + if let Some(ref qm) = self.quota_manager + && let Some(dir) = qm.resolve_quota_dir(&path) + && let Some((limit, usage)) = qm.get_quota_info(&dir).await + { + let free = limit.saturating_sub(usage); + return Ok(FsStats { + total_bytes: limit, + free_bytes: free, + avail_bytes: free, + // Inode counts always come from the underlying filesystem. + total_files: real_stats.total_files, + free_files: real_stats.free_files, + avail_files: real_stats.avail_files, + }); + } + + Ok(real_stats) + } + + fn start_quota_reconciliation(&self) { + let Some(qm) = self.quota_manager.clone() else { + return; + }; + tokio::spawn(async move { + tracing::info!("Quota reconciliation task started"); + qm.reconcile_all().await; + tracing::info!("Quota reconciliation task finished"); + }); + } + + async fn apply_quota_bootstrap( + &self, + bootstrap: &std::collections::HashMap, + ) -> Result<()> { + let Some(ref qm) = self.quota_manager else { + if !bootstrap.is_empty() { + tracing::warn!( + "Quota bootstrap requested but quota is disabled; ignoring {} entries", + bootstrap.len() + ); + } + return Ok(()); + }; + qm.apply_bootstrap(bootstrap).await + } +} + +/// Logically normalize a path: collapse `.` and `..` components and +/// drop empty segments. Does not touch the filesystem and so is safe to +/// run on paths whose components do not exist yet — that is exactly the +/// case `check_db_path_outside_root` needs when the DB has not been +/// created. +/// +/// Leading `..` components on a relative path are preserved (`"../x"` +/// stays `"../x"`), since dropping them would silently change the +/// semantics of the path. On a rooted path, a `..` at the root is +/// discarded (`"/.."` resolves to `"/"`). 
+fn normalize_path(path: &Path) -> PathBuf { + use std::ffi::OsStr; + use std::path::Component; + let parent = OsStr::new(".."); + let mut out: Vec = Vec::new(); + let mut prefix: Option = None; + let mut has_root = false; + for c in path.components() { + match c { + Component::Prefix(p) => prefix = Some(p.as_os_str().to_os_string()), + Component::RootDir => has_root = true, + Component::CurDir => {} + Component::ParentDir => match out.last() { + Some(last) if last != parent => { + // Pop a real segment (cancel it out). + out.pop(); + } + _ if !has_root => { + // Either out is empty or the tail is already "..": + // on a relative path we must preserve the `..` so + // the path keeps its original anchor. + out.push(parent.to_os_string()); + } + _ => { + // Rooted path: `..` past the root collapses to root. + } + }, + Component::Normal(s) => out.push(s.to_os_string()), + } + } + let mut result = PathBuf::new(); + if let Some(p) = prefix { + result.push(p); + } + if has_root { + result.push("/"); + } + for s in out { + result.push(s); + } + result +} + +/// Reject quota DB paths that resolve under the export root. +/// +/// A DB inside the exported tree is reachable via NFS lookup/read/write/ +/// remove and would let any client tamper with the bookkeeping. The +/// containment check is done in absolute terms so a relative `db_path` +/// (whose parent might not exist yet) cannot slip past — we anchor it +/// against `current_dir` and normalize away `..` components before the +/// `starts_with` test. Existing paths/parents are still canonicalised to +/// resolve symlinks; only the literal-tail fallback uses normalisation. +fn check_db_path_outside_root(db_path: &Path, root_path: &Path) -> Result<()> { + // Anchor relative paths against the current working directory so the + // containment comparison is meaningful (root_path is absolute). 
+ let abs_db = if db_path.is_absolute() { + db_path.to_path_buf() + } else { + std::env::current_dir() + .context("Failed to read current directory while validating quota db path")? + .join(db_path) + }; + + let canonical_db = if abs_db.exists() { + abs_db.canonicalize().context(format!( + "Failed to canonicalize quota db path: {:?}", + abs_db + ))? + } else { + let parent = abs_db + .parent() + .ok_or_else(|| anyhow!("Quota db_path has no parent directory: {:?}", abs_db))?; + let canonical_parent = if parent.as_os_str().is_empty() { + std::env::current_dir() + .context("Failed to read current directory while validating quota db path")? + } else if parent.exists() { + parent.canonicalize().context(format!( + "Failed to canonicalize quota db parent: {:?}", + parent + ))? + } else { + // Parent will be created by QuotaManager::new(); we cannot + // ask the kernel to resolve symlinks/`..` for us, so do a + // logical normalisation. The path is already absolute thanks + // to the anchor step above, so the comparison below is sound. + normalize_path(parent) + }; + let file_name = abs_db + .file_name() + .ok_or_else(|| anyhow!("Quota db_path has no file name component: {:?}", abs_db))?; + canonical_parent.join(file_name) + }; + + if canonical_db.starts_with(root_path) { + return Err(anyhow!( + "Quota db_path {:?} resolves inside the export root {:?}; \ + move the database outside the exported tree so NFS clients \ + cannot read or modify it", + db_path, + root_path + )); + } + Ok(()) +} + +/// Return the on-disk byte footprint of the **symlink-resolved** path, +/// computed from `st_blocks`. Used by content-mutating operations +/// (`write`, `setattr_size`) where the kernel itself follows the link +/// when opening — accounting must follow the same target so writes +/// through a symlink in a quota directory still increase tracked usage. 
+/// +/// `NotFound` maps to zero; other stat failures are propagated so a +/// transient I/O error cannot silently undercount quota usage. +async fn allocated_bytes_following(path: &Path) -> Result { + match tokio_fs::metadata(path).await { + Ok(m) => Ok(m.blocks().saturating_mul(512)), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0), + Err(e) => { + Err(anyhow::Error::from(e)).context(format!("Failed to stat for quota: {:?}", path)) + } + } +} + +/// Return how many bytes the quota should refund when `path` is unlinked. +/// +/// Quota is charged exclusively on operations that mutate regular-file +/// content (`write`, `setattr_size`, cross-quota `rename`). Refunds must +/// be the symmetric inverse of those charges, otherwise a client can +/// drift the tracked usage below reality: +/// +/// * **Regular file, last link (`nlink == 1`):** unlinking truly frees +/// the inode's blocks → refund `blocks * 512`. +/// * **Regular file with surviving hard link (`nlink > 1`):** the inode +/// and its blocks remain reachable via another name (possibly in a +/// different PVC, since `link()` bypasses quota_dir attribution). +/// No bytes are freed → refund 0. +/// * **Symlinks, FIFOs, sockets, devices:** these are *not* charged at +/// creation time (the FSAL's `symlink`/`mknod` paths skip the quota +/// accounting). Refunding their `st_blocks` would let a client +/// create then remove long symlinks (each occupying a real block on +/// most filesystems) to depress the usage counter and reclaim quota +/// for data they never paid for. Refund 0 to keep the create/remove +/// accounting symmetric. +/// +/// There is a small TOCTOU between this stat and the actual `unlink`, +/// but it cannot introduce a leak in the dangerous direction — at +/// worst a refund-of-blocks gets skipped that should have been issued, +/// which background reconciliation will repair. 
+/// +/// `NotFound` maps to zero; other stat failures are propagated so a +/// transient I/O error cannot silently undercount quota usage. +async fn refundable_bytes_on_unlink(path: &Path) -> Result { + match tokio_fs::symlink_metadata(path).await { + Ok(m) if m.is_file() && m.nlink() == 1 => Ok(m.blocks().saturating_mul(512)), + Ok(_) => Ok(0), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(0), + Err(e) => { + Err(anyhow::Error::from(e)).context(format!("Failed to stat for quota: {:?}", path)) + } + } +} + +/// Call `libc::statvfs` on `path` and convert the result into `FsStats`. +fn statvfs_on_path(path: &Path) -> Result { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + let c_path = + CString::new(path.as_os_str().as_bytes()).context("Path contains interior NUL byte")?; + + // Safety: zero-initializing libc::statvfs is valid; the struct is a POD + // descriptor that the kernel fills in. + let mut stat: libc::statvfs = unsafe { std::mem::zeroed() }; + let rc = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) }; + if rc != 0 { + let err = std::io::Error::last_os_error(); + return Err(anyhow!("statvfs failed for {:?}: {}", path, err)); + } + + // f_frsize is the fragment size used for f_blocks/f_bfree/f_bavail. + // Linux always populates it, but some exotic backends report 0; fall + // back to the logical block size (f_bsize) so FSSTAT never returns + // a bogus 0-byte filesystem. Multiplications use saturating_mul as a + // belt-and-braces guard against overflow on huge filesystems. 
+ let block_size = if stat.f_frsize == 0 { + stat.f_bsize + } else { + stat.f_frsize + }; + Ok(FsStats { + total_bytes: stat.f_blocks.saturating_mul(block_size), + free_bytes: stat.f_bfree.saturating_mul(block_size), + avail_bytes: stat.f_bavail.saturating_mul(block_size), + total_files: stat.f_files, + free_files: stat.f_ffree, + avail_files: stat.f_favail, + }) } #[cfg(test)] @@ -806,7 +1353,7 @@ mod tests { /// Helper: Create a test filesystem with a temporary directory fn create_test_fs() -> (LocalFilesystem, TempDir) { let temp_dir = TempDir::new().expect("Failed to create temp dir"); - let fs = LocalFilesystem::new(temp_dir.path()).expect("Failed to create filesystem"); + let fs = LocalFilesystem::new(temp_dir.path(), None).expect("Failed to create filesystem"); (fs, temp_dir) } @@ -1046,4 +1593,796 @@ mod tests { "Multiple lookups should return same handle" ); } + + #[tokio::test] + async fn test_statvfs_returns_real_stats() { + let (fs, _temp_dir) = create_test_fs(); + let root = fs.root_handle().await; + + let stats = fs.statvfs(&root).await.expect("statvfs should succeed"); + + // We can't predict exact values across different hosts, but a real + // filesystem will report non-zero totals and some non-zero inode + // capacity; available <= free <= total. + assert!(stats.total_bytes > 0, "total_bytes should be non-zero"); + assert!(stats.free_bytes <= stats.total_bytes); + assert!(stats.avail_bytes <= stats.free_bytes); + assert!(stats.total_files > 0, "total_files should be non-zero"); + assert!(stats.free_files <= stats.total_files); + assert!(stats.avail_files <= stats.free_files); + } + + #[tokio::test] + async fn test_statvfs_invalid_handle() { + let (fs, _temp_dir) = create_test_fs(); + let bogus: FileHandle = vec![0xAA; 32]; + + let result = fs.statvfs(&bogus).await; + assert!(result.is_err(), "statvfs with invalid handle should fail"); + } + + /// Helper: Create a test filesystem with a QuotaManager wired up. 
+ /// Returns (fs, export_tempdir, db_tempdir) — the two temp dirs are
+ /// kept alive by the caller.
+ fn create_test_fs_with_quota() -> (LocalFilesystem, TempDir, TempDir) {
+ let export_dir = TempDir::new().expect("Failed to create export temp dir");
+ let db_dir = TempDir::new().expect("Failed to create db temp dir");
+ let quota_cfg = QuotaConfig {
+ enabled: true,
+ db_path: db_dir.path().join("quota.db"),
+ bootstrap: std::collections::HashMap::new(),
+ };
+ let fs = LocalFilesystem::new(export_dir.path(), Some(&quota_cfg))
+ .expect("Failed to create filesystem with quota");
+ (fs, export_dir, db_dir)
+ }
+
+ #[tokio::test]
+ async fn test_quota_manager_created_when_enabled() {
+ let (fs, _export, _db) = create_test_fs_with_quota();
+ assert!(
+ fs.quota_manager().is_some(),
+ "Quota manager should exist when config.enabled=true"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_quota_manager_absent_when_disabled() {
+ let export = TempDir::new().unwrap();
+ let db = TempDir::new().unwrap();
+ let quota_cfg = QuotaConfig {
+ enabled: false,
+ db_path: db.path().join("quota.db"),
+ bootstrap: std::collections::HashMap::new(),
+ };
+ let fs = LocalFilesystem::new(export.path(), Some(&quota_cfg)).unwrap();
+ assert!(
+ fs.quota_manager().is_none(),
+ "Quota manager should be absent when config.enabled=false"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_quota_manager_absent_when_no_config() {
+ let export = TempDir::new().unwrap();
+ let fs = LocalFilesystem::new(export.path(), None).unwrap();
+ assert!(fs.quota_manager().is_none());
+ }
+
+ #[tokio::test]
+ async fn test_db_path_inside_export_root_is_rejected() {
+ let export = TempDir::new().unwrap();
+ // Place the DB literally inside the exported tree — NFS clients
+ // could otherwise reach it via lookup/write/remove. 
+ let cfg = QuotaConfig { + enabled: true, + db_path: export.path().join("quota.db"), + bootstrap: std::collections::HashMap::new(), + }; + match LocalFilesystem::new(export.path(), Some(&cfg)) { + Ok(_) => panic!("LocalFilesystem::new should reject db inside export"), + Err(e) => assert!( + e.to_string().contains("inside the export root"), + "got: {:#}", + e + ), + } + } + + #[tokio::test] + async fn test_db_path_in_nested_subdir_inside_export_is_rejected() { + let export = TempDir::new().unwrap(); + let nested = export.path().join("a/b/c"); + let cfg = QuotaConfig { + enabled: true, + db_path: nested.join("quota.db"), + bootstrap: std::collections::HashMap::new(), + }; + match LocalFilesystem::new(export.path(), Some(&cfg)) { + Ok(_) => panic!("nested-under-root db path should be rejected"), + Err(e) => assert!( + e.to_string().contains("inside the export root"), + "got: {:#}", + e + ), + } + } + + #[tokio::test] + async fn test_db_path_outside_export_is_accepted() { + let export = TempDir::new().unwrap(); + let db_dir = TempDir::new().unwrap(); + let cfg = QuotaConfig { + enabled: true, + db_path: db_dir.path().join("quota.db"), + bootstrap: std::collections::HashMap::new(), + }; + let fs = LocalFilesystem::new(export.path(), Some(&cfg)) + .expect("db outside export should be accepted"); + assert!(fs.quota_manager().is_some()); + } + + #[tokio::test] + async fn test_db_path_with_dotdot_resolving_inside_export_is_rejected() { + let export = TempDir::new().unwrap(); + // Construct a path that uses ".." but normalizes back into the + // export root: /decoy/../. The intermediate + // "decoy" directory is non-existent, so the old code took the + // literal-parent branch and missed the containment check. 
+ let sneaky = export.path().join("decoy").join("..").join("quota.db"); + let cfg = QuotaConfig { + enabled: true, + db_path: sneaky, + bootstrap: std::collections::HashMap::new(), + }; + match LocalFilesystem::new(export.path(), Some(&cfg)) { + Ok(_) => panic!("dotdot-relative db inside export should be rejected"), + Err(e) => assert!( + e.to_string().contains("inside the export root"), + "got: {:#}", + e + ), + } + } + + #[test] + fn test_normalize_path_collapses_dotdot() { + assert_eq!( + normalize_path(Path::new("/tmp/foo/../bar")), + PathBuf::from("/tmp/bar") + ); + assert_eq!( + normalize_path(Path::new("/tmp/./foo")), + PathBuf::from("/tmp/foo") + ); + assert_eq!( + normalize_path(Path::new("/a/b/c/../../d")), + PathBuf::from("/a/d") + ); + } + + #[test] + fn test_normalize_path_preserves_relative() { + assert_eq!( + normalize_path(Path::new("foo/bar/../baz")), + PathBuf::from("foo/baz") + ); + } + + #[test] + fn test_normalize_path_empty_and_root() { + assert_eq!(normalize_path(Path::new("/")), PathBuf::from("/")); + assert_eq!(normalize_path(Path::new("")), PathBuf::from("")); + } + + #[test] + fn test_normalize_path_preserves_leading_parent_on_relative() { + // Relative paths must keep their leading "..": dropping them + // would silently change which directory the path resolves + // against once it gets joined with a base. + assert_eq!(normalize_path(Path::new("../x")), PathBuf::from("../x")); + assert_eq!( + normalize_path(Path::new("../../x")), + PathBuf::from("../../x") + ); + // Once a real segment is consumed by "..", further "..": + // the remaining one is preserved. + assert_eq!( + normalize_path(Path::new("foo/../../x")), + PathBuf::from("../x") + ); + } + + #[test] + fn test_normalize_path_dotdot_past_root_is_clamped() { + // On a rooted path, ".." that would escape the root has no + // effect — POSIX semantics treat the parent of "/" as "/". 
+ assert_eq!(normalize_path(Path::new("/..")), PathBuf::from("/")); + assert_eq!(normalize_path(Path::new("/../foo")), PathBuf::from("/foo")); + assert_eq!(normalize_path(Path::new("/a/../../b")), PathBuf::from("/b")); + } + + #[tokio::test] + async fn test_statvfs_reports_quota_for_quota_dir() { + let (fs, export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + + // Create a quota-tracked subdirectory and set a 1 MiB quota on it. + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-a", 1024 * 1024).await.unwrap(); + qm.add_usage("pvc-a", 200 * 1024).await.unwrap(); + + // Look up the subdir via the FSAL so we get its handle. + let pvc_handle = fs.lookup(&root, "pvc-a").await.unwrap(); + let stats = fs.statvfs(&pvc_handle).await.unwrap(); + + assert_eq!(stats.total_bytes, 1024 * 1024); + assert_eq!(stats.free_bytes, 1024 * 1024 - 200 * 1024); + assert_eq!(stats.avail_bytes, stats.free_bytes); + // Inode counts still come from the real filesystem. + assert!(stats.total_files > 0); + + drop(export); + } + + #[tokio::test] + async fn test_statvfs_falls_back_outside_quota_dir() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + + // Root itself has no quota directory mapping — should return real stats. + let stats = fs.statvfs(&root).await.unwrap(); + assert!(stats.total_bytes > 0); + // Real filesystem: free <= total, avail <= free. 
+ assert!(stats.free_bytes <= stats.total_bytes); + assert!(stats.avail_bytes <= stats.free_bytes); + } + + #[tokio::test] + async fn test_statvfs_for_file_inside_quota_dir() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + + fs.mkdir(&root, "pvc-b", 0o755).await.unwrap(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-b", 5000).await.unwrap(); + qm.add_usage("pvc-b", 1000).await.unwrap(); + + let dir = fs.lookup(&root, "pvc-b").await.unwrap(); + // Create a file inside the quota dir; statvfs on the file should + // still report the parent quota. + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + + let stats = fs.statvfs(&file).await.unwrap(); + assert_eq!(stats.total_bytes, 5000); + assert_eq!(stats.free_bytes, 4000); + } + + // -- Stage 5: quota enforcement --------------------------------------- + + // Quota usage is tracked in allocated bytes (st_blocks * 512). On + // tmpfs / ext4 the page size is 4 KiB, so the tests below use writes + // and limits in multiples of 4 KiB to keep expected values exact and + // independent of the underlying filesystem's rounding. + // + // Test-environment assumption: tempfile creates temp dirs on the + // OS default scratch filesystem (tmpfs on Linux CI, ext4 on most + // Linux distros) — both round 1-byte writes up to a 4 KiB block. + // If these tests are ever run on a filesystem with a different + // allocation unit (e.g. ZFS records, btrfs CoW, network FS) the + // exact comparisons below will need to be relaxed to ranges. 
+ const BLOCK: usize = 4096; + + #[tokio::test] + async fn test_write_within_quota_succeeds() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (4 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + + let payload = vec![0u8; BLOCK]; + let written = fs.write(&file, 0, &payload).await.expect("write ok"); + assert_eq!(written as usize, payload.len()); + + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + } + + #[tokio::test] + async fn test_write_exceeds_quota_is_rejected() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", BLOCK as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + + let payload = vec![0u8; 2 * BLOCK]; + let err = fs + .write(&file, 0, &payload) + .await + .expect_err("write over quota should fail"); + assert!(err.to_string().contains("Quota exceeded"), "got: {}", err); + + // Nothing was accounted (and nothing was written, since we check before). 
+ assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some((BLOCK as u64, 0)) + ); + } + + #[tokio::test] + async fn test_write_overwriting_existing_data_does_not_double_count() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (4 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + // Overwrite the same block; allocated footprint is unchanged. + fs.write(&file, 0, &vec![1u8; BLOCK]).await.unwrap(); + + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + } + + #[tokio::test] + async fn test_remove_releases_quota() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (4 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + + fs.remove(&dir, "data.bin").await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, 0)) + ); + } + + #[tokio::test] + async fn test_remove_hardlink_does_not_refund_quota_until_last_unlink() { + // Hard link bypass attack: create file -> hard-link it -> + // remove the original. Without the nlink check, the FSAL would + // refund the data's blocks on the first unlink even though + // those blocks remain reachable via the surviving link, and + // the client could rewrite the same volume of data, doubling + // their effective quota each round. 
+ let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (4 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + // Sanity: one block was charged. + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + + // Add a second hard link to the same inode. + fs.link(&file, &dir, "data2.bin").await.unwrap(); + + // Removing the first name must NOT refund quota — the inode + // and its blocks survive via "data2.bin". + fs.remove(&dir, "data.bin").await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)), + "quota must not be refunded while another hard link still exists" + ); + + // Removing the last name finally frees the blocks; quota refunds. + fs.remove(&dir, "data2.bin").await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, 0)), + "last unlink frees the blocks → quota refunds" + ); + } + + #[tokio::test] + async fn test_remove_symlink_does_not_refund_quota() { + // Symlink bypass attack: symlink creation is *not* charged + // against quota, so refunding bytes on remove would let a + // client drift the usage counter below reality. The attacker + // writes legitimate data (charged), then creates and removes + // long symlinks repeatedly to claw back quota for storage + // they actually paid for, eventually doubling their effective + // budget. 
+ let (fs, export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (8 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + let charged = fs + .quota_manager() + .unwrap() + .get_quota_info("pvc-a") + .await + .unwrap(); + assert_eq!(charged, ((8 * BLOCK) as u64, BLOCK as u64)); + + // Create a symlink with a long target so the kernel has to + // allocate at least one block for it on tmpfs/ext4 (fast + // symlinks fit inline, so a long target ensures st_blocks > 0). + // Note: the FSAL's `symlink` API does not charge quota, mirror + // that here by going through the host filesystem directly. + let long_target = "x".repeat(200); + std::os::unix::fs::symlink(&long_target, export.path().join("pvc-a/long-link")).unwrap(); + + // Remove the symlink via the FSAL — must NOT refund anything. + fs.remove(&dir, "long-link").await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(charged), + "removing a symlink must not refund quota — symlink \ + creation is not charged, so refunding here would let a \ + client drift the usage counter below reality" + ); + } + + #[tokio::test] + async fn test_write_rejects_handle_after_symlink_swap_outside_export() { + // File handles map to paths and are not invalidated when the + // path is replaced. If a client gets a handle for a real file, + // then (out of band) replaces the file with a symlink pointing + // outside the export, the resolved path would still look fine + // to `resolve_handle` — but the kernel-level open follows the + // symlink. validate_path() must catch this so WRITE cannot + // escape the exported tree. 
+ let (fs, _export) = create_test_fs(); + let root = fs.root_handle().await; + let original_handle = fs.create(&root, "victim.bin", 0o644).await.unwrap(); + + // Pretend the file got swapped for a symlink to /etc/hostname, + // a path that is guaranteed to exist on Linux test runners and + // is definitely outside the temp export. + let victim_path = _export.path().join("victim.bin"); + std::fs::remove_file(&victim_path).unwrap(); + std::os::unix::fs::symlink("/etc/hostname", &victim_path).unwrap(); + + let err = fs + .write(&original_handle, 0, b"pwned") + .await + .expect_err("WRITE through swapped symlink should be rejected"); + assert!( + err.to_string().contains("outside export root"), + "got: {:#}", + err + ); + + // /etc/hostname must still contain its original content. + let hostname = std::fs::read_to_string("/etc/hostname").unwrap(); + assert!( + !hostname.contains("pwned"), + "WRITE leaked outside export: /etc/hostname now contains the payload" + ); + } + + #[tokio::test] + async fn test_setattr_size_rejects_handle_after_symlink_swap_outside_export() { + // Same attack, exercised against SETATTR truncate: a stale + // handle plus a symlink swap could otherwise let a client + // truncate any file outside the export tree. + let (fs, _export) = create_test_fs(); + let root = fs.root_handle().await; + let handle = fs.create(&root, "victim.bin", 0o644).await.unwrap(); + + let victim_path = _export.path().join("victim.bin"); + std::fs::remove_file(&victim_path).unwrap(); + std::os::unix::fs::symlink("/etc/hostname", &victim_path).unwrap(); + + let err = fs + .setattr_size(&handle, 0) + .await + .expect_err("SETATTR size through swapped symlink should be rejected"); + assert!( + err.to_string().contains("outside export root"), + "got: {:#}", + err + ); + + // /etc/hostname must still be a normal non-empty file. 
+ let meta = std::fs::metadata("/etc/hostname").unwrap(); + assert!( + meta.len() > 0, + "SETATTR leaked outside export: /etc/hostname was truncated" + ); + } + + #[tokio::test] + async fn test_truncate_down_releases_quota() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (8 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "data.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; 4 * BLOCK]).await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((8 * BLOCK) as u64, (4 * BLOCK) as u64)) + ); + + // Truncate down to one block: 3 blocks worth of allocated bytes + // are released back to the quota. + fs.setattr_size(&file, BLOCK as u64).await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((8 * BLOCK) as u64, BLOCK as u64)) + ); + } + + #[tokio::test] + async fn test_truncate_up_is_not_tracked() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", 1000) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "sparse.bin", 0o644).await.unwrap(); + + // Extend far beyond the quota limit: sparse files don't consume + // real bytes, so the usage counter should stay at zero. Real + // writes into those holes are still blocked by write(). 
+ fs.setattr_size(&file, 100_000).await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some((1000, 0)) + ); + } + + #[tokio::test] + async fn test_rename_within_same_quota_dir_does_not_change_usage() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.quota_manager() + .unwrap() + .set_quota("pvc-a", (4 * BLOCK) as u64) + .await + .unwrap(); + + let dir = fs.lookup(&root, "pvc-a").await.unwrap(); + let file = fs.create(&dir, "a.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + + fs.rename(&dir, "a.bin", &dir, "b.bin").await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + } + + #[tokio::test] + async fn test_rename_across_quota_dirs_transfers_usage() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.mkdir(&root, "pvc-b", 0o755).await.unwrap(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-a", (4 * BLOCK) as u64).await.unwrap(); + qm.set_quota("pvc-b", (4 * BLOCK) as u64).await.unwrap(); + + let dir_a = fs.lookup(&root, "pvc-a").await.unwrap(); + let dir_b = fs.lookup(&root, "pvc-b").await.unwrap(); + + let file = fs.create(&dir_a, "f.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; BLOCK]).await.unwrap(); + + fs.rename(&dir_a, "f.bin", &dir_b, "f.bin").await.unwrap(); + + assert_eq!( + qm.get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, 0)) + ); + assert_eq!( + qm.get_quota_info("pvc-b").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + } + + #[tokio::test] + async fn test_start_quota_reconciliation_runs_scan() { + let (fs, export, _db) = 
create_test_fs_with_quota(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-a", 1_000_000).await.unwrap(); + // Stale usage recorded in redb/in-memory. + qm.add_usage("pvc-a", 9999).await.unwrap(); + + // Put real files on disk totaling two blocks. Reconciliation now + // accounts in allocated bytes (st_blocks * 512), so block-aligned + // writes give a deterministic expected value. + let pvc_dir = export.path().join("pvc-a"); + std::fs::create_dir_all(&pvc_dir).unwrap(); + std::fs::write(pvc_dir.join("a.bin"), vec![0u8; BLOCK]).unwrap(); + std::fs::write(pvc_dir.join("b.bin"), vec![0u8; BLOCK]).unwrap(); + let expected_usage = (2 * BLOCK) as u64; + + fs.start_quota_reconciliation(); + + // Poll until the background task has reconciled. Use a short loop + // with a cap to avoid hanging a broken test. + let mut waited = 0u64; + loop { + let info = fs.quota_manager().unwrap().get_quota_info("pvc-a").await; + if info == Some((1_000_000, expected_usage)) { + break; + } + if waited > 2000 { + panic!("Reconciliation did not complete, last={:?}", info); + } + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + waited += 20; + } + } + + #[tokio::test] + async fn test_start_quota_reconciliation_no_quota_is_noop() { + let (fs, _export) = create_test_fs(); + // Must not panic even though no quota manager is configured. 
+ fs.start_quota_reconciliation(); + } + + #[tokio::test] + async fn test_apply_quota_bootstrap_seeds_new_entries() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let mut bootstrap = std::collections::HashMap::new(); + bootstrap.insert("pvc-a".to_string(), "1MB".to_string()); + + fs.apply_quota_bootstrap(&bootstrap).await.unwrap(); + assert_eq!( + fs.quota_manager().unwrap().get_quota_info("pvc-a").await, + Some((1024 * 1024, 0)) + ); + } + + #[tokio::test] + async fn test_apply_quota_bootstrap_noop_when_quota_disabled() { + let (fs, _export) = create_test_fs(); + let mut bootstrap = std::collections::HashMap::new(); + bootstrap.insert("pvc-a".to_string(), "1MB".to_string()); + + // Should not error even though no quota manager is configured; + // the entries are logged and discarded. + fs.apply_quota_bootstrap(&bootstrap).await.unwrap(); + } + + #[tokio::test] + async fn test_rename_across_quota_dirs_rejected_when_target_full() { + let (fs, _export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.mkdir(&root, "pvc-b", 0o755).await.unwrap(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-a", (4 * BLOCK) as u64).await.unwrap(); + qm.set_quota("pvc-b", BLOCK as u64).await.unwrap(); + + let dir_a = fs.lookup(&root, "pvc-a").await.unwrap(); + let dir_b = fs.lookup(&root, "pvc-b").await.unwrap(); + + let file = fs.create(&dir_a, "f.bin", 0o644).await.unwrap(); + fs.write(&file, 0, &vec![0u8; 2 * BLOCK]).await.unwrap(); + + // Target quota is one block; source file occupies two blocks — + // the cross-quota transfer must be rejected. + let err = fs + .rename(&dir_a, "f.bin", &dir_b, "f.bin") + .await + .expect_err("rename into full quota should fail"); + assert!(err.to_string().contains("Quota exceeded"), "got: {}", err); + + // Source unchanged; target still empty. 
+ assert_eq!( + qm.get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, (2 * BLOCK) as u64)) + ); + assert_eq!(qm.get_quota_info("pvc-b").await, Some((BLOCK as u64, 0))); + } + + #[tokio::test] + async fn test_write_through_cross_pvc_symlink_charges_target_quota() { + let (fs, export, _db) = create_test_fs_with_quota(); + let root = fs.root_handle().await; + fs.mkdir(&root, "pvc-a", 0o755).await.unwrap(); + fs.mkdir(&root, "pvc-b", 0o755).await.unwrap(); + let qm = fs.quota_manager().unwrap(); + qm.set_quota("pvc-a", (4 * BLOCK) as u64).await.unwrap(); + qm.set_quota("pvc-b", (4 * BLOCK) as u64).await.unwrap(); + + let dir_b = fs.lookup(&root, "pvc-b").await.unwrap(); + // Create the real target file inside pvc-b. + let target = fs.create(&dir_b, "data.bin", 0o644).await.unwrap(); + + // Hand-create a cross-PVC symlink (the FSAL symlink op validates + // names but not targets, and we do not need to go through it for + // this test — point pvc-a/link at pvc-b/data.bin directly on the + // host filesystem to set up the scenario). + std::os::unix::fs::symlink( + export.path().join("pvc-b").join("data.bin"), + export.path().join("pvc-a").join("link"), + ) + .unwrap(); + + // Look up the link via the FSAL so we get a handle for it. + let dir_a = fs.lookup(&root, "pvc-a").await.unwrap(); + let link_handle = fs.lookup(&dir_a, "link").await.unwrap(); + + // Write through the link. The kernel will follow the symlink and + // mutate pvc-b/data.bin; quota_target now canonicalises first, + // so the bytes must be charged to pvc-b, not pvc-a. + fs.write(&link_handle, 0, &vec![0u8; BLOCK]).await.unwrap(); + // Sanity: the target's blocks really did grow. 
+ let target_alloc = fs.statvfs(&target).await.unwrap(); + assert!(target_alloc.free_bytes < (4 * BLOCK) as u64); + + assert_eq!( + qm.get_quota_info("pvc-a").await, + Some(((4 * BLOCK) as u64, 0)) + ); + assert_eq!( + qm.get_quota_info("pvc-b").await, + Some(((4 * BLOCK) as u64, BLOCK as u64)) + ); + } } diff --git a/src/fsal/mod.rs b/src/fsal/mod.rs index d93cc2c..189202c 100644 --- a/src/fsal/mod.rs +++ b/src/fsal/mod.rs @@ -5,6 +5,7 @@ pub mod handle; pub mod local; +pub mod quota; // Future backends (uncomment when implemented) // #[cfg(feature = "s3")] @@ -14,8 +15,10 @@ pub mod local; // #[cfg(test)] // pub mod memory; +use crate::config::QuotaConfig; use anyhow::Result; use async_trait::async_trait; +use std::collections::HashMap; use std::path::PathBuf; #[allow(unused_imports)] @@ -89,6 +92,27 @@ pub struct DirEntry { pub file_type: FileType, } +/// Filesystem space statistics +/// +/// Mirrors the fields returned by NFSv3 FSSTAT (procedure 18). +/// Byte counts may reflect a folder quota when one applies; inode counts +/// always come from the underlying filesystem. +#[derive(Debug, Clone, Copy)] +pub struct FsStats { + /// Total bytes available (from quota or underlying FS) + pub total_bytes: u64, + /// Free bytes + pub free_bytes: u64, + /// Bytes available to non-privileged users + pub avail_bytes: u64, + /// Total number of inodes + pub total_files: u64, + /// Free inodes + pub free_files: u64, + /// Inodes available to non-privileged users + pub avail_files: u64, +} + /// Filesystem trait /// /// This trait defines the interface that all filesystem backends must implement. @@ -309,6 +333,31 @@ pub trait Filesystem: Send + Sync { mode: u32, rdev: (u32, u32), ) -> Result; + + /// Return filesystem space statistics for the subtree identified by `handle`. + /// + /// Used by NFS FSSTAT. Byte counts reflect a configured folder quota + /// when applicable; inode counts always come from the underlying + /// filesystem. 
+ async fn statvfs(&self, handle: &FileHandle) -> Result<FsStats>;
+
+ /// Start a background reconciliation task that scans tracked quota
+ /// directories and corrects usage drift.
+ ///
+ /// The default implementation does nothing. Backends that support
+ /// quotas override this to spawn their own task.
+ fn start_quota_reconciliation(&self) {}
+
+ /// Apply a declarative quota bootstrap at startup. Entries install
+ /// a quota only when the target directory has none yet; already
+ /// tracked directories are left alone so the bootstrap is
+ /// idempotent across restarts.
+ ///
+ /// The default implementation does nothing. Backends that support
+ /// quotas override this.
+ async fn apply_quota_bootstrap(&self, _bootstrap: &HashMap<String, String>) -> Result<()> {
+ Ok(())
+ }
}

/// Filesystem backend types
@@ -334,6 +383,8 @@ pub struct BackendConfig {
pub backend_type: BackendType,
/// Root path for local backend
pub local_root: Option<PathBuf>,
+ /// Quota configuration (applied to the local backend when enabled)
+ pub quota: Option<QuotaConfig>,
/// S3 configuration (future)
#[allow(dead_code)]
pub s3_config: Option,
@@ -366,11 +417,18 @@ impl BackendConfig {
Self {
backend_type: BackendType::Local,
local_root: Some(root.into()),
+ quota: None,
s3_config: None,
ceph_config: None,
}
}

+ /// Attach quota configuration to this backend configuration. 
+ pub fn with_quota(mut self, quota: QuotaConfig) -> Self {
+ self.quota = Some(quota);
+ self
+ }
+
/// Create filesystem instance from configuration
pub fn create_filesystem(&self) -> Result<Box<dyn Filesystem>> {
match self.backend_type {
@@ -379,7 +437,7 @@ impl BackendConfig {
.local_root
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Local root path not configured"))?;
- let fs = LocalFilesystem::new(root)?;
+ let fs = LocalFilesystem::new(root, self.quota.as_ref())?;
Ok(Box::new(fs))
}
BackendType::S3 => {
diff --git a/src/fsal/quota.rs b/src/fsal/quota.rs
new file mode 100644
index 0000000..517d685
--- /dev/null
+++ b/src/fsal/quota.rs
@@ -0,0 +1,973 @@
+// Folder Quota Manager
+//
+// Provides per-subdirectory byte quotas persisted to a redb database, with
+// an in-memory cache for fast quota checks on the hot path.
+//
+// Quotas are keyed by the first-level subdirectory name under the export
+// root (e.g. a PVC folder). Operations that cross into a quota directory
+// consult this manager before and after the underlying filesystem call.
+
+use anyhow::{Context, Result, anyhow};
+use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tracing::debug;
+
+/// redb table: first-level directory name -> (limit_bytes, usage_bytes)
+const QUOTA_TABLE: TableDefinition<&str, (u64, u64)> = TableDefinition::new("quotas");
+
+/// In-memory view of a single quota entry.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct QuotaEntry {
+ pub limit: u64,
+ pub usage: u64,
+}
+
+/// Manages folder quotas with redb persistence and an in-memory cache.
+///
+/// Reads go to the cache for low latency. Writes update the cache first,
+/// then synchronously persist to redb via `spawn_blocking` so the tokio
+/// runtime is not blocked on disk I/O.
+pub struct QuotaManager {
+ /// Canonical path to the export root. 
+ root_path: PathBuf,
+ /// In-memory cache keyed by first-level subdirectory name.
+ entries: Arc<RwLock<HashMap<String, QuotaEntry>>>,
+ /// Persistent redb handle.
+ db: Arc<Database>,
+}
+
+impl QuotaManager {
+ /// Open or create the redb database at `db_path` and load all quota
+ /// entries into the in-memory cache.
+ ///
+ /// Creates the parent directory of `db_path` if it does not exist.
+ pub fn new(db_path: impl AsRef<Path>, root_path: PathBuf) -> Result<Self> {
+ let db_path = db_path.as_ref();
+
+ if let Some(parent) = db_path.parent()
+ && !parent.as_os_str().is_empty()
+ && !parent.exists()
+ {
+ std::fs::create_dir_all(parent)
+ .context(format!("Failed to create quota DB directory: {:?}", parent))?;
+ }
+
+ let db =
+ Database::create(db_path).context(format!("Failed to open quota DB: {:?}", db_path))?;
+
+ // Ensure the table exists.
+ {
+ let txn = db.begin_write().context("Failed to begin write txn")?;
+ {
+ let _ = txn
+ .open_table(QUOTA_TABLE)
+ .context("Failed to open quota table")?;
+ }
+ txn.commit().context("Failed to commit table init")?;
+ }
+
+ // Populate the in-memory cache from existing entries. Stored keys
+ // are also re-validated here as a defence in depth: a bad key
+ // (manual edit, file corruption, older buggy version) must not
+ // sneak into the cache, because reconciliation would later join
+ // it onto `root_path` and walk a path outside the export tree.
+ let mut entries: HashMap<String, QuotaEntry> = HashMap::new();
+ let mut skipped: u64 = 0;
+ {
+ let txn = db.begin_read().context("Failed to begin read txn")?;
+ let table = txn
+ .open_table(QUOTA_TABLE)
+ .context("Failed to open quota table")?;
+ for item in table.iter().context("Failed to iterate quota table")? 
{ + let (key, value) = item.context("Failed to read quota entry")?; + let key_str = key.value().to_string(); + if let Err(e) = validate_quota_dir(&key_str) { + tracing::warn!("Quota: skipping unsafe stored key {:?}: {}", key_str, e); + skipped += 1; + continue; + } + let (limit, usage) = value.value(); + entries.insert(key_str, QuotaEntry { limit, usage }); + } + } + + debug!( + "Quota: loaded {} entries from {:?} (root={:?}, skipped={})", + entries.len(), + db_path, + root_path, + skipped + ); + + Ok(Self { + root_path, + entries: Arc::new(RwLock::new(entries)), + db: Arc::new(db), + }) + } + + /// Given an absolute path inside the export, return the first-level + /// subdirectory name. Returns `None` if the path is the root itself + /// or does not live under the export root. + pub fn resolve_quota_dir(&self, path: &Path) -> Option { + let relative = path.strip_prefix(&self.root_path).ok()?; + let first = relative.components().next()?; + Some(first.as_os_str().to_string_lossy().into_owned()) + } + + /// Check whether adding `additional_bytes` to `quota_dir` would exceed + /// its configured limit. Returns `Ok(())` when the directory has no + /// quota (not tracked). + /// + /// Concurrency note: this is an **advisory** pre-check. The read lock + /// is dropped before the caller actually adds usage, so two writers + /// can both pass `check_quota` against the same usage snapshot and + /// then both call [`add_usage`], briefly exceeding the limit by up + /// to `(N - 1) * max_write_size` for `N` concurrent writers. For the + /// PVC use case this is acceptable — a single client typically owns + /// each PVC and writes are bounded to ~1 MiB chunks; reconciliation + /// (or a subsequent over-limit write) repairs the drift. Strict + /// atomicity would require a check-and-reserve API that holds the + /// write lock across both steps, at the cost of more contention. 
+    pub async fn check_quota(&self, quota_dir: &str, additional_bytes: u64) -> Result<()> {
+        let entries = self.entries.read().await;
+        if let Some(entry) = entries.get(quota_dir) {
+            let projected = entry.usage.saturating_add(additional_bytes); // u64 overflow clamps to MAX, which still trips the limit check
+            if projected > entry.limit {
+                return Err(anyhow!(
+                    "Quota exceeded for '{}': {} + {} > {}",
+                    quota_dir,
+                    entry.usage,
+                    additional_bytes,
+                    entry.limit
+                ));
+            }
+        }
+        Ok(())
+    }
+
+    /// Increase the tracked usage of `quota_dir` by `bytes` and persist
+    /// the new value. No-op for directories without a quota.
+    ///
+    /// The write lock is held across the redb commit so the in-memory
+    /// cache is never observable in a state that has not also been
+    /// persisted; if persistence fails the cache is rolled back before
+    /// the lock is released.
+    pub async fn add_usage(&self, quota_dir: &str, bytes: u64) -> Result<()> {
+        let mut entries = self.entries.write().await;
+
+        let (limit, old_usage, new_usage) = match entries.get_mut(quota_dir) {
+            Some(entry) => {
+                let old = entry.usage;
+                let new = old.saturating_add(bytes);
+                entry.usage = new;
+                (entry.limit, old, new)
+            }
+            None => return Ok(()), // untracked directory: nothing to charge
+        };
+
+        if let Err(e) = self
+            .persist_entry(quota_dir.to_string(), limit, new_usage)
+            .await
+        {
+            if let Some(entry) = entries.get_mut(quota_dir) {
+                entry.usage = old_usage; // roll back the cache so it matches the DB
+            }
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    /// Decrease the tracked usage of `quota_dir` by `bytes` (saturating)
+    /// and persist. No-op for directories without a quota.
+    ///
+    /// The write lock is held across the redb commit; see [`add_usage`]
+    /// for the consistency rationale.
+    pub async fn sub_usage(&self, quota_dir: &str, bytes: u64) -> Result<()> {
+        let mut entries = self.entries.write().await;
+
+        let (limit, old_usage, new_usage) = match entries.get_mut(quota_dir) {
+            Some(entry) => {
+                let old = entry.usage;
+                let new = old.saturating_sub(bytes); // saturating: usage never underflows below zero
+                entry.usage = new;
+                (entry.limit, old, new)
+            }
+            None => return Ok(()),
+        };
+
+        if let Err(e) = self
+            .persist_entry(quota_dir.to_string(), limit, new_usage)
+            .await
+        {
+            if let Some(entry) = entries.get_mut(quota_dir) {
+                entry.usage = old_usage;
+            }
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    /// Set or update the quota limit for a directory. Preserves existing
+    /// usage if the entry already exists; initializes to zero otherwise.
+    ///
+    /// On persistence failure the cache is restored to its previous
+    /// state (either the old limit, or removal of a freshly inserted
+    /// entry).
+    pub async fn set_quota(&self, quota_dir: &str, limit: u64) -> Result<()> {
+        validate_quota_dir(quota_dir)?;
+
+        let mut entries = self.entries.write().await;
+
+        let previous_limit = entries.get(quota_dir).map(|e| e.limit);
+        let entry = entries
+            .entry(quota_dir.to_string())
+            .or_insert(QuotaEntry { limit: 0, usage: 0 }); // fresh entry: usage starts at zero
+        entry.limit = limit;
+        let usage = entry.usage;
+
+        if let Err(e) = self
+            .persist_entry(quota_dir.to_string(), limit, usage)
+            .await
+        {
+            match previous_limit {
+                Some(old) => {
+                    if let Some(entry) = entries.get_mut(quota_dir) {
+                        entry.limit = old;
+                    }
+                }
+                None => {
+                    entries.remove(quota_dir);
+                }
+            }
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    /// Remove a quota entry entirely.
+    ///
+    /// The cache entry is reinstated if the redb removal fails so the
+    /// in-memory and on-disk views stay in sync.
+    ///
+    /// Currently only exercised by unit tests — no runtime code path
+    /// removes a quota — but the API is kept ready for an admin tool
+    /// that needs to retire a PVC entry without restarting the server.
+    #[allow(dead_code)]
+    pub async fn remove_quota(&self, quota_dir: &str) -> Result<()> {
+        let mut entries = self.entries.write().await;
+
+        let removed = entries.remove(quota_dir);
+
+        let db = self.db.clone();
+        let key = quota_dir.to_string();
+        let result = tokio::task::spawn_blocking(move || -> Result<()> {
+            let txn = db.begin_write().context("Failed to begin write txn")?;
+            {
+                let mut table = txn
+                    .open_table(QUOTA_TABLE)
+                    .context("Failed to open quota table")?;
+                table
+                    .remove(key.as_str())
+                    .context("Failed to remove quota entry")?;
+            }
+            txn.commit().context("Failed to commit quota removal")?;
+            Ok(())
+        })
+        .await
+        .context("Failed to run DB task")?;
+
+        if let Err(e) = result {
+            if let Some(entry) = removed {
+                entries.insert(quota_dir.to_string(), entry);
+            }
+            return Err(e);
+        }
+
+        Ok(())
+    }
+
+    /// Return `(limit, usage)` for the directory, or `None` if not tracked.
+    pub async fn get_quota_info(&self, quota_dir: &str) -> Option<(u64, u64)> {
+        let entries = self.entries.read().await;
+        entries.get(quota_dir).map(|e| (e.limit, e.usage))
+    }
+
+    /// Return the list of tracked quota directory names.
+    pub async fn tracked_dirs(&self) -> Vec<String> {
+        let entries = self.entries.read().await;
+        entries.keys().cloned().collect()
+    }
+
+    /// Apply a declarative bootstrap map at startup.
+    ///
+    /// For each `(dir, size_str)` entry, installs a quota limit on `dir`
+    /// only when that directory does not already have a quota recorded.
+    /// This makes the bootstrap idempotent across restarts: changing the
+    /// config string after the first boot will not silently overwrite the
+    /// usage counter that is already being tracked.
+    pub async fn apply_bootstrap(&self, bootstrap: &HashMap<String, String>) -> Result<()> {
+        for (dir, size_str) in bootstrap {
+            // Validate up front so a bad config is reported as a startup
+            // error rather than silently creating an orphan redb entry
+            // that reconciliation might later try to scan with a path
+            // like `/../escape`.
+            validate_quota_dir(dir)
+                .with_context(|| format!("Invalid bootstrap quota directory '{}'", dir))?;
+
+            if self.get_quota_info(dir).await.is_some() {
+                tracing::debug!("Quota bootstrap '{}': already present, skipped", dir);
+                continue;
+            }
+            let limit = crate::config::parse_size(size_str)
+                .with_context(|| format!("Invalid bootstrap size for '{}'", dir))?;
+            self.set_quota(dir, limit).await?;
+            tracing::info!("Quota bootstrap: installed '{}' = {} bytes", dir, limit);
+        }
+        Ok(())
+    }
+
+    /// Walk the named quota directory on disk, recompute its true byte
+    /// footprint, and reconcile the tracked usage with it. Non-existent
+    /// directories reconcile to zero usage.
+    ///
+    /// Returns `Some((before, after))` when the entry existed (useful for
+    /// logging drift) or `None` if the directory has no quota configured.
+    pub async fn scan_and_reconcile(&self, quota_dir: &str) -> Result<Option<(u64, u64)>> {
+        // Snapshot the entry under a short-lived read lock so we can return
+        // early when there is nothing to reconcile.
+        if self.entries.read().await.get(quota_dir).is_none() {
+            return Ok(None);
+        }
+
+        // Walk the directory without holding any quota lock — the scan is
+        // I/O-bound and we don't want to stall live writes.
+        let dir_path = self.root_path.join(quota_dir);
+        let scanned: u64 = tokio::task::spawn_blocking(move || allocated_path_size(&dir_path))
+            .await
+            .context("spawn_blocking failed for scan")??;
+
+        // Take the write lock and hold it across the redb commit. This
+        // prevents two races at once:
+        //   * an interleaving add_usage/sub_usage from being clobbered by
+        //     this reconciliation (lost-update);
+        //   * the cache showing the new scanned value while persistence
+        //     has actually failed (cache/DB drift).
+        let mut entries = self.entries.write().await;
+
+        let (old_usage, new_usage, limit) = match entries.get_mut(quota_dir) {
+            Some(entry) => {
+                let old = entry.usage;
+                entry.usage = scanned;
+                (old, entry.usage, entry.limit)
+            }
+            // The entry was removed while we were scanning — nothing to do.
+            None => return Ok(None),
+        };
+
+        if let Err(e) = self
+            .persist_entry(quota_dir.to_string(), limit, new_usage)
+            .await
+        {
+            if let Some(entry) = entries.get_mut(quota_dir) {
+                entry.usage = old_usage;
+            }
+            return Err(e);
+        }
+
+        Ok(Some((old_usage, new_usage)))
+    }
+
+    /// Reconcile every tracked quota directory. Errors on individual
+    /// directories are logged and skipped so one missing PVC does not
+    /// prevent others from being scanned.
+    pub async fn reconcile_all(&self) {
+        let dirs = self.tracked_dirs().await;
+        for dir in dirs {
+            match self.scan_and_reconcile(&dir).await {
+                Ok(Some((before, after))) if before != after => {
+                    tracing::info!(
+                        "Quota reconcile '{}': usage {} -> {} ({:+})",
+                        dir,
+                        before,
+                        after,
+                        after as i128 - before as i128
+                    );
+                }
+                Ok(_) => {
+                    tracing::debug!("Quota reconcile '{}': unchanged", dir);
+                }
+                Err(e) => {
+                    tracing::warn!("Quota reconcile '{}' failed: {}", dir, e);
+                }
+            }
+        }
+    }
+
+    /// Persist a single entry to redb in a blocking task.
+    async fn persist_entry(&self, key: String, limit: u64, usage: u64) -> Result<()> {
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || -> Result<()> {
+            let txn = db.begin_write().context("Failed to begin write txn")?;
+            {
+                let mut table = txn
+                    .open_table(QUOTA_TABLE)
+                    .context("Failed to open quota table")?;
+                table
+                    .insert(key.as_str(), &(limit, usage))
+                    .context("Failed to insert quota entry")?;
+            }
+            txn.commit().context("Failed to commit quota update")?;
+            Ok(())
+        })
+        .await
+        .context("Failed to run DB task")??;
+        Ok(())
+    }
+}
+
+/// Reject quota directory keys that are not a single safe path component.
+///
+/// `resolve_quota_dir()` only ever returns the first component of a path
+/// relative to the export root, so a stored key with a path separator,
+/// `..`, or an empty value can never match any live filesystem path —
+/// it would just sit in redb as a phantom entry. Worse, reconciliation
+/// joins the key onto the export root and walks it, which would let a
+/// malformed config like `"../escape"` direct the scanner outside the
+/// export tree. Validate at the ingress points (bootstrap, set_quota)
+/// and again on load (defence in depth) to fail fast or skip cleanly.
+fn validate_quota_dir(name: &str) -> Result<()> {
+    if name.is_empty() {
+        return Err(anyhow!("Quota directory name must not be empty"));
+    }
+    if name == "." || name == ".." {
+        return Err(anyhow!(
+            "Quota directory name must not be '.' or '..': got '{}'",
+            name
+        ));
+    }
+    if name.contains('/') || name.contains('\\') {
+        return Err(anyhow!(
+            "Quota directory name must be a single path component (no '/' or '\\\\'): got '{}'",
+            name
+        ));
+    }
+    if name.contains('\0') {
+        return Err(anyhow!(
+            "Quota directory name must not contain NUL bytes: got '{}'",
+            name.escape_default()
+        ));
+    }
+    Ok(())
+}
+
+/// Recursively sum the on-disk byte footprint (st_blocks * 512) of all
+/// regular files rooted at `path`. The unit matches what `write()` and
+/// `remove()` charge against the quota, so callers — reconciliation
+/// (scanning a quota directory) and cross-quota rename (computing the
+/// usage to transfer) — get consistent numbers.
+///
+/// Behaviour:
+///   * Missing path → returns `0` (e.g. a PVC directory was deleted
+///     out of band before reconciliation runs).
+///   * Plain file → returns its allocated bytes.
+///   * Directory → recursive walk. `symlink_metadata` is used per entry
+///     so a malicious symlink cannot escape the subtree or trap the
+///     walk in a cycle.
+///   * Other types (symlink, fifo, …) → not followed, contribute 0.
+pub(crate) fn allocated_path_size(path: &Path) -> Result<u64> {
+    use std::os::unix::fs::MetadataExt;
+
+    if !path.exists() {
+        return Ok(0);
+    }
+
+    let meta =
+        std::fs::symlink_metadata(path).context(format!("Failed to stat path: {:?}", path))?;
+    if meta.is_file() {
+        return Ok(meta.blocks().saturating_mul(512));
+    }
+    if !meta.is_dir() {
+        return Ok(0);
+    }
+
+    let mut total: u64 = 0;
+    let mut stack = vec![path.to_path_buf()];
+    while let Some(current) = stack.pop() {
+        let rd = std::fs::read_dir(&current).context(format!(
+            "Failed to read dir while summing size: {:?}",
+            current
+        ))?;
+        for entry in rd {
+            let entry = entry?;
+            let entry_path = entry.path();
+            let m = std::fs::symlink_metadata(&entry_path).context(format!(
+                "Failed to stat path while summing size: {:?}",
+                entry_path
+            ))?;
+            if m.is_dir() {
+                stack.push(entry_path);
+            } else if m.is_file() {
+                total = total.saturating_add(m.blocks().saturating_mul(512));
+            }
+        }
+    }
+    Ok(total)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    fn make_manager(db_dir: &TempDir, root: &Path) -> QuotaManager {
+        let db_path = db_dir.path().join("quota.db");
+        QuotaManager::new(db_path, root.to_path_buf()).expect("create quota manager")
+    }
+
+    #[tokio::test]
+    async fn test_new_empty_db() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        assert!(qm.get_quota_info("anything").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_set_and_get_quota() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 10 * 1024 * 1024).await.unwrap();
+
+        let info = qm.get_quota_info("pvc-a").await;
+        assert_eq!(info, Some((10 * 1024 * 1024, 0)));
+    }
+
+    #[tokio::test]
+    async fn test_add_and_sub_usage() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 400).await.unwrap();
+        qm.add_usage("pvc-a", 200).await.unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((1000, 600)));
+
+        qm.sub_usage("pvc-a", 100).await.unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((1000, 500)));
+    }
+
+    #[tokio::test]
+    async fn test_sub_usage_saturating() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 100).await.unwrap();
+        qm.sub_usage("pvc-a", 500).await.unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((1000, 0)));
+    }
+
+    #[tokio::test]
+    async fn test_check_quota_under_limit() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 400).await.unwrap();
+
+        qm.check_quota("pvc-a", 500).await.unwrap(); // 400 + 500 = 900 <= 1000
+        qm.check_quota("pvc-a", 601).await.unwrap_err(); // 400 + 601 > 1000
+    }
+
+    #[tokio::test]
+    async fn test_check_quota_at_boundary() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 600).await.unwrap();
+
+        qm.check_quota("pvc-a", 400).await.unwrap(); // exactly at limit
+        qm.check_quota("pvc-a", 401).await.unwrap_err();
+    }
+
+    #[tokio::test]
+    async fn test_check_quota_untracked_dir_allows() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        // No quota configured, any write is allowed.
+        qm.check_quota("untracked", u64::MAX).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_check_quota_error_message_contains_quota_exceeded() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 100).await.unwrap();
+        let err = qm.check_quota("pvc-a", 1000).await.unwrap_err();
+        // NFS handlers rely on this substring to map to NFS3ERR_DQUOT.
+        assert!(err.to_string().contains("Quota exceeded"), "got: {}", err);
+    }
+
+    #[tokio::test]
+    async fn test_remove_quota() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 500).await.unwrap();
+        assert!(qm.get_quota_info("pvc-a").await.is_some());
+
+        qm.remove_quota("pvc-a").await.unwrap();
+        assert!(qm.get_quota_info("pvc-a").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_update_preserves_usage() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.set_quota("pvc-a", 1000).await.unwrap();
+        qm.add_usage("pvc-a", 500).await.unwrap();
+        // Raise the limit; usage should stay.
+        qm.set_quota("pvc-a", 2000).await.unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((2000, 500)));
+    }
+
+    #[tokio::test]
+    async fn test_persistence_across_reopen() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let db_path = tmp.path().join("quota.db");
+
+        {
+            let qm = QuotaManager::new(&db_path, root.path().to_path_buf()).unwrap();
+            qm.set_quota("pvc-a", 1000).await.unwrap();
+            qm.add_usage("pvc-a", 750).await.unwrap();
+            qm.set_quota("pvc-b", 2000).await.unwrap();
+        }
+
+        // Reopen: entries should be loaded from disk.
+        let qm = QuotaManager::new(&db_path, root.path().to_path_buf()).unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((1000, 750)));
+        assert_eq!(qm.get_quota_info("pvc-b").await, Some((2000, 0)));
+    }
+
+    #[tokio::test]
+    async fn test_resolve_quota_dir_first_level() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let qm = QuotaManager::new(tmp.path().join("quota.db"), root_path.clone()).unwrap();
+
+        assert_eq!(
+            qm.resolve_quota_dir(&root_path.join("pvc-a")),
+            Some("pvc-a".to_string())
+        );
+        assert_eq!(
+            qm.resolve_quota_dir(&root_path.join("pvc-a/file.txt")),
+            Some("pvc-a".to_string())
+        );
+        assert_eq!(
+            qm.resolve_quota_dir(&root_path.join("pvc-a/sub/deep/file")),
+            Some("pvc-a".to_string())
+        );
+    }
+
+    #[tokio::test]
+    async fn test_resolve_quota_dir_root_itself_returns_none() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let qm = QuotaManager::new(tmp.path().join("quota.db"), root_path.clone()).unwrap();
+
+        assert_eq!(qm.resolve_quota_dir(&root_path), None);
+    }
+
+    #[tokio::test]
+    async fn test_resolve_quota_dir_outside_root() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let qm = QuotaManager::new(tmp.path().join("quota.db"), root_path).unwrap();
+
+        assert_eq!(qm.resolve_quota_dir(Path::new("/nowhere/else")), None);
+    }
+
+    #[tokio::test]
+    async fn test_add_usage_on_untracked_dir_is_noop() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        qm.add_usage("untracked", 500).await.unwrap();
+        assert!(qm.get_quota_info("untracked").await.is_none());
+    }
+
+    #[test]
+    fn test_validate_quota_dir() {
+        assert!(validate_quota_dir("pvc-a").is_ok());
+        assert!(validate_quota_dir("with spaces").is_ok());
+
+        assert!(validate_quota_dir("").is_err());
+        assert!(validate_quota_dir(".").is_err());
+        assert!(validate_quota_dir("..").is_err());
+        assert!(validate_quota_dir("a/b").is_err());
+        assert!(validate_quota_dir("../escape").is_err());
+        assert!(validate_quota_dir("/abs").is_err());
+        assert!(validate_quota_dir("a\\b").is_err());
+        assert!(validate_quota_dir("with\0nul").is_err());
+    }
+
+    #[tokio::test]
+    async fn test_new_skips_unsafe_stored_keys() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let db_path = tmp.path().join("quota.db");
+
+        // Plant a mix of safe and unsafe entries directly in redb,
+        // simulating a corrupted DB or a manual edit.
+        {
+            let db = Database::create(&db_path).unwrap();
+            let txn = db.begin_write().unwrap();
+            {
+                let mut table = txn.open_table(QUOTA_TABLE).unwrap();
+                table.insert("pvc-good", &(1024u64, 0u64)).unwrap();
+                table.insert("../escape", &(2048u64, 0u64)).unwrap();
+                table.insert("a/b", &(4096u64, 0u64)).unwrap();
+                table.insert("", &(8192u64, 0u64)).unwrap();
+            }
+            txn.commit().unwrap();
+        }
+
+        let qm = QuotaManager::new(&db_path, root.path().to_path_buf()).unwrap();
+
+        // Only the safe key survives the load.
+        assert_eq!(qm.get_quota_info("pvc-good").await, Some((1024, 0)));
+        assert!(qm.get_quota_info("../escape").await.is_none());
+        assert!(qm.get_quota_info("a/b").await.is_none());
+        assert!(qm.get_quota_info("").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_scan_and_reconcile_corrects_drift() {
+        // Reconciliation accounts in allocated bytes (st_blocks * 512) so
+        // it agrees with what write()/remove() charge against the quota.
+        // 4 KiB writes line up with the typical filesystem page size,
+        // making the expected value exact across tmpfs/ext4.
+        const BLOCK: u64 = 4096;
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let qm = QuotaManager::new(tmp.path().join("quota.db"), root_path.clone()).unwrap();
+
+        // Configure quota; tracked usage starts wrong (stale from a crash).
+        qm.set_quota("pvc-a", 1024 * 1024).await.unwrap();
+        qm.add_usage("pvc-a", 9999).await.unwrap();
+
+        // Create real files under the PVC directory totaling 3 blocks.
+        let pvc_dir = root_path.join("pvc-a");
+        std::fs::create_dir_all(&pvc_dir).unwrap();
+        std::fs::write(pvc_dir.join("a.bin"), vec![0u8; BLOCK as usize]).unwrap();
+        std::fs::write(pvc_dir.join("b.bin"), vec![0u8; (2 * BLOCK) as usize]).unwrap();
+
+        let result = qm.scan_and_reconcile("pvc-a").await.unwrap();
+        assert_eq!(result, Some((9999, 3 * BLOCK)));
+        assert_eq!(
+            qm.get_quota_info("pvc-a").await,
+            Some((1024 * 1024, 3 * BLOCK))
+        );
+    }
+
+    #[tokio::test]
+    async fn test_scan_and_reconcile_returns_none_for_untracked() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = QuotaManager::new(
+            tmp.path().join("quota.db"),
+            root.path().canonicalize().unwrap(),
+        )
+        .unwrap();
+
+        let result = qm.scan_and_reconcile("nonexistent").await.unwrap();
+        assert_eq!(result, None);
+    }
+
+    #[tokio::test]
+    async fn test_scan_and_reconcile_missing_directory_is_zero() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = QuotaManager::new(
+            tmp.path().join("quota.db"),
+            root.path().canonicalize().unwrap(),
+        )
+        .unwrap();
+
+        qm.set_quota("pvc-missing", 1000).await.unwrap();
+        qm.add_usage("pvc-missing", 500).await.unwrap();
+
+        let result = qm.scan_and_reconcile("pvc-missing").await.unwrap();
+        assert_eq!(result, Some((500, 0)));
+        assert_eq!(qm.get_quota_info("pvc-missing").await, Some((1000, 0)));
+    }
+
+    #[tokio::test]
+    async fn test_reconcile_all_processes_every_tracked_dir() {
+        const BLOCK: u64 = 4096;
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let qm = QuotaManager::new(tmp.path().join("quota.db"), root_path.clone()).unwrap();
+
+        qm.set_quota("pvc-a", 16 * BLOCK).await.unwrap();
+        qm.set_quota("pvc-b", 16 * BLOCK).await.unwrap();
+        qm.add_usage("pvc-a", 9999).await.unwrap();
+        qm.add_usage("pvc-b", 9999).await.unwrap();
+
+        std::fs::create_dir_all(root_path.join("pvc-a")).unwrap();
+        std::fs::write(root_path.join("pvc-a/f.bin"), vec![0u8; BLOCK as usize]).unwrap();
+        std::fs::create_dir_all(root_path.join("pvc-b")).unwrap();
+        std::fs::write(
+            root_path.join("pvc-b/f.bin"),
+            vec![0u8; (2 * BLOCK) as usize],
+        )
+        .unwrap();
+
+        qm.reconcile_all().await;
+
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((16 * BLOCK, BLOCK)));
+        assert_eq!(
+            qm.get_quota_info("pvc-b").await,
+            Some((16 * BLOCK, 2 * BLOCK))
+        );
+    }
+
+    #[tokio::test]
+    async fn test_apply_bootstrap_installs_new_entries() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        let mut bootstrap = HashMap::new();
+        bootstrap.insert("pvc-a".to_string(), "1MB".to_string());
+        bootstrap.insert("pvc-b".to_string(), "500KB".to_string());
+
+        qm.apply_bootstrap(&bootstrap).await.unwrap();
+
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((1024 * 1024, 0)));
+        assert_eq!(qm.get_quota_info("pvc-b").await, Some((500 * 1024, 0)));
+    }
+
+    #[tokio::test]
+    async fn test_apply_bootstrap_skips_existing_entries() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        // Pre-existing entry with different limit and some usage.
+        qm.set_quota("pvc-a", 2048).await.unwrap();
+        qm.add_usage("pvc-a", 512).await.unwrap();
+
+        let mut bootstrap = HashMap::new();
+        bootstrap.insert("pvc-a".to_string(), "1MB".to_string());
+        qm.apply_bootstrap(&bootstrap).await.unwrap();
+
+        // Limit and usage must be preserved (bootstrap is no-op).
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((2048, 512)));
+    }
+
+    #[tokio::test]
+    async fn test_apply_bootstrap_rejects_invalid_size() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        let mut bootstrap = HashMap::new();
+        bootstrap.insert("pvc-a".to_string(), "garbage".to_string());
+
+        let err = qm.apply_bootstrap(&bootstrap).await.unwrap_err();
+        assert!(
+            err.to_string().contains("Invalid bootstrap size"),
+            "got: {}",
+            err
+        );
+    }
+
+    #[tokio::test]
+    async fn test_apply_bootstrap_rejects_unsafe_keys() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        for bad in ["", ".", "..", "a/b", "../escape", "/abs", "a\\b"] {
+            let mut bootstrap = HashMap::new();
+            bootstrap.insert(bad.to_string(), "1MB".to_string());
+            let err = qm
+                .apply_bootstrap(&bootstrap)
+                .await
+                .unwrap_err()
+                .to_string();
+            assert!(
+                err.contains("Invalid bootstrap quota directory"),
+                "key '{}' should be rejected, got: {}",
+                bad,
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_set_quota_rejects_unsafe_key() {
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let qm = make_manager(&tmp, root.path());
+
+        let err = qm.set_quota("../escape", 1024).await.unwrap_err();
+        assert!(
+            err.to_string().contains("must be a single path component"),
+            "got: {}",
+            err
+        );
+        assert!(qm.get_quota_info("../escape").await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_reconciled_values_survive_reopen() {
+        const BLOCK: u64 = 4096;
+        let tmp = TempDir::new().unwrap();
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().canonicalize().unwrap();
+        let db_path = tmp.path().join("quota.db");
+
+        {
+            let qm = QuotaManager::new(&db_path, root_path.clone()).unwrap();
+            qm.set_quota("pvc-a", 8 * BLOCK).await.unwrap();
+            qm.add_usage("pvc-a", 4_000).await.unwrap();
+
+            std::fs::create_dir_all(root_path.join("pvc-a")).unwrap();
+            std::fs::write(root_path.join("pvc-a/f.bin"), vec![0u8; BLOCK as usize]).unwrap();
+
+            qm.scan_and_reconcile("pvc-a").await.unwrap();
+        }
+
+        let qm = QuotaManager::new(&db_path, root_path).unwrap();
+        assert_eq!(qm.get_quota_info("pvc-a").await, Some((8 * BLOCK, BLOCK)));
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 13ed327..3d6b3a1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,6 +5,7 @@ compile_error!("Arctic Wolf NFS server only supports Linux");
 //
 // This library provides the core components for building an NFSv3 server
+pub mod config;
 pub mod fsal;
 pub mod mount;
 pub mod nfs;
diff --git a/src/main.rs b/src/main.rs
index 9f29861..4483b8d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -101,6 +101,14 @@ async fn main() -> Result<()> {
     println!("  NFS port: {}", config.server.nfs_port);
     println!("  FSAL backend: {}", config.fsal.backend);
     println!("  Export path: {}", config.fsal.export_path.display());
+    println!(
+        "  Quota: {}",
+        if config.quota.enabled {
+            format!("enabled (db={})", config.quota.db_path.display())
+        } else {
+            "disabled".to_string()
+        }
+    );
     println!("  Log level: {}", log_level_str);
     println!();
@@ -115,11 +123,36 @@ async fn main() -> Result<()> {
         );
     }
-    let fsal_config = BackendConfig::local(&config.fsal.export_path);
+    let fsal_config =
+        BackendConfig::local(&config.fsal.export_path).with_quota(config.quota.clone());
     let filesystem: Arc<dyn Filesystem> = Arc::from(fsal_config.create_filesystem()?);
     let root_handle = filesystem.root_handle().await;
     println!("  Root handle: {} bytes", root_handle.len());
+
+    // Apply any declarative bootstrap entries from config before taking
The backend silently ignores bootstrap when quota is + // disabled, so gate the user-facing message on the same flag — + // otherwise operators see "applied N entries" even when nothing + // actually took effect. + if config.quota.enabled && !config.quota.bootstrap.is_empty() { + filesystem + .apply_quota_bootstrap(&config.quota.bootstrap) + .await?; + println!( + " Quota bootstrap: applied {} entries", + config.quota.bootstrap.len() + ); + } + + // Kick off a background pass that reconciles stored quota usage with + // the actual on-disk size. The server starts accepting traffic + // immediately; drift from out-of-band filesystem changes is corrected + // while requests are being served. + if config.quota.enabled { + filesystem.start_quota_reconciliation(); + println!(" Quota reconciliation: scheduled (background)"); + } println!(); // Create portmapper registry diff --git a/src/nfs/fsstat.rs b/src/nfs/fsstat.rs index f88d436..e02e366 100644 --- a/src/nfs/fsstat.rs +++ b/src/nfs/fsstat.rs @@ -51,14 +51,24 @@ pub async fn handle_fsstat( } }; - // Get filesystem statistics - // For now, use hardcoded values - in production this would query the actual filesystem - let tbytes = 1024 * 1024 * 1024 * 100u64; // 100 GB total - let fbytes = 1024 * 1024 * 1024 * 50u64; // 50 GB free - let abytes = 1024 * 1024 * 1024 * 50u64; // 50 GB available to non-root - let tfiles = 1000000u64; // 1M total inodes - let ffiles = 500000u64; // 500k free inodes - let afiles = 500000u64; // 500k available inodes to non-root + // Query filesystem statistics via the FSAL. The backend returns real + // statvfs values; a later stage folds quota-aware byte counts into this + // same path. 
+ let stats = match filesystem.statvfs(&args.fsroot.0).await { + Ok(s) => s, + Err(e) => { + debug!("FSSTAT statvfs failed: {}", e); + let res_data = NfsMessage::create_fsstat_error_response(nfsstat3::NFS3ERR_IO)?; + return RpcMessage::create_success_reply_with_data(xid, res_data); + } + }; + + let tbytes = stats.total_bytes; + let fbytes = stats.free_bytes; + let abytes = stats.avail_bytes; + let tfiles = stats.total_files; + let ffiles = stats.free_files; + let afiles = stats.avail_files; let invarsec = 0u32; // filesystem not expected to change without client intervention debug!( diff --git a/src/nfs/mkdir.rs b/src/nfs/mkdir.rs index 4de4617..fdf917b 100644 --- a/src/nfs/mkdir.rs +++ b/src/nfs/mkdir.rs @@ -207,7 +207,7 @@ mod tests { fs::create_dir_all(&test_dir).unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_mkdir".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_mkdir".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; @@ -269,7 +269,7 @@ mod tests { fs::create_dir(test_dir.join("existingdir")).unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_mkdir_exists".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_mkdir_exists".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; diff --git a/src/nfs/readdirplus.rs b/src/nfs/readdirplus.rs index c5380e9..2c3809a 100644 --- a/src/nfs/readdirplus.rs +++ b/src/nfs/readdirplus.rs @@ -184,7 +184,7 @@ mod tests { fs::create_dir(test_dir.join("subdir")).unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_readdirplus".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_readdirplus".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; diff --git a/src/nfs/remove.rs b/src/nfs/remove.rs index 011adbf..7b1255a 100644 --- a/src/nfs/remove.rs +++ b/src/nfs/remove.rs @@ -155,7 
+155,7 @@ mod tests { fs::write(&test_file, "test content").unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_remove".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_remove".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; @@ -194,7 +194,8 @@ mod tests { fs::create_dir_all(&test_dir).unwrap(); // Create filesystem (file does NOT exist) - let fs = LocalFilesystem::new("/tmp/nfs_test_remove_nonexistent".to_string()).unwrap(); + let fs = + LocalFilesystem::new("/tmp/nfs_test_remove_nonexistent".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; diff --git a/src/nfs/rename.rs b/src/nfs/rename.rs index 18347f7..09618b1 100644 --- a/src/nfs/rename.rs +++ b/src/nfs/rename.rs @@ -95,7 +95,16 @@ pub async fn handle_rename( // Determine appropriate error code let error_string = e.to_string(); - let status = if error_string.contains("not found") || error_string.contains("No such") { + let status = if error_string.contains("Quota exceeded") + || error_string.contains("Disk quota exceeded") + { + // Match both our QuotaManager error and the Linux + // EDQUOT rendering ("Disk quota exceeded") so a + // cross-quota rename hitting an OS-level quota is + // reported as NFS3ERR_DQUOT instead of falling through + // to NFS3ERR_IO. 
+ nfsstat3::NFS3ERR_DQUOT + } else if error_string.contains("not found") || error_string.contains("No such") { nfsstat3::NFS3ERR_NOENT } else if error_string.contains("already exists") || error_string.contains("File exists") @@ -233,7 +242,7 @@ mod tests { file.write_all(b"test content").unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_rename".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_rename".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; @@ -286,7 +295,7 @@ mod tests { fs::create_dir(test_dir.join("olddir")).unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_rename_dir".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_rename_dir".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; diff --git a/src/nfs/rmdir.rs b/src/nfs/rmdir.rs index f38ac44..a5fed14 100644 --- a/src/nfs/rmdir.rs +++ b/src/nfs/rmdir.rs @@ -160,7 +160,7 @@ mod tests { fs::create_dir(&target_dir).unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; @@ -199,7 +199,7 @@ mod tests { fs::create_dir_all(&test_dir).unwrap(); // Create filesystem (directory does NOT exist) - let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir_nonexistent".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir_nonexistent".to_string(), None).unwrap(); // Get root handle let root_handle = fs.root_handle().await; @@ -237,7 +237,7 @@ mod tests { fs::write(target_dir.join("somefile.txt"), "data").unwrap(); // Create filesystem - let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir_notempty".to_string()).unwrap(); + let fs = LocalFilesystem::new("/tmp/nfs_test_rmdir_notempty".to_string(), None).unwrap(); // Get root handle let 
root_handle = fs.root_handle().await; diff --git a/src/nfs/setattr.rs b/src/nfs/setattr.rs index 75ac573..c75f71d 100644 --- a/src/nfs/setattr.rs +++ b/src/nfs/setattr.rs @@ -61,11 +61,21 @@ pub async fn handle_setattr( if let Err(e) = filesystem.setattr_size(&args.object.0, *new_size).await { debug!("SETATTR: failed to set size: {}", e); - let error_status = if e.to_string().contains("not found") { + // Bind once and reuse for every classification arm. + let error_string = e.to_string(); + let error_status = if error_string.contains("not found") { nfsstat3::NFS3ERR_STALE - } else if e.to_string().contains("Permission denied") { + } else if error_string.contains("Permission denied") { nfsstat3::NFS3ERR_ACCES - } else if e.to_string().contains("Read-only") { + } else if error_string.contains("Quota exceeded") + || error_string.contains("Disk quota exceeded") + { + // Cover both our QuotaManager's "Quota exceeded" and the + // OS-level EDQUOT formatting "Disk quota exceeded" that + // an underlying filesystem (e.g. XFS project quotas) + // may surface during a SETATTR truncate. 
+ nfsstat3::NFS3ERR_DQUOT + } else if error_string.contains("Read-only") { nfsstat3::NFS3ERR_ROFS } else { nfsstat3::NFS3ERR_IO @@ -81,9 +91,10 @@ pub async fn handle_setattr( if let Err(e) = filesystem.setattr_mode(&args.object.0, *mode).await { debug!("SETATTR: failed to set mode: {}", e); - let error_status = if e.to_string().contains("not found") { + let error_string = e.to_string(); + let error_status = if error_string.contains("not found") { nfsstat3::NFS3ERR_STALE - } else if e.to_string().contains("Permission denied") { + } else if error_string.contains("Permission denied") { nfsstat3::NFS3ERR_ACCES } else { nfsstat3::NFS3ERR_IO @@ -108,9 +119,10 @@ pub async fn handle_setattr( if let Err(e) = filesystem.setattr_owner(&args.object.0, uid, gid).await { debug!("SETATTR: failed to set owner: {}", e); - let error_status = if e.to_string().contains("not found") { + let error_string = e.to_string(); + let error_status = if error_string.contains("not found") { nfsstat3::NFS3ERR_STALE - } else if e.to_string().contains("Permission denied") { + } else if error_string.contains("Permission denied") { nfsstat3::NFS3ERR_ACCES } else { nfsstat3::NFS3ERR_IO diff --git a/src/nfs/write.rs b/src/nfs/write.rs index 65437bc..14bd061 100644 --- a/src/nfs/write.rs +++ b/src/nfs/write.rs @@ -50,22 +50,33 @@ pub async fn handle_write( Ok(count) => count, Err(e) => { debug!("WRITE failed: {}", e); + // Bind the error string once: it is otherwise reformatted on + // every `contains` arm and the cost adds up on the hot path. 
+ let error_string = e.to_string(); // Return appropriate NFS error - let error_status = if e.to_string().contains("not found") - || e.to_string().contains("Invalid handle") - { - nfsstat3::NFS3ERR_STALE - } else if e.to_string().contains("Not a file") { - nfsstat3::NFS3ERR_ISDIR - } else if e.to_string().contains("Permission denied") { - nfsstat3::NFS3ERR_ACCES - } else if e.to_string().contains("No space") { - nfsstat3::NFS3ERR_NOSPC - } else if e.to_string().contains("Read-only") { - nfsstat3::NFS3ERR_ROFS - } else { - nfsstat3::NFS3ERR_IO - }; + let error_status = + if error_string.contains("not found") || error_string.contains("Invalid handle") { + nfsstat3::NFS3ERR_STALE + } else if error_string.contains("Not a file") { + nfsstat3::NFS3ERR_ISDIR + } else if error_string.contains("Permission denied") { + nfsstat3::NFS3ERR_ACCES + } else if error_string.contains("Quota exceeded") + || error_string.contains("Disk quota exceeded") + { + // Two cases: our own QuotaManager prefixes errors with + // "Quota exceeded ...", and Linux's EDQUOT (which the + // underlying filesystem may return when an OS-level + // project/user quota is in effect) renders as + // "Disk quota exceeded". + nfsstat3::NFS3ERR_DQUOT + } else if error_string.contains("No space") { + nfsstat3::NFS3ERR_NOSPC + } else if error_string.contains("Read-only") { + nfsstat3::NFS3ERR_ROFS + } else { + nfsstat3::NFS3ERR_IO + }; let res_data = NfsMessage::create_write_error_response(error_status)?; return RpcMessage::create_success_reply_with_data(xid, res_data); diff --git a/tests/test_nfs_quota.sh b/tests/test_nfs_quota.sh new file mode 100755 index 0000000..897ecf3 --- /dev/null +++ b/tests/test_nfs_quota.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash +# NFS folder quota smoke test. +# +# Exercises the quota feature end-to-end using a real kernel NFS client: +# 1. Write under the limit -> succeeds +# 2. Write exceeding limit -> kernel reports EDQUOT (Disk quota exceeded) +# 3. 
Remove the file -> frees quota; subsequent write succeeds +# 4. Truncate down -> frees quota; subsequent write succeeds +# 5. Statfs (`df`) -> reports the quota limit, not the real FS +# +# Requires the server to be running with quota enabled and a bootstrap +# entry for the PVC directory. The CI workflow configures this; local +# developers can run it after preparing a matching config.toml. + +set -euo pipefail + +MOUNT_POINT="${MOUNT_POINT:-/mnt/nfs}" +PVC_NAME="${PVC_NAME:-pvc-quota-test}" +# The bootstrap entry must match this value. 1 MiB keeps the test fast. +QUOTA_BYTES="${QUOTA_BYTES:-1048576}" + +PVC_DIR="${MOUNT_POINT}/${PVC_NAME}" +# Per-run scratch file for capturing dd's stderr; mktemp avoids collisions +# when multiple invocations run in parallel (e.g. local + CI on the same +# host) — something a fixed path like /tmp/dd_err cannot avoid. +DD_ERR=$(mktemp -t arcticwolf-quota-dd-err.XXXXXX) + +log() { echo "== $*" ; } +fail() { echo "FAIL: $*" >&2 ; exit 1 ; } + +cleanup() { + # Leave the directory in a clean state between runs so CI can replay, + # and remove the per-run dd stderr scratch file. + rm -rf "${PVC_DIR}"/* 2>/dev/null || true + rm -f "${DD_ERR}" 2>/dev/null || true +} +trap cleanup EXIT + +log "Mount point: ${MOUNT_POINT}" +log "PVC dir: ${PVC_DIR}" +log "Quota: ${QUOTA_BYTES} bytes" + +mkdir -p "${PVC_DIR}" + +log "Test 1: write just under the limit should succeed" +# 512 KiB write — well under the 1 MiB limit. +dd if=/dev/zero of="${PVC_DIR}/small.bin" bs=1024 count=512 status=none \ + || fail "under-limit write unexpectedly failed" + +log "Test 2: write past the limit should be rejected with EDQUOT" +# 2 MiB write — larger than the 1 MiB limit. +set +e +dd if=/dev/zero of="${PVC_DIR}/huge.bin" bs=1024 count=2048 status=none 2> "${DD_ERR}" +rc=$? +set -e +if [[ ${rc} -eq 0 ]]; then + fail "over-limit write unexpectedly succeeded" +fi +if ! 
grep -qi "disk quota exceeded\|quota exceeded\|EDQUOT" "${DD_ERR}"; then + cat "${DD_ERR}" >&2 + fail "over-limit write returned an error but not EDQUOT" +fi +log " -> write rejected as expected: $(head -1 "${DD_ERR}")" +# The partial file that was created before the failure should be cleaned up +# so subsequent tests start from a known baseline. +rm -f "${PVC_DIR}/huge.bin" + +log "Test 3: remove then re-write should succeed" +# REMOVE waits for the redb commit before returning, so quota is already +# freed by the time the unlink RPC completes — no settling delay needed. +rm -f "${PVC_DIR}/small.bin" +dd if=/dev/zero of="${PVC_DIR}/small.bin" bs=1024 count=512 status=none \ + || fail "write after remove unexpectedly failed" + +log "Test 4: truncate down then re-write should succeed" +# Truncate the file down to 0, which should free all of its quota. +# SETATTR also persists synchronously, so no sleep is needed here either. +: > "${PVC_DIR}/small.bin" +# 900 KiB write — would exceed the limit if the truncate didn't release. +dd if=/dev/zero of="${PVC_DIR}/small.bin" bs=1024 count=900 status=none \ + || fail "write after truncate unexpectedly failed" + +log "Test 5: df should report the quota as the filesystem size" +df_out=$(df -B1 --output=size,used,avail "${PVC_DIR}" | tail -1) +total=$(awk '{print $1}' <<<"${df_out}") +echo " df total=${total} expected=${QUOTA_BYTES}" +if [[ "${total}" != "${QUOTA_BYTES}" ]]; then + fail "df total-bytes mismatch: got ${total}, expected ${QUOTA_BYTES}" +fi + +log "All quota tests passed."