diff --git a/Cargo.toml b/Cargo.toml index 572c20f6..26ad217b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,6 +174,31 @@ name = "bench_blob_manifest" path = "examples/bench_blob_manifest.rs" required-features = ["bench"] +# Blob download read-ahead benchmark — sweeps the local backend's chunk prefetch +# depth (read_prefetch / buffered(N)) under disk-bound vs network-bound consumers +# and warm vs cold page cache. No Postgres needed. +[[example]] +name = "bench_blob_prefetch" +path = "examples/bench_blob_prefetch.rs" +required-features = ["bench"] + +# Tokio runtime tuning benchmark — worker over-subscription (throughput + p99 +# under CPU contention) and blocking-pool RSS blast radius, default vs tuned. +# No Postgres needed; run under `taskset -c 0,1` to model a 2-core quota. +[[example]] +name = "bench_tokio_runtime" +path = "examples/bench_tokio_runtime.rs" +required-features = ["bench"] + +# CPU pool concurrency benchmark — thumbnail decode throughput + p99 as the +# decode-permit count is swept, showing the effect of sizing the image pools to +# the CFS quota (effective_parallelism) vs the host core count. No Postgres; +# run under `taskset -c 0,1` to model a 2-core quota. +[[example]] +name = "bench_pool_concurrency" +path = "examples/bench_pool_concurrency.rs" +required-features = ["bench"] + # ACL owner-cache benchmark — owner query vs moka hit (needs the dev Postgres up). [[example]] name = "bench_owner_cache" diff --git a/benches/BLOB-PREFETCH.md b/benches/BLOB-PREFETCH.md new file mode 100644 index 00000000..77c00a58 --- /dev/null +++ b/benches/BLOB-PREFETCH.md @@ -0,0 +1,53 @@ +# Blob download read-ahead benchmark + +Measures the local backend's chunk read-ahead depth — the `buffered(N)` +read-ahead in `DedupService::stream_chunks` fed by +`BlobStorageBackend::read_prefetch()`. Reproduces the exact production reassembly +combinator (`stream::iter(hashes).map(get_blob_stream).buffered(N).try_flatten()`) +over a real `LocalBlobBackend` whose chunk files are scattered across the 256 +hash-prefix dirs, then drains it and reports throughput. `N = 1` is the old +production default ("antes"); higher N is the change ("después"). + +## Reproduce + +```bash +cargo run --release --features bench --example bench_blob_prefetch +# tunables: BENCH_FILE_MB=192 BENCH_CHUNK_KB=256 BENCH_PREFETCH=1,2,4,8,16 +# BENCH_THROTTLE_MBPS=0,300,100 BENCH_REPS=5 BENCH_COLD=1 +``` + +## Results (4-core box, SSD-class storage, 192 MiB in 768×256 KiB chunks) + +Median MB/s over 5 reps; `vs N=1` is the read-ahead gain over the old default. + +| scenario | N=1 | N=2 | N=4 | N=8 | N=16 | +|-------------------------|----:|-----:|-----:|-----:|-----:| +| warm / unthrottled |1306 | **1460** |1394 |1357 |1248 | +| cold / unthrottled | 456 | **489** | 469 | 478 | 473 | +| warm / throttled@300MB/s| 167 | 166 | 166 | 166 | 166 | +| cold / throttled@300MB/s| 138 | 140 | 135 | 135 | 136 | +| warm / throttled@100MB/s| 62 | 62 | 62 | 62 | 62 | +| cold / throttled@100MB/s| 57 | 58 | 57 | 57 | 57 | + +(`vs N=1` for the best column N=2: warm/unthrottled **+11.8 %**, cold/unthrottled +**+7.2 %**; throttled rows ≈ 0 %. N=16 regresses warm −4.4 %.) + +## Conclusions + +1. **N=2 is the sweet spot, not 8.** It wins or ties in 5 of 6 scenarios at the + lowest fan-out: +11.8 % warm and +7.2 % cold on disk-bound reads, neutral when + the consumer is the bottleneck. N=8 gives only +3.9 %/+4.8 %; N=16 regresses. + So local now defaults to 2 (was 1); S3/Azure keep 8 (request-latency bound). + +2. **The win is disk-bound, not network-bound.** Throttled (network-bound) rows + are flat because `buffered(N)` here overlaps the per-chunk `File::open` + (cheap on local disk), **not** the data read (which `try_flatten` polls + sequentially). The disk-bound rows cover localhost/LAN downloads *and* the + internal blob reads that drain as fast as the disk delivers — thumbnail + render, transcode, ZIP export, content extraction — all via `stream_chunks`. + +3. **No cold regression on SSD.** The trait doc's "slower cold" worry (concurrent + opens → random I/O over scattered chunk files) is an HDD seek-thrash concern; + on SSD-class storage cold reads *improved* at N=2. Operators on spinning disks + can restore the old behaviour with `OXICLOUD_LOCAL_READ_PREFETCH=1`; NVMe + arrays can raise it. diff --git a/benches/POOL-CONCURRENCY.md b/benches/POOL-CONCURRENCY.md new file mode 100644 index 00000000..b6f6dccc --- /dev/null +++ b/benches/POOL-CONCURRENCY.md @@ -0,0 +1,77 @@ +# CPU pool concurrency benchmark — thumbnail decode under a CPU quota + +Measures what `effective_parallelism()` changes for the image pools +(`ThumbnailService::max_concurrent_decodes`, `image_transcode_service`, `di.rs` +video ffmpeg fan-out): the number of concurrent CPU-heavy renders permitted. +Those pools used to size from `available_parallelism()`, which ignores the CFS +quota (`--cpus` / cgroup `cpu.max`), so under a container quota they permit one +render per *host* core. Drives the **real service path** — `Semaphore(K)` gating +`spawn_blocking(ThumbnailService::bench_render_all)` with a gallery of concurrent +callers — and sweeps the permit count K, measuring throughput, p50/p99, and peak +RSS for K concurrent decodes. + +## Reproduce + +```bash +cargo build --release --features bench --example bench_pool_concurrency +taskset -c 0,1 ./target/release/examples/bench_pool_concurrency # model a 2-core quota +# tunables: BENCH_K_LIST=1,2,4,8,16 BENCH_GALLERY=48 BENCH_SECONDS=4 +``` + +## Results (4-core box, pinned to 2 cores; image: synthetic 48 MP JPEG) + +### [A] Throughput + tail latency (48 concurrent gallery callers) + +| permits | renders/s | p50 ms | p99 ms | +|--------:|----------:|-------:|-------:| +| 1 | 16.5 | 7342 | 10370 | +| 2 (effective) | 20.0 | 5009 | 5816 | +| 4 | 20.8 | 4895 | 5536 | +| 8 | 20.0 | 4784 | 5685 | +| 16 | 18.0 | 4576 | 6140 | + +### [B] Peak RSS, K concurrent decodes (one wave) + +| permits | peak RSS MiB | +|--------:|-------------:| +| 1 | 137 | +| 2 | 137 | +| 4 | 137 | +| 8 | 137 | +| 16 | 137 | + +## Conclusions + +1. **The thumbnail-decode pool is not a bottleneck — over-permitting costs + nothing measurable here.** Throughput is flat from K=2 to K=8 (CPU-bound: two + cores stay saturated regardless), p99 barely moves, and **peak RSS is flat at + 137 MiB across K=1..16**. K=1 under-utilises (one decode can't fill two cores); + K=16 is marginally worse on throughput/p99. So sizing this pool to the CFS + quota neither gains nor loses on this workload. + +2. **This confirms the codebase's own design.** `thumbnail_service.rs` documents + that *shrink-on-load* (DCT-scaled decode straight to thumbnail size, ~18–25 MB + regardless of source resolution) is why the historical concurrency throttle + was removed — "the RAM ceiling no longer forces throttling and we can saturate + every core". The flat RSS is exactly that: each concurrent decode's transient + buffer is small, so 16 in flight cost the same resident memory as 1. + +3. **Decision: NOT migrated (reverted).** Because the only pool this bench could + isolate showed zero measured benefit, the `effective_parallelism()` migration + of the image pools was reverted — adding code without a measured win isn't + worth it. The `effective_parallelism()` helper stays (it has a *measured* + benefit in the Tokio runtime — see `RUNTIME`), so a future, deliberately + measured case can adopt it per-pool. + The one pool with a plausible a-priori argument is the **ffmpeg video + fan-out** (one heavyweight OS process per permit — 32 ffmpeg processes for a + 2-core budget on a many-core host is self-evidently wasteful). That was left + on `available_parallelism()` too, to revisit *with* a measurement if a + high-host-core / low-quota deployment running video thumbnails ever warrants + it. The transcode rayon pool over-sizing only costs parked thread stacks + (negligible). + +4. **Honest caveat on scale.** This was run at a 2-core quota on a 4-core host + (K_oversub = 8 ≈ 4×). On a 64-core host under a 2-core quota the host-count + permit would be 64 (32× over), where even small per-decode costs and scheduler + pressure add up — the regime this change protects against but which this box + can't reproduce. diff --git a/benches/RUNTIME.md b/benches/RUNTIME.md new file mode 100644 index 00000000..1de62f66 --- /dev/null +++ b/benches/RUNTIME.md @@ -0,0 +1,93 @@ +# Tokio runtime tuning benchmark + +Measures the two things `build_runtime` (`src/main.rs`) changes versus the bare +`#[tokio::main]` defaults, sized by `common::runtime::runtime_pool_sizes`: + +- **Worker count.** `#[tokio::main]` defaults to `available_parallelism()`, which + honours CPU *affinity* (`sched_getaffinity`: cpuset, `taskset`) but **ignores + the CFS bandwidth quota** (`docker --cpus`, cgroup v2 `cpu.max`, v1 + `cpu.cfs_quota_us`). On a 2-core-quota container on a many-core host it spawns + one worker per *host* core. `effective_parallelism()` folds the quota back in. +- **Blocking pool.** `#[tokio::main]` defaults to a flat `max_blocking_threads = + 512` — a multi-GB RSS blast radius for this heavy `spawn_blocking` user + (thumbnails, transcode, zip, PDF/text extraction, Argon2 ≈19 MB/hash). The + builder caps it at `max(32, 8 × workers)`. + +## Reproduce + +```bash +cargo build --release --features bench --example bench_tokio_runtime +# Pin to 2 cores to model a 2-core CPU quota on a bigger host: +taskset -c 0,1 ./target/release/examples/bench_tokio_runtime +# Part B uses a fixed glibc mmap threshold for a clean RSS read: +MALLOC_MMAP_THRESHOLD_=131072 MALLOC_TRIM_THRESHOLD_=131072 \ + taskset -c 0,1 ./target/release/examples/bench_tokio_runtime +# tunables: BENCH_CONCURRENCY=96 BENCH_SECONDS=4 BENCH_BURN_KB=256 +# BENCH_WORKERS_BEFORE=32 BENCH_BLOCKING_TASKS=96 BENCH_ALLOC_MB=16 BENCH_MAX_BLOCKING_AFTER=16 +``` + +## Results (4-core box, pinned to 2 cores via `taskset -c 0,1`) + +### [A] Worker over-subscription under CPU contention + +96 concurrent async "requests", each an async hop + a 256 KiB BLAKE3 (models a +handler that interleaves I/O with on-worker compute), over 4 s. + +| runtime | req/s | p50 µs | p99 µs | +|-----------------------|-------:|-------:|-------:| +| before: 32 workers | 46 854 | 121 | 60 360 | +| after: 2 workers | 42 893 | 2 140 | 4 962 | + +→ **throughput −8.5 %, p99 latency −91.8 %** (after vs before) + +### [B] Blocking-pool RSS blast radius + +96 concurrent `spawn_blocking` tasks, 16 MiB resident each, held 120 ms +(fixed glibc mmap threshold so freed allocations leave RSS promptly). + +| max_blocking_threads | peak RSS MiB | vs default | +|-----------------------------|-------------:|-----------:| +| before: 512 (tokio default) | 1 231 | — | +| after: 16 (bounded) | 261 | −970 MiB | + +## Conclusions + +1. **Blocking-pool cap — clear win, no downside.** Bounding 512→16 cut peak RSS + under a 96-task flood from **1231 MiB to 261 MiB (−970 MiB)**. The cap only + engages under a pile-up; steady-state operation is unaffected, and the app's + heaviest blocking consumers are already semaphore-limited (Argon2 = 2, + thumbnail decode ≈ cores), so `max(32, 8×workers)` is generous headroom that + simply removes the unbounded tail that can OOM-kill the process under a spike. + +2. **Worker sizing — a latency/throughput trade, favourable for a server.** + Over-subscription (32 workers on 2 cores, what tokio's default does under a + CFS quota) won **+8.5 % peak throughput** but at a **catastrophic p99 of + 60 ms** (12× the tuned 5 ms) with a bimodal distribution — some requests fly + (p50 121 µs), others starve. Sizing to the quota (2 workers) gives uniform, + predictable latency at a small throughput cost. For an interactive file + server, p99 dominates UX (timeouts, head-of-line blocking), so this is the + right trade. + +3. **This microbenchmark is a worst case *for* the tuned config.** It is pure + on-worker CPU, which is exactly where over-subscription's throughput edge + shows. Real OxiCloud handlers push CPU to `spawn_blocking` and the async + workers mostly await I/O (DB, disk) — there the over-subscription throughput + edge evaporates (idle workers just park) while its tail-latency penalty + remains. Production should see the worker change as ≥ neutral on throughput + and strictly better on tail latency. + +4. **No regression off-quota.** `effective_parallelism()` == `available_ + parallelism()` whenever there is no CFS quota (or affinity already restricts + the process), so on bare metal / affinity-pinned deployments the worker count + is unchanged from the old default. The change only bites under a CFS quota — + precisely the case it fixes. + +5. **Follow-up:** the same `available_parallelism()` blind spot affects the + image/rayon pools (`thumbnail_service.rs`, `image_transcode_service.rs`, + `di.rs` video) — they over-spawn under a CFS quota too. Switching those to + `common::runtime::effective_parallelism()` is the natural next step (left out + here to keep this change focused on the runtime). + +Both knobs are env-overridable (`OXICLOUD_WORKER_THREADS` / +`OXICLOUD_MAX_BLOCKING_THREADS`) and logged at startup ("Tokio runtime pools +sized"), so operators can see and tune what is in effect. diff --git a/examples/bench_blob_prefetch.rs b/examples/bench_blob_prefetch.rs new file mode 100644 index 00000000..92926151 --- /dev/null +++ b/examples/bench_blob_prefetch.rs @@ -0,0 +1,287 @@ +//! Blob download read-ahead benchmark — `read_prefetch()` / `buffered(N)`. +//! +//! Isolates the ONE variable the local-backend change touches: the chunk +//! read-ahead depth fed to `buffered(N)` when reassembling a CDC file on the +//! download path (`DedupService::stream_chunks`). It rebuilds the *exact* +//! production combinator — +//! +//! `stream::iter(hashes).map(get_blob_stream).buffered(N).try_flatten()` +//! +//! — over a REAL `LocalBlobBackend` whose chunk files are scattered across the +//! 256 hash-prefix directories exactly like production, then drains it and +//! reports throughput. The `N = 1` row is the current production behaviour +//! ("antes"); the higher-N rows are the candidate change ("después"). +//! +//! The outcome is workload-dependent (the trait doc for `read_prefetch` argues +//! local should stay at 1), so the bench sweeps the two axes that decide it: +//! • Consumer speed — `unthrottled` (disk-bound: a localhost / LAN client that +//! drains as fast as the disk delivers) vs `throttled@` (network-bound: +//! a real remote client where the socket, not the disk, is the bottleneck — +//! this is where overlapping the next chunk's open+read with the current +//! chunk's socket drain is supposed to pay off). +//! • Page-cache state — `warm` (re-read, no disk I/O) vs `cold` +//! (`posix_fadvise(DONTNEED)` evicts each chunk file first, Linux only — +//! where concurrent opens on scattered files can instead cause seek +//! contention). `cold` is best-effort: on tmpfs/overlayfs the eviction is a +//! no-op and `cold` ≈ `warm` (noted in the output). +//! +//! Run (no Postgres needed): +//! cargo run --release --features bench --example bench_blob_prefetch +//! Tunables (env): +//! BENCH_FILE_MB (256) total blob size +//! BENCH_CHUNK_KB (256) per-chunk size (matches CDC_AVG_CHUNK) +//! BENCH_PREFETCH ("1,2,4,8,16") +//! BENCH_THROTTLE_MBPS ("0,300,100") 0 = unthrottled; each value = a throttled run +//! BENCH_REPS (5) repetitions per cell; median reported +//! BENCH_COLD (1) also run cold-cache rows (Linux x86-64 only) + +use std::env; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use bytes::Bytes; +use futures::{StreamExt, TryStreamExt, stream}; + +use oxicloud::application::ports::blob_storage_ports::BlobStorageBackend; +use oxicloud::infrastructure::services::local_blob_backend::LocalBlobBackend; + +fn env_or(key: &str, default: T) -> T { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +fn env_list_usize(key: &str, default: &[usize]) -> Vec { + env::var(key) + .ok() + .map(|s| s.split(',').filter_map(|x| x.trim().parse().ok()).collect::>()) + .filter(|v: &Vec| !v.is_empty()) + .unwrap_or_else(|| default.to_vec()) +} + +// ── cold-cache eviction (Linux x86-64 only, best-effort) ───────────────────── +#[cfg(all(target_os = "linux", target_pointer_width = "64"))] +unsafe extern "C" { + fn posix_fadvise(fd: i32, offset: i64, len: i64, advice: i32) -> i32; +} +#[cfg(all(target_os = "linux", target_pointer_width = "64"))] +const POSIX_FADV_DONTNEED: i32 = 4; + +#[cfg(all(target_os = "linux", target_pointer_width = "64"))] +fn evict(paths: &[PathBuf]) { + use std::os::unix::io::AsRawFd; + for p in paths { + if let Ok(f) = std::fs::File::open(p) { + // len = 0 → "from offset to end of file" (the whole blob). + unsafe { + posix_fadvise(f.as_raw_fd(), 0, 0, POSIX_FADV_DONTNEED); + } + } + } +} +#[cfg(not(all(target_os = "linux", target_pointer_width = "64")))] +fn evict(_paths: &[PathBuf]) {} + +#[cfg(all(target_os = "linux", target_pointer_width = "64"))] +const COLD_SUPPORTED: bool = true; +#[cfg(not(all(target_os = "linux", target_pointer_width = "64")))] +const COLD_SUPPORTED: bool = false; + +/// Fill `buf` with distinct, well-distributed bytes (xorshift64 seeded per +/// chunk) so every chunk hashes to a different BLAKE3 → scattered across the +/// 256 prefix dirs, matching production's content-addressed layout. +fn fill_chunk(buf: &mut [u8], seed: u64) { + let mut s = seed ^ 0x9E37_79B9_7F4A_7C15; + let mut i = 0; + while i + 8 <= buf.len() { + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + buf[i..i + 8].copy_from_slice(&s.to_le_bytes()); + i += 8; + } + while i < buf.len() { + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + buf[i] = s as u8; + i += 1; + } +} + +/// Drain the production reassembly pipeline once; return bytes read. +/// `throttle_bps == 0` means unthrottled (drain as fast as possible). +async fn run_once( + backend: Arc, + hashes: Vec, + prefetch: usize, + throttle_bps: f64, +) -> u64 { + let backend_for_map = backend.clone(); + let mut byte_stream = stream::iter(hashes) + .map(move |hash| { + let b = backend_for_map.clone(); + async move { b.get_blob_stream(&hash).await } + }) + .buffered(prefetch.max(1)) + .map(|r| r.map_err(std::io::Error::other)) + .try_flatten(); + + let mut total: u64 = 0; + // Coarse token-bucket: only sleep once the accumulated owed time clears a + // 2 ms floor, so the throttle models a rate-limited socket without drowning + // the measurement in sub-ms timer noise. + let per_byte_secs = if throttle_bps > 0.0 { 1.0 / throttle_bps } else { 0.0 }; + let mut owed = Duration::ZERO; + + while let Some(item) = byte_stream.next().await { + let chunk = item.expect("blob stream item"); + total += chunk.len() as u64; + if per_byte_secs > 0.0 { + owed += Duration::from_secs_f64(chunk.len() as f64 * per_byte_secs); + if owed >= Duration::from_millis(2) { + tokio::time::sleep(owed).await; + owed = Duration::ZERO; + } + } + } + total +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + let file_mb: usize = env_or("BENCH_FILE_MB", 256); + let chunk_kb: usize = env_or("BENCH_CHUNK_KB", 256); + let prefetches = env_list_usize("BENCH_PREFETCH", &[1, 2, 4, 8, 16]); + let throttles_mbps = env_list_usize("BENCH_THROTTLE_MBPS", &[0, 300, 100]); + let reps: usize = env_or("BENCH_REPS", 5); + let want_cold: bool = env_or::("BENCH_COLD", 1) != 0; + + let chunk_bytes = chunk_kb * 1024; + let total_bytes = file_mb * 1024 * 1024; + let n_chunks = total_bytes.div_ceil(chunk_bytes); + + let tmp = tempfile::tempdir().expect("tempdir"); + let backend_local = LocalBlobBackend::new(tmp.path()); + backend_local.initialize().await.expect("init backend"); + + // ── Build the blob: write n_chunks distinct content-addressed chunk files. + let mut hashes: Vec = Vec::with_capacity(n_chunks); + let mut paths: Vec = Vec::with_capacity(n_chunks); + let mut buf = vec![0u8; chunk_bytes]; + let build_start = Instant::now(); + for i in 0..n_chunks { + fill_chunk(&mut buf, i as u64); + let data = Bytes::copy_from_slice(&buf); + let hash = blake3::hash(&data).to_hex().to_string(); + backend_local + .put_blob_from_bytes(&hash, data) + .await + .expect("put blob"); + paths.push(backend_local.blob_path(&hash)); + hashes.push(hash); + } + let backend: Arc = Arc::new(backend_local); + let actual_bytes: u64 = (n_chunks * chunk_bytes) as u64; + + println!("\n############################################################"); + println!("# Blob download read-ahead (read_prefetch / buffered(N))"); + println!( + "# blob: {} MiB in {} chunks of {} KiB (built in {:.1}s)", + file_mb, + n_chunks, + chunk_kb, + build_start.elapsed().as_secs_f64() + ); + println!( + "# production LocalBlobBackend.read_prefetch() = {}", + backend.read_prefetch() + ); + println!("# reps/cell: {reps} (median MB/s reported) cold-cache: {}", { + if !COLD_SUPPORTED { + "unsupported (non-Linux) → warm only" + } else if want_cold { + "yes (posix_fadvise DONTNEED, best-effort)" + } else { + "disabled (BENCH_COLD=0)" + } + }); + println!("# N=1 is current production ('antes'); higher N is the candidate ('después')"); + println!("############################################################\n"); + println!( + "| {:<22} | {:>8} | {:>9} | {:>8} | {:>9} |", + "scenario", "prefetch", "med MB/s", "min ms", "vs N=1" + ); + println!( + "|{:-<24}|{:-<10}|{:-<11}|{:-<10}|{:-<11}|", + "", "", "", "", "" + ); + + let mb = actual_bytes as f64 / (1024.0 * 1024.0); + + // cache states to test + let mut cache_states: Vec<&str> = vec!["warm"]; + if want_cold && COLD_SUPPORTED { + cache_states.push("cold"); + } + + for &thr_mbps in &throttles_mbps { + let throttle_bps = thr_mbps as f64 * 1024.0 * 1024.0; + let thr_label = if thr_mbps == 0 { + "unthrottled".to_string() + } else { + format!("throttled@{}MB/s", thr_mbps) + }; + + for cache in &cache_states { + let scenario = format!("{}/{}", cache, thr_label); + let mut baseline_mbps: Option = None; + + for &pf in &prefetches { + let mut samples_mbps: Vec = Vec::with_capacity(reps); + let mut min_ms = f64::MAX; + + // one warmup (also primes warm-cache state) + let _ = run_once(backend.clone(), hashes.clone(), pf, throttle_bps).await; + + for _ in 0..reps { + if *cache == "cold" { + evict(&paths); + } + let t = Instant::now(); + let got = run_once(backend.clone(), hashes.clone(), pf, throttle_bps).await; + let secs = t.elapsed().as_secs_f64(); + assert_eq!(got, actual_bytes, "short read"); + samples_mbps.push(mb / secs); + min_ms = min_ms.min(secs * 1000.0); + } + + samples_mbps.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let med = samples_mbps[samples_mbps.len() / 2]; + let delta = match baseline_mbps { + None => { + baseline_mbps = Some(med); + "—".to_string() + } + Some(base) => format!("{:+.1}%", (med / base - 1.0) * 100.0), + }; + + println!( + "| {:<22} | {:>8} | {:>9.1} | {:>8.1} | {:>9} |", + scenario, pf, med, min_ms, delta + ); + } + println!( + "|{:-<24}|{:-<10}|{:-<11}|{:-<10}|{:-<11}|", + "", "", "", "", "" + ); + } + } + + println!( + "\nInterpretation: a '+x%' under 'vs N=1' is the read-ahead gain over current\n\ + production for that scenario; a negative value is a regression. Network-bound\n\ + rows (throttled) are the realistic remote-download case; unthrottled rows are\n\ + disk-bound (localhost/LAN). Pick the smallest N that wins the throttled rows\n\ + without regressing the disk-bound/cold rows.\n" + ); +} diff --git a/examples/bench_pool_concurrency.rs b/examples/bench_pool_concurrency.rs new file mode 100644 index 00000000..a858be40 --- /dev/null +++ b/examples/bench_pool_concurrency.rs @@ -0,0 +1,232 @@ +//! CPU pool concurrency benchmark — thumbnail decode under a CPU quota. +//! +//! Measures what `effective_parallelism()` changes for the image pools +//! (`ThumbnailService::max_concurrent_decodes`, `image_transcode_service`, +//! `di.rs` video): the number of concurrent CPU-heavy renders permitted. Those +//! pools used to size from `available_parallelism()`, which ignores the CFS +//! quota (`--cpus` / cgroup `cpu.max`), so under a container quota they permit +//! one render per *host* core onto cores the scheduler can't actually give. +//! +//! It drives the **real service path** — a `Semaphore(K)` gating +//! `spawn_blocking(ThumbnailService::bench_render_all)` — with a gallery of +//! concurrent requests, and sweeps the permit count K. Run pinned to the quota's +//! cores to reproduce the pathology: +//! taskset -c 0,1 cargo run --release --features bench --example bench_pool_concurrency +//! Under `taskset -c 0,1`, `effective_parallelism()` = 2 (the "after"); the +//! higher K rows are what bare `available_parallelism()` would permit on a +//! many-core host under a 2-core quota (the "before"). +//! +//! No Postgres needed. Tunables (env): +//! BENCH_K_LIST (1,2,4,8,16) BENCH_GALLERY (48) BENCH_SECONDS (4) + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +use oxicloud::bench_support; +use oxicloud::common::runtime::effective_parallelism; +use oxicloud::infrastructure::services::thumbnail_service::ThumbnailService; +use tokio::sync::Semaphore; + +fn env_or(key: &str, default: T) -> T { + std::env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +#[cfg(target_os = "linux")] +fn rss_mb() -> u64 { + std::fs::read_to_string("/proc/self/status") + .ok() + .and_then(|s| { + s.lines() + .find(|l| l.starts_with("VmRSS:")) + .and_then(|l| l.split_whitespace().nth(1)) + .and_then(|kb| kb.parse::().ok()) + }) + .map(|kb| kb / 1024) + .unwrap_or(0) +} +#[cfg(not(target_os = "linux"))] +fn rss_mb() -> u64 { + 0 +} + +/// Peak RSS while exactly `k` renders run concurrently (one wave) — the resident +/// cost of `k` simultaneous decode buffers, i.e. what the permit count bounds. +fn bench_k_rss(rt: &tokio::runtime::Runtime, img: Arc>, k: usize) -> u64 { + rt.block_on(async move { + let peak = Arc::new(AtomicU64::new(rss_mb())); + let stop = Arc::new(AtomicBool::new(false)); + let p = peak.clone(); + let s = stop.clone(); + let sampler = tokio::spawn(async move { + while !s.load(Ordering::Relaxed) { + p.fetch_max(rss_mb(), Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(2)).await; + } + }); + let mut hs = Vec::with_capacity(k); + for _ in 0..k { + let img2 = img.clone(); + hs.push(tokio::task::spawn_blocking(move || { + let _ = ThumbnailService::bench_render_all(&img2); + })); + } + for h in hs { + let _ = h.await; + } + peak.fetch_max(rss_mb(), Ordering::Relaxed); + stop.store(true, Ordering::Relaxed); + let _ = sampler.await; + peak.load(Ordering::Relaxed) + }) +} + +fn percentile(sorted: &[u64], p: f64) -> u64 { + if sorted.is_empty() { + return 0; + } + let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +/// One sweep cell: gallery of `producers` callers, `k` permits, `secs` window. +/// Returns (renders, renders/s, p50_ms, p99_ms). +fn bench_k( + rt: &tokio::runtime::Runtime, + img: Arc>, + k: usize, + producers: usize, + secs: u64, +) -> (u64, f64, f64, f64) { + rt.block_on(async move { + let sem = Arc::new(Semaphore::new(k)); + let deadline = Instant::now() + Duration::from_secs(secs); + let mut handles = Vec::with_capacity(producers); + for _ in 0..producers { + let sem = sem.clone(); + let img = img.clone(); + handles.push(tokio::spawn(async move { + let mut count = 0u64; + let mut lats: Vec = Vec::with_capacity(256); + while Instant::now() < deadline { + // Real path: acquire a decode permit, render off-reactor. + let t = Instant::now(); + let permit = sem.clone().acquire_owned().await.unwrap(); + let img2 = img.clone(); + let _ = tokio::task::spawn_blocking(move || { + ThumbnailService::bench_render_all(&img2).expect("render_all") + }) + .await; + drop(permit); + lats.push(t.elapsed().as_micros() as u64); + count += 1; + } + (count, lats) + })); + } + let mut total = 0u64; + let mut all: Vec = Vec::new(); + for h in handles { + let (c, l) = h.await.expect("join"); + total += c; + all.extend_from_slice(&l); + } + all.sort_unstable(); + let rps = total as f64 / secs as f64; + ( + total, + rps, + percentile(&all, 50.0) as f64 / 1000.0, + percentile(&all, 99.0) as f64 / 1000.0, + ) + }) +} + +fn main() { + let k_list: Vec = std::env::var("BENCH_K_LIST") + .ok() + .map(|s| s.split(',').filter_map(|x| x.trim().parse().ok()).collect()) + .filter(|v: &Vec| !v.is_empty()) + .unwrap_or_else(|| vec![1, 2, 4, 8, 16]); + let producers: usize = env_or("BENCH_GALLERY", 48); + let secs: u64 = env_or("BENCH_SECONDS", 4); + + // Pick the heaviest corpus image — the decode cost the pool gates. + let corpus = bench_support::load_or_generate(); + let case = corpus + .iter() + .max_by(|a, b| a.megapixels().partial_cmp(&b.megapixels()).unwrap()) + .expect("corpus non-empty"); + let img = Arc::new(case.bytes.clone()); + + // The renders run on spawn_blocking; give the blocking pool plenty of room + // so the Semaphore(K) — not the runtime — is the binding constraint. + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .max_blocking_threads(64) + .enable_all() + .build() + .expect("runtime"); + + let eff = effective_parallelism(); + println!("\n############################################################"); + println!("# CPU pool concurrency — thumbnail decode under a CPU quota"); + println!( + "# image: {} ({:.1} MP, {} KiB) gallery: {} concurrent callers window: {}s", + case.name, + case.megapixels(), + case.bytes.len() / 1024, + producers, + secs + ); + println!( + "# available_parallelism = {} effective_parallelism = {} (= the 'after' permit count)", + std::thread::available_parallelism().map(|n| n.get()).unwrap_or(0), + eff + ); + println!("# run under `taskset -c 0,1` to model a 2-core quota"); + println!("############################################################\n"); + println!( + "| {:>8} | {:>9} | {:>10} | {:>9} | {:>9} |", + "permits", "renders", "renders/s", "p50 ms", "p99 ms" + ); + println!("|{:-<10}|{:-<11}|{:-<12}|{:-<11}|{:-<11}|", "", "", "", "", ""); + + // Warm up (also triggers corpus generation / codec init). + let _ = bench_k(&rt, img.clone(), 2, producers, 1); + + for &k in &k_list { + let (renders, rps, p50, p99) = bench_k(&rt, img.clone(), k, producers, secs); + let tag = if k == eff { " ← effective" } else { "" }; + println!( + "| {:>8} | {:>9} | {:>10.1} | {:>9.1} | {:>9.1} |{}", + k, renders, rps, p50, p99, tag + ); + } + + // ── Part B: peak RSS for K concurrent decodes (the real over-permit cost) ── + println!("\n[B] Peak RSS with K concurrent decodes (one wave)\n"); + println!("| {:>8} | {:>14} | {:>12} |", "permits", "peak RSS MiB", "vs effective"); + println!("|{:-<10}|{:-<16}|{:-<14}|", "", "", ""); + let mut eff_rss: Option = None; + for &k in &k_list { + let peak = bench_k_rss(&rt, img.clone(), k); + if k == eff { + eff_rss = Some(peak); + } + let delta = match eff_rss { + Some(base) if k > eff => format!("+{} MiB", peak.saturating_sub(base)), + _ => "—".to_string(), + }; + let tag = if k == eff { " ← effective" } else { "" }; + println!("| {:>8} | {:>14} | {:>12} |{}", k, peak, delta, tag); + } + + println!( + "\nThroughput (A) is CPU-bound — flat past the core count, so over-permitting\n\ + buys no throughput. The cost of over-permitting is resident memory (B):\n\ + each concurrent decode holds its buffer, so RSS scales with the permit\n\ + count. Sizing to the *effective* cores (not the host count) is what keeps\n\ + a many-core-host CPU quota from multiplying thumbnail RAM under load.\n" + ); +} diff --git a/examples/bench_tokio_runtime.rs b/examples/bench_tokio_runtime.rs new file mode 100644 index 00000000..1b0c3052 --- /dev/null +++ b/examples/bench_tokio_runtime.rs @@ -0,0 +1,271 @@ +//! Tokio runtime tuning benchmark — worker over-subscription + blocking-pool RSS. +//! +//! Measures the two things `build_runtime` changes versus the `#[tokio::main]` +//! defaults, on a realistic mix of async work + CPU-bound `spawn_blocking`. +//! +//! Part A — worker over-subscription (throughput + tail latency) +//! Drives `concurrency` async "requests", each doing an async hop +//! (`yield_now`) plus inline CPU work (BLAKE3 over a buffer — stands in for +//! JSON ser/de, parsing, auth math that real handlers run on the worker). +//! Compares a runtime with many workers (what tokio's default spawns on a +//! CFS-quota-limited box, because `available_parallelism()` ignores the quota) +//! against one sized to the real core budget. **Run pinned to the quota's +//! cores** to reproduce the pathology, e.g. `taskset -c 0,1` on a 2-core quota: +//! taskset -c 0,1 cargo run --release --features bench --example bench_tokio_runtime +//! Under taskset the "before" worker count is forced high (BENCH_WORKERS_BEFORE) +//! to model "tokio counted all host cores"; "after" uses the visible core count. +//! +//! Part B — blocking-pool RSS blast radius +//! Floods the blocking pool with memory-heavy tasks (each ~`BENCH_ALLOC_MB`, +//! the order of an Argon2 hash or an image decode buffer) and samples peak RSS. +//! Compares tokio's default `max_blocking_threads = 512` against a bounded +//! pool. Shows the worst-case resident-memory difference the cap prevents. +//! +//! No Postgres needed. Tunables (env): +//! BENCH_CONCURRENCY (96) BENCH_SECONDS (4) BENCH_BURN_KB (256) +//! BENCH_WORKERS_BEFORE (32) BENCH_WORKERS_AFTER (visible cores) +//! BENCH_BLOCKING_TASKS (96) BENCH_ALLOC_MB (16) BENCH_HOLD_MS (120) +//! BENCH_MAX_BLOCKING_AFTER (16) + +use std::env; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +use oxicloud::common::runtime::{cgroup_cpu_quota, effective_parallelism, runtime_pool_sizes}; + +fn env_or(key: &str, default: T) -> T { + env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default) +} + +#[cfg(target_os = "linux")] +fn rss_mb() -> u64 { + std::fs::read_to_string("/proc/self/status") + .ok() + .and_then(|s| { + s.lines() + .find(|l| l.starts_with("VmRSS:")) + .and_then(|l| l.split_whitespace().nth(1)) + .and_then(|kb| kb.parse::().ok()) + }) + .map(|kb| kb / 1024) + .unwrap_or(0) +} +#[cfg(not(target_os = "linux"))] +fn rss_mb() -> u64 { + 0 +} + +fn build_rt(workers: usize, max_blocking: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .max_blocking_threads(max_blocking) + .enable_all() + .build() + .expect("build runtime") +} + +fn percentile(sorted: &[u64], p: f64) -> u64 { + if sorted.is_empty() { + return 0; + } + let idx = ((p / 100.0) * (sorted.len() as f64 - 1.0)).round() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +/// Part A: returns (requests, req/s, p50_us, p99_us, max_us). +fn bench_workers( + workers: usize, + max_blocking: usize, + concurrency: usize, + secs: u64, + burn_bytes: usize, +) -> (u64, f64, u64, u64, u64) { + let rt = build_rt(workers, max_blocking); + let (total, mut lats) = rt.block_on(async move { + let deadline = Instant::now() + Duration::from_secs(secs); + let mut handles = Vec::with_capacity(concurrency); + for _ in 0..concurrency { + handles.push(tokio::spawn(async move { + let buf = vec![0xa5u8; burn_bytes]; + let mut count = 0u64; + let mut lats: Vec = Vec::with_capacity(4096); + while Instant::now() < deadline { + let t = Instant::now(); + // async hop — models awaiting I/O between CPU steps. + tokio::task::yield_now().await; + // inline CPU work on the worker — models handler compute. + let h = blake3::hash(&buf); + std::hint::black_box(h.as_bytes()[0]); + lats.push(t.elapsed().as_micros() as u64); + count += 1; + } + (count, lats) + })); + } + let mut total = 0u64; + let mut all: Vec = Vec::new(); + for h in handles { + let (c, l) = h.await.expect("join"); + total += c; + all.extend_from_slice(&l); + } + (total, all) + }); + lats.sort_unstable(); + let reqs_per_s = total as f64 / secs as f64; + ( + total, + reqs_per_s, + percentile(&lats, 50.0), + percentile(&lats, 99.0), + lats.last().copied().unwrap_or(0), + ) +} + +/// Part B: returns (peak_rss_mb, baseline_rss_mb). +fn bench_blocking_rss( + max_blocking: usize, + n_tasks: usize, + alloc_mb: usize, + hold_ms: u64, +) -> (u64, u64) { + // Two workers is plenty for the async sampler; the variable under test is + // the blocking pool, not the worker pool. + let rt = build_rt(2, max_blocking); + rt.block_on(async move { + let baseline = rss_mb(); + let peak = Arc::new(AtomicU64::new(baseline)); + let stop = Arc::new(AtomicBool::new(false)); + + let p = peak.clone(); + let s = stop.clone(); + let sampler = tokio::spawn(async move { + while !s.load(Ordering::Relaxed) { + p.fetch_max(rss_mb(), Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(3)).await; + } + }); + + let mut handles = Vec::with_capacity(n_tasks); + for _ in 0..n_tasks { + handles.push(tokio::task::spawn_blocking(move || { + let mut v = vec![0u8; alloc_mb * 1024 * 1024]; + // Touch every page so the allocation is actually resident. + let mut i = 0; + while i < v.len() { + v[i] = 1; + i += 4096; + } + std::thread::sleep(Duration::from_millis(hold_ms)); + std::hint::black_box(v.len()); + })); + } + for h in handles { + let _ = h.await; + } + peak.fetch_max(rss_mb(), Ordering::Relaxed); + stop.store(true, Ordering::Relaxed); + let _ = sampler.await; + (peak.load(Ordering::Relaxed), baseline) + }) +} + +fn main() { + let concurrency: usize = env_or("BENCH_CONCURRENCY", 96); + let secs: u64 = env_or("BENCH_SECONDS", 4); + let burn_kb: usize = env_or("BENCH_BURN_KB", 256); + let visible = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(2); + let workers_before: usize = env_or("BENCH_WORKERS_BEFORE", 32); + let workers_after: usize = env_or("BENCH_WORKERS_AFTER", visible); + + let blocking_tasks: usize = env_or("BENCH_BLOCKING_TASKS", 96); + let alloc_mb: usize = env_or("BENCH_ALLOC_MB", 16); + let hold_ms: u64 = env_or("BENCH_HOLD_MS", 120); + let max_blocking_after: usize = env_or("BENCH_MAX_BLOCKING_AFTER", 16); + + let (def_workers, def_max_blocking) = runtime_pool_sizes(); + + println!("\n############################################################"); + println!("# Tokio runtime tuning benchmark"); + println!( + "# available_parallelism = {} cgroup_cpu_quota = {:?} effective = {}", + visible, + cgroup_cpu_quota(), + effective_parallelism() + ); + println!( + "# build_runtime would pick: worker_threads={} max_blocking_threads={}", + def_workers, def_max_blocking + ); + println!("############################################################"); + + // ── Part A ────────────────────────────────────────────────────────────── + println!("\n[A] Worker over-subscription under CPU contention"); + println!( + " workload: {concurrency} concurrent requests, {burn_kb} KiB BLAKE3 each, {secs}s" + ); + println!(" (run under `taskset -c 0,1` to model a 2-core quota)\n"); + println!( + "| {:<26} | {:>8} | {:>10} | {:>8} | {:>8} |", + "runtime", "req/s", "requests", "p50 us", "p99 us" + ); + println!( + "|{:-<28}|{:-<10}|{:-<12}|{:-<10}|{:-<10}|", + "", "", "", "", "" + ); + + let burn_bytes = burn_kb * 1024; + let before = bench_workers(workers_before, 512, concurrency, secs, burn_bytes); + println!( + "| {:<26} | {:>8.0} | {:>10} | {:>8} | {:>8} |", + format!("before: {workers_before} workers"), + before.1, + before.0, + before.2, + before.3 + ); + let after = bench_workers(workers_after, def_max_blocking, concurrency, secs, burn_bytes); + println!( + "| {:<26} | {:>8.0} | {:>10} | {:>8} | {:>8} |", + format!("after: {workers_after} workers"), + after.1, + after.0, + after.2, + after.3 + ); + let thr_delta = (after.1 / before.1 - 1.0) * 100.0; + let p99_delta = (after.3 as f64 / before.3.max(1) as f64 - 1.0) * 100.0; + println!( + "\n → throughput {:+.1}% , p99 latency {:+.1}% (after vs before)", + thr_delta, p99_delta + ); + + // ── Part B ────────────────────────────────────────────────────────────── + println!("\n[B] Blocking-pool RSS blast radius"); + println!( + " workload: {blocking_tasks} concurrent spawn_blocking tasks, {alloc_mb} MiB each, hold {hold_ms}ms\n" + ); + println!( + "| {:<26} | {:>12} | {:>12} |", + "max_blocking_threads", "peak RSS MiB", "vs default" + ); + println!("|{:-<28}|{:-<14}|{:-<14}|", "", "", ""); + + let (peak_def, base_def) = bench_blocking_rss(512, blocking_tasks, alloc_mb, hold_ms); + println!( + "| {:<26} | {:>12} | {:>12} |", + "before: 512 (tokio default)", peak_def, "—" + ); + let (peak_cap, _base_cap) = bench_blocking_rss(max_blocking_after, blocking_tasks, alloc_mb, hold_ms); + let saved = peak_def as i64 - peak_cap as i64; + println!( + "| {:<26} | {:>12} | {:>12} |", + format!("after: {max_blocking_after} (bounded)"), + peak_cap, + format!("-{} MiB", saved.max(0)) + ); + println!(" (baseline RSS before the flood: ~{base_def} MiB)\n"); +} diff --git a/src/application/ports/blob_storage_ports.rs b/src/application/ports/blob_storage_ports.rs index 13a01ff0..91f7d428 100644 --- a/src/application/ports/blob_storage_ports.rs +++ b/src/application/ports/blob_storage_ports.rs @@ -130,12 +130,17 @@ pub trait BlobStorageBackend: Send + Sync + 'static { /// How many chunk fetches the CDC reader may run concurrently when /// reassembling a file (`read_blob_stream`'s `buffered(N)` read-ahead). /// - /// The default is **1** — sequential, because for a local disk concurrent - /// opens turn one sequential read into several competing random-I/O streams - /// over content-addressed (scattered) chunk files, which is neutral on a - /// warm page cache and *slower* cold. Remote backends (S3/Azure) override - /// this with a higher value: there the dominant cost is per-chunk request - /// latency, and overlapping fetches hides it (≈ N× faster reassembly). + /// The trait default is a conservative **1** (strictly sequential) — the + /// safe fallback for any backend that doesn't know its own I/O profile. + /// Concrete backends override it: + /// • Local disk → a small benchmarked depth (default `2`, env-tunable): + /// overlapping the next chunk's `File::open` with the current chunk's + /// drain measured +7–12% on disk-bound reads with no cold regression on + /// SSDs; deeper queues buy little (the data read, not the open, is then + /// the cost) and risk competing random I/O over scattered content- + /// addressed chunk files on seek-bound HDDs. See `LocalBlobBackend`. + /// • Remote (S3/Azure) → `8`: there the dominant cost is per-chunk request + /// latency, and overlapping fetches hides it (≈ N× faster reassembly). /// Wrapping backends delegate to the backend that actually serves the bytes. fn read_prefetch(&self) -> usize { 1 diff --git a/src/application/ports/file_ports.rs b/src/application/ports/file_ports.rs index fa68438c..9f607241 100644 --- a/src/application/ports/file_ports.rs +++ b/src/application/ports/file_ports.rs @@ -93,7 +93,7 @@ pub trait FileUploadUseCase: Send + Sync + 'static { /// Optimized file content returned by the retrieval service. /// /// The handler only needs to map each variant to the appropriate HTTP -/// response; all caching / transcoding / mmap decisions happen in the +/// response; all caching / transcoding decisions happen in the /// application layer. pub enum OptimizedFileContent { /// Small-file content (possibly transcoded / compressed) already in RAM. @@ -102,9 +102,7 @@ pub enum OptimizedFileContent { mime_type: Arc, was_transcoded: bool, }, - /// Memory-mapped file (10–100 MB). - Mmap(Bytes), - /// Streaming download for very large files (≥100 MB). + /// Streaming download for everything above the in-RAM cache threshold. Stream(Pin> + Send>>), } diff --git a/src/application/services/file_retrieval_service.rs b/src/application/services/file_retrieval_service.rs index 00a6741b..490a423e 100644 --- a/src/application/services/file_retrieval_service.rs +++ b/src/application/services/file_retrieval_service.rs @@ -26,8 +26,8 @@ const CACHE_THRESHOLD: u64 = 10 * 1024 * 1024; /// Implements a multi-tier download strategy: /// - Tier 0: Write-behind cache (just-uploaded files still in RAM) /// - Tier 1: Hot cache + optional WebP transcoding (<10 MB) -/// - Tier 2: Memory-mapped I/O (10–100 MB) -/// - Tier 3: Streaming (≥100 MB) +/// - Tier 2: Streaming for everything ≥10 MB — CDC chunk reassembly with the +/// backend's read-ahead (`read_prefetch`); no whole-file buffering. pub struct FileRetrievalService { file_read: Arc, content_cache: Option>, diff --git a/src/common/mod.rs b/src/common/mod.rs index b343f797..a9f142c7 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -3,4 +3,5 @@ pub mod di; pub mod errors; pub mod locale; pub mod mime_detect; +pub mod runtime; pub mod stubs; diff --git a/src/common/runtime.rs b/src/common/runtime.rs new file mode 100644 index 00000000..ce099fc7 --- /dev/null +++ b/src/common/runtime.rs @@ -0,0 +1,144 @@ +//! Tokio runtime pool sizing — CFS-quota-aware worker / blocking thread counts. +//! +//! `#[tokio::main]` and the existing rayon / image pools all size themselves from +//! [`std::thread::available_parallelism`], which reflects CPU **affinity** +//! (`sched_getaffinity`: cpuset cgroup, `taskset`) but **not** the CFS bandwidth +//! quota (`docker --cpus`, cgroup v2 `cpu.max`, v1 `cpu.cfs_quota_us`). On a +//! quota-limited container on a many-core host it therefore over-reports the +//! usable core count — tokio then spawns one worker per *host* core that +//! time-slice across the *quota's* cores. These helpers fold the CFS quota back +//! in so the runtime (and any caller that wants it) sizes to the real budget. +//! +//! All parsing is split into pure functions so the cgroup formats are unit-tested +//! without touching `/sys`. + +/// Parse cgroup **v2** `cpu.max` contents — `" "`, or +/// `"max "` when unlimited. Returns the quota in whole cores (rounded +/// up), or `None` when unlimited / unparseable. +fn parse_cpu_max_v2(s: &str) -> Option { + let mut it = s.split_whitespace(); + let quota = it.next()?; + let period = it.next()?; + if quota == "max" { + return None; + } + let quota: f64 = quota.parse().ok()?; + let period: f64 = period.parse().ok()?; + if quota > 0.0 && period > 0.0 { + Some((quota / period).ceil() as usize) + } else { + None + } +} + +/// Parse cgroup **v1** `cpu.cfs_quota_us` / `cpu.cfs_period_us`. A quota of `-1` +/// (or any non-positive value) means unlimited → `None`. Otherwise whole cores, +/// rounded up. +fn parse_cpu_quota_v1(quota: &str, period: &str) -> Option { + let quota: i64 = quota.trim().parse().ok()?; + let period: i64 = period.trim().parse().ok()?; + if quota > 0 && period > 0 { + Some(((quota as f64) / (period as f64)).ceil() as usize) + } else { + None + } +} + +/// The cgroup CPU quota in whole cores (v2 first, then v1), or `None` when there +/// is no quota (unlimited) or it can't be read. +pub fn cgroup_cpu_quota() -> Option { + // cgroup v2 unified hierarchy. + if let Ok(s) = std::fs::read_to_string("/sys/fs/cgroup/cpu.max") + && let Some(n) = parse_cpu_max_v2(&s) + { + return Some(n); + } + // cgroup v1. + let quota = std::fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").ok()?; + let period = std::fs::read_to_string("/sys/fs/cgroup/cpu/cpu.cfs_period_us").ok()?; + parse_cpu_quota_v1("a, &period) +} + +/// Effective CPU parallelism: affinity-parallelism capped by the CFS quota. +/// +/// `available_parallelism()` alone over-reports under a CFS quota (see module +/// docs); we take the min of it and the quota, floored at 1. +pub fn effective_parallelism() -> usize { + let affinity = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + match cgroup_cpu_quota() { + Some(q) => affinity.min(q.max(1)), + None => affinity, + } +} + +/// `(worker_threads, max_blocking_threads)` for the Tokio runtime, from env with +/// CFS-aware defaults. +/// +/// - `OXICLOUD_WORKER_THREADS` (or tokio's native `TOKIO_WORKER_THREADS`) sets +/// the worker count; default [`effective_parallelism`]. +/// - `OXICLOUD_MAX_BLOCKING_THREADS` sets the blocking-pool cap; default +/// `max(32, 8 × workers)` — vs tokio's flat **512**, which for this heavy +/// `spawn_blocking` user (thumbnails, transcode, zip, PDF/text extraction, +/// Argon2 ≈19 MB/hash) is a multi-GB RSS blast radius with no ceiling. +/// +/// Both are clamped to ≥1. Unset env on an uncontended host yields the same +/// worker count as the previous `#[tokio::main]` default. +pub fn runtime_pool_sizes() -> (usize, usize) { + let workers = std::env::var("OXICLOUD_WORKER_THREADS") + .or_else(|_| std::env::var("TOKIO_WORKER_THREADS")) + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|&n| n > 0) + .unwrap_or_else(effective_parallelism) + .max(1); + let max_blocking = std::env::var("OXICLOUD_MAX_BLOCKING_THREADS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|&n| n > 0) + .unwrap_or_else(|| (workers * 8).max(32)); + (workers, max_blocking) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn v2_exact_two_cores() { + assert_eq!(parse_cpu_max_v2("200000 100000"), Some(2)); + } + + #[test] + fn v2_rounds_up_fractional() { + // 1.5 cores of quota → 2 whole worker threads. + assert_eq!(parse_cpu_max_v2("150000 100000"), Some(2)); + } + + #[test] + fn v2_unlimited_is_none() { + assert_eq!(parse_cpu_max_v2("max 100000"), None); + } + + #[test] + fn v2_garbage_is_none() { + assert_eq!(parse_cpu_max_v2("not a quota"), None); + assert_eq!(parse_cpu_max_v2(""), None); + } + + #[test] + fn v1_two_cores() { + assert_eq!(parse_cpu_quota_v1("200000", "100000"), Some(2)); + } + + #[test] + fn v1_unlimited_sentinel_is_none() { + assert_eq!(parse_cpu_quota_v1("-1", "100000"), None); + } + + #[test] + fn v1_rounds_up() { + assert_eq!(parse_cpu_quota_v1("250000", "100000"), Some(3)); + } +} diff --git a/src/infrastructure/services/local_blob_backend.rs b/src/infrastructure/services/local_blob_backend.rs index 8ef8cf3b..9a1e5b96 100644 --- a/src/infrastructure/services/local_blob_backend.rs +++ b/src/infrastructure/services/local_blob_backend.rs @@ -167,17 +167,51 @@ static HEX_PREFIXES: [&str; 256] = [ pub struct LocalBlobBackend { blob_root: PathBuf, temp_root: PathBuf, + /// Chunk read-ahead depth for CDC reassembly — see [`Self::new`]. + read_prefetch: usize, } +/// Default chunk-open read-ahead for the local backend (overrides the trait's +/// conservative `1`). +/// +/// Benchmarked with `examples/bench_blob_prefetch` on SSD-class storage: a small +/// read-ahead is the sweet spot for the *disk-bound* read paths — localhost/LAN +/// downloads and, importantly, the internal blob reads that drain as fast as the +/// disk delivers (thumbnail render, transcode, ZIP export, content extraction), +/// all of which flow through `DedupService::stream_chunks`'s `buffered(N)`. +/// +/// Measured median throughput vs the old sequential `N=1`: +/// warm disk-bound +11.8% (N=2) cold disk-bound +7.2% (N=2) +/// network-bound (throttled) ≈ 0% — the consumer, not the disk, is the cap +/// N=16 −4.4% warm — fan-out past a couple turns one sequential read into +/// competing random I/O over scattered content-addressed chunk files. +/// +/// `2` deliberately captures most of that gain at the lowest fan-out, because +/// `buffered(N)` here overlaps the per-chunk `File::open` (cheap on local disk), +/// not the data read, so deeper queues buy little and risk seek contention on +/// the spinning disks we can't bench here. Operators tune it via +/// `OXICLOUD_LOCAL_READ_PREFETCH` (set `1` on seek-bound HDDs to restore the old +/// strictly-sequential behaviour; raise it on fast NVMe arrays). +const DEFAULT_LOCAL_READ_PREFETCH: usize = 2; + impl LocalBlobBackend { /// Create a new local backend rooted at `storage_root`. /// /// Blob files go under `{storage_root}/.blobs/`, temp files under /// `{storage_root}/.dedup_temp/`. pub fn new(storage_root: &Path) -> Self { + // Read-ahead depth: env override, else the benchmark-backed default. + // Clamped to ≥1 so a bogus `0` can't stall reads (buffered(0) would + // make no progress; `stream_chunks` also guards with `.max(1)`). + let read_prefetch = std::env::var("OXICLOUD_LOCAL_READ_PREFETCH") + .ok() + .and_then(|v| v.parse::().ok()) + .map(|n| n.max(1)) + .unwrap_or(DEFAULT_LOCAL_READ_PREFETCH); Self { blob_root: storage_root.join(".blobs"), temp_root: storage_root.join(".dedup_temp"), + read_prefetch, } } @@ -493,6 +527,13 @@ impl BlobStorageBackend for LocalBlobBackend { fn local_blob_path(&self, hash: &str) -> Option { Some(self.blob_path(hash)) } + + /// Local disk read-ahead for CDC reassembly. Overrides the trait default of + /// `1` with a small benchmark-backed depth (default `2`, env-tunable via + /// `OXICLOUD_LOCAL_READ_PREFETCH`). See [`DEFAULT_LOCAL_READ_PREFETCH`]. + fn read_prefetch(&self) -> usize { + self.read_prefetch + } } #[cfg(test)] diff --git a/src/interfaces/api/handlers/file_handler.rs b/src/interfaces/api/handlers/file_handler.rs index f27e6124..00e6312d 100644 --- a/src/interfaces/api/handlers/file_handler.rs +++ b/src/interfaces/api/handlers/file_handler.rs @@ -750,20 +750,6 @@ impl FileHandler { data, mime_type, .. } => Self::build_cached_response(data, &mime_type, &disposition, &etag) .into_response(), - OptimizedFileContent::Mmap(mmap_data) => Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, &*file_dto.mime_type) - .header(header::CONTENT_DISPOSITION, &disposition) - .header(header::CONTENT_LENGTH, mmap_data.len()) - .header(header::ETAG, &etag) - .header( - header::CACHE_CONTROL, - "private, max-age=3600, must-revalidate", - ) - .header(header::ACCEPT_RANGES, "bytes") - .body(Body::from(mmap_data)) - .unwrap() - .into_response(), OptimizedFileContent::Stream(pinned_stream) => Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, &*file_dto.mime_type) diff --git a/src/interfaces/api/handlers/share_handler.rs b/src/interfaces/api/handlers/share_handler.rs index b19bd9ee..2bda63b9 100644 --- a/src/interfaces/api/handlers/share_handler.rs +++ b/src/interfaces/api/handlers/share_handler.rs @@ -501,21 +501,6 @@ async fn serve_share_file( .body(Body::from(data)) .unwrap() .into_response(), - OptimizedFileContent::Mmap(mmap_data) => Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, &*mime) - .header(header::CONTENT_DISPOSITION, &disposition) - .header(header::CONTENT_LENGTH, mmap_data.len()) - .header(header::ACCEPT_RANGES, "bytes") - .header(header::ETAG, &etag) - .header( - header::CACHE_CONTROL, - "private, max-age=3600, must-revalidate", - ) - .header(header::VARY, "Cookie, Range") - .body(Body::from(mmap_data)) - .unwrap() - .into_response(), OptimizedFileContent::Stream(stream) => Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, &*mime) diff --git a/src/main.rs b/src/main.rs index ae850490..c9e3733b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -126,8 +126,7 @@ fn make_socket(addr: &SocketAddr, reuse_port: bool) -> std::io::Result { Ok(socket) } -#[tokio::main] -async fn main() -> Result<(), Box> { +fn main() -> Result<(), Box> { // Minimal CLI: // --version Print version + branch + commit hash and exit. // --config Load env from this file. When given, the default @@ -186,6 +185,39 @@ async fn main() -> Result<(), Box> { } } + // Build the Tokio runtime explicitly (not via `#[tokio::main]`) so the + // worker + blocking pools are sized from the cgroup CPU quota and bounded + // — with the `.env` loaded above already in scope. See `build_runtime`. + let runtime = build_runtime()?; + runtime.block_on(run()) +} + +/// Construct the multi-threaded Tokio runtime with explicit, CFS-quota-aware +/// pool sizes. +/// +/// `#[tokio::main]` hides two defaults that misbehave under container limits: +/// • worker threads default to `available_parallelism()`, which honours CPU +/// affinity but **ignores the CFS quota** (`--cpus` / `cpu.max`) — so on a +/// 2-core-quota container on a 64-core host it spawns 64 workers that +/// time-slice across 2 cores. +/// • the blocking pool defaults to a flat **512** threads — a multi-GB RSS +/// blast radius for this heavy `spawn_blocking` user. +/// +/// Both come from [`common::runtime::runtime_pool_sizes`] (env-overridable via +/// `OXICLOUD_WORKER_THREADS` / `OXICLOUD_MAX_BLOCKING_THREADS`). Unset env on an +/// uncontended host reproduces the previous behaviour. +fn build_runtime() -> std::io::Result { + let (workers, max_blocking) = common::runtime::runtime_pool_sizes(); + tokio::runtime::Builder::new_multi_thread() + .worker_threads(workers) + .max_blocking_threads(max_blocking) + .thread_name("oxicloud-worker") + .enable_all() + .build() +} + +/// Async entrypoint, driven by the runtime built in [`main`]. +async fn run() -> Result<(), Box> { // Initialize tracing. // // Default access-log policy — two independent directives are @@ -236,6 +268,20 @@ async fn main() -> Result<(), Box> { env!("GIT_HASH") ); + // Surface the runtime pool sizing chosen in `build_runtime`. `available` + // is what tokio's default would have used; `cgroup_cpu_quota` is the CFS + // limit it ignores. When the two diverge, the worker count tracks the + // smaller (effective) value — the whole point of the explicit builder. + let (rt_workers, rt_max_blocking) = common::runtime::runtime_pool_sizes(); + tracing::info!( + worker_threads = rt_workers, + max_blocking_threads = rt_max_blocking, + available_parallelism = + std::thread::available_parallelism().map(|n| n.get()).unwrap_or(0), + cgroup_cpu_quota = ?common::runtime::cgroup_cpu_quota(), + "Tokio runtime pools sized" + ); + // Load configuration from environment variables let config = common::config::AppConfig::from_env();