
Commit f68972e

DioCrafts and claude committed
perf(blob): one manifest query in read_blob_bytes instead of two
read_blob_bytes read the same storage.chunk_manifests PK row twice per full-blob read — blob_size (SELECT total_size) then read_blob_stream (SELECT chunk_hashes) — even though both columns live in one row. Fold them into a single `SELECT chunk_hashes, total_size` and share the chunk stream builder via a new stream_chunks helper. The legacy (no-manifest) path is unchanged.

Output is identical; the read just costs one fewer DB round-trip.

Benchmark (examples/bench_blob_manifest.rs, isolates the manifest lookup against the real Postgres): ~1.9x throughput and p50/p99 roughly halved on that sub-step; the win is the removed round-trip under pool pressure during upload bursts.

Note this is the manifest sub-step only — end-to-end read_blob_bytes is dominated by the actual chunk reads, and it is a background path (thumbnail generation / EXIF / indexing), not normal gallery serving. Methodology + honest framing in benches/BLOB-MANIFEST.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 5722481 commit f68972e
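
The fold described in the commit message can be modelled without a database. The sketch below is illustrative only and is not code from this commit: `ManifestRow`, `FakeManifests`, `read_old`, `read_new` and `demo` are hypothetical names, a `HashMap` stands in for `storage.chunk_manifests`, and each call to `row` counts as one simulated round-trip.

```rust
use std::cell::Cell;
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for one `storage.chunk_manifests` row.
struct ManifestRow {
    chunk_hashes: Vec<String>,
    total_size: i64,
}

/// Hypothetical table wrapper that counts lookups, one per simulated round-trip.
struct FakeManifests {
    rows: HashMap<String, ManifestRow>,
    lookups: Cell<u32>,
}

impl FakeManifests {
    /// One simulated primary-key lookup.
    fn row(&self, file_hash: &str) -> Option<&ManifestRow> {
        self.lookups.set(self.lookups.get() + 1);
        self.rows.get(file_hash)
    }

    /// OLD shape: size and chunk list fetched by two separate lookups.
    fn read_old(&self, file_hash: &str) -> Option<(Vec<String>, i64)> {
        let total_size = self.row(file_hash)?.total_size;
        let chunk_hashes = self.row(file_hash)?.chunk_hashes.clone();
        Some((chunk_hashes, total_size))
    }

    /// NEW shape: one lookup returns both columns of the same row.
    fn read_new(&self, file_hash: &str) -> Option<(Vec<String>, i64)> {
        let row = self.row(file_hash)?;
        Some((row.chunk_hashes.clone(), row.total_size))
    }
}

/// Returns (lookups used by OLD, lookups used by NEW, whether outputs match).
fn demo() -> (u32, u32, bool) {
    let mut rows = HashMap::new();
    rows.insert(
        "example-hash".to_string(),
        ManifestRow {
            chunk_hashes: vec!["c0".into(), "c1".into()],
            total_size: 131_072,
        },
    );
    let table = FakeManifests { rows, lookups: Cell::new(0) };

    let old = table.read_old("example-hash");
    let old_lookups = table.lookups.replace(0); // read the counter, then reset it
    let new = table.read_new("example-hash");
    let new_lookups = table.lookups.get();
    (old_lookups, new_lookups, old == new)
}

fn main() {
    let (old_lookups, new_lookups, same_output) = demo();
    println!("old={old_lookups} new={new_lookups} same_output={same_output}");
}
```

In this model a `None` result plays the role of the legacy (no-manifest) case, which the real code handles by falling back to the backend.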

4 files changed

Lines changed: 339 additions & 25 deletions


Cargo.toml

Lines changed: 6 additions & 0 deletions
@@ -168,6 +168,12 @@ name = "bench_db_pool"
 path = "examples/bench_db_pool.rs"
 required-features = ["bench"]
 
+# Blob-manifest read round-trip benchmark: OLD 2 queries vs NEW 1 (needs dev Postgres).
+[[example]]
+name = "bench_blob_manifest"
+path = "examples/bench_blob_manifest.rs"
+required-features = ["bench"]
+
 # ACL owner-cache benchmark — owner query vs moka hit (needs the dev Postgres up).
 [[example]]
 name = "bench_owner_cache"

benches/BLOB-MANIFEST.md

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+# Blob-manifest read — halve the manifest round-trips
+
+`DedupService::read_blob_bytes` (the full-blob read used by thumbnail generation,
+EXIF extraction, content indexing, etc.) used to read the **same**
+`storage.chunk_manifests` PK row **twice**:
+
+- `blob_size(hash)` → `SELECT total_size …` (for the buffer pre-allocation), then
+- `read_blob_stream(hash)` → `SELECT chunk_hashes …` (to stream the chunks).
+
+On the thumbnail cold path that's **2N manifest queries** for an N-image gallery
+load. The change folds both into one query and shares the chunk-stream builder:
+
+```sql
+SELECT chunk_hashes, total_size FROM storage.chunk_manifests WHERE file_hash = $1
+```
+
+(`dedup_service.rs` → `read_blob_bytes` + the extracted `stream_chunks` helper.)
+The legacy (no-manifest) path is unchanged.
+
+## Reproduce
+
+```bash
+cargo run --release --features bench --example bench_blob_manifest
+```
+
+Needs the dev Postgres up (reads `DATABASE_URL` from `.env`). The bench isolates
+exactly what changed — the manifest lookup(s) per blob read, not the unchanged
+chunk streaming — running OLD (2 queries/op) vs NEW (1 query/op) against the real
+pool, at low contention (raw per-op cost) and high contention (concurrency > pool,
+where holding a connection ~2× longer inflates the tail).
+
+## Results (pool=20, 4 s/run, 64-chunk manifest)
+
+| contention | mode | ops/s | p50 ms | p95 ms | p99 ms |
+|---|---|---:|---:|---:|---:|
+| conc 4 (no pool pressure) | OLD (2q) | 4 646 | 0.850 | 1.028 | 1.213 |
+| | **NEW (1q)** | **8 931** | **0.442** | **0.546** | **0.644** |
+| conc 64 (> pool 20) | OLD (2q) | 7 490 | 8.538 | 9.201 | 10.117 |
+| | **NEW (1q)** | **14 330** | **4.396** | **5.229** | **5.869** |
+
+- **~1.9× throughput** on the manifest-read step, **p50 and p99 roughly halved**.
+- Under pool pressure the absolute latency saved is larger (p50 8.5 → 4.4 ms),
+  because each OLD read occupies a connection for two round-trips instead of one —
+  exactly the tail-latency-under-contention win this targeted.
+
+The end-to-end gallery-load impact is smaller than 1.9× (chunk reads and decode
+dominate the full `read_blob_bytes`), but this removes one DB round-trip from
+*every* full-blob read, which is the part that queues under load.
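
The ratios quoted in the results can be recomputed from the table, and the throughput figures can be cross-checked against Little's law for a closed-loop benchmark (throughput is roughly concurrency divided by latency). The sketch below is illustrative arithmetic only and is not part of the commit: `speedup` and `littles_law_ops_per_sec` are hypothetical helper names, and using p50 as a stand-in for mean latency is an assumption.

```rust
/// Ratio of one measurement to another (NEW/OLD for throughput, OLD/NEW for latency).
fn speedup(numerator: f64, denominator: f64) -> f64 {
    numerator / denominator
}

/// Little's law estimate: `concurrency` closed-loop workers that each spend
/// `latency_ms` per op complete about concurrency / latency ops per second.
/// Assumption: the caller passes p50 as a rough proxy for the mean latency.
fn littles_law_ops_per_sec(concurrency: f64, latency_ms: f64) -> f64 {
    concurrency / (latency_ms / 1000.0)
}

fn main() {
    // (label, concurrency, old ops/s, new ops/s, old p50, new p50, old p99, new p99)
    // Values copied from the results table in BLOB-MANIFEST.md.
    let rows = [
        ("conc 4", 4.0, 4646.0, 8931.0, 0.850, 0.442, 1.213, 0.644),
        ("conc 64", 64.0, 7490.0, 14330.0, 8.538, 4.396, 10.117, 5.869),
    ];
    for (label, conc, old_ops, new_ops, old_p50, new_p50, old_p99, new_p99) in rows {
        println!(
            "{label}: throughput x{:.2}, p50 x{:.2}, p99 x{:.2}",
            speedup(new_ops, old_ops),
            speedup(old_p50, new_p50),
            speedup(old_p99, new_p99),
        );
        println!(
            "  Little's law estimate: OLD ~{:.0} ops/s, NEW ~{:.0} ops/s",
            littles_law_ops_per_sec(conc, old_p50),
            littles_law_ops_per_sec(conc, new_p50),
        );
    }
}
```

With the table's numbers the estimates land within a few percent of the measured ops/s, which is what a closed-loop benchmark with a fixed number of workers would be expected to show.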

examples/bench_blob_manifest.rs

Lines changed: 230 additions & 0 deletions
@@ -0,0 +1,230 @@
+//! Blob-manifest read benchmark — `read_blob_bytes` manifest round-trips.
+//!
+//! `read_blob_bytes` used to read the same `storage.chunk_manifests` PK row
+//! TWICE per full-blob read: once via `blob_size` (`SELECT total_size`) and once
+//! via `read_blob_stream` (`SELECT chunk_hashes`). On the thumbnail cold path
+//! that is 2N manifest queries for an N-image gallery load. The change folds both
+//! into ONE query (`SELECT chunk_hashes, total_size`).
+//!
+//! This isolates exactly that change — the two manifest lookups vs the one — and
+//! leaves the (unchanged) chunk streaming out, so the signal is the DB
+//! round-trip(s) per blob read. Runs OLD (2 queries) vs NEW (1 query) against the
+//! real dev Postgres, at low contention (raw per-op latency) and high contention
+//! (concurrency > pool, where holding a connection ~2× longer inflates the tail).
+//!
+//! Run (needs the dev Postgres up; reads DATABASE_URL from .env):
+//! cargo run --release --features bench --example bench_blob_manifest
+//! Tunables (env): BENCH_POOL (20), BENCH_SECONDS (4), BENCH_CHUNKS (64),
+//! BENCH_CONCURRENCIES ("4,64").
+
+use std::env;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use sqlx::PgPool;
+use sqlx::postgres::PgPoolOptions;
+
+/// Synthetic manifest key: exactly 64 chars (the VARCHAR(64) PK width), and the
+/// non-hex letters ('n','s','h') guarantee it can never collide with a real
+/// BLAKE3 blob hash (always lowercase hex). 8 × "bench000".
+const FILE_HASH: &str = "bench000bench000bench000bench000bench000bench000bench000bench000";
+
+fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
+    env::var(key)
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(default)
+}
+
+#[derive(Clone, Copy)]
+enum Mode {
+    /// Two separate manifest lookups (the old blob_size + read_blob_stream).
+    Old,
+    /// One combined manifest lookup (the new read_blob_bytes).
+    New,
+}
+
+async fn seed(pool: &PgPool, n_chunks: usize) {
+    let chunk_hashes: Vec<String> = (0..n_chunks).map(|i| format!("{i:064x}")).collect();
+    let chunk_sizes: Vec<i64> = vec![65_536; n_chunks];
+    let total: i64 = chunk_sizes.iter().sum();
+    sqlx::query(
+        "INSERT INTO storage.chunk_manifests
+            (file_hash, chunk_hashes, chunk_sizes, total_size, chunk_count)
+         VALUES ($1, $2, $3, $4, $5)
+         ON CONFLICT (file_hash) DO UPDATE
+            SET chunk_hashes = $2, chunk_sizes = $3, total_size = $4, chunk_count = $5",
+    )
+    .bind(FILE_HASH)
+    .bind(&chunk_hashes)
+    .bind(&chunk_sizes)
+    .bind(total)
+    .bind(n_chunks as i32)
+    .execute(pool)
+    .await
+    .expect("seed chunk_manifests row");
+}
+
+async fn cleanup(pool: &PgPool) {
+    let _ = sqlx::query("DELETE FROM storage.chunk_manifests WHERE file_hash = $1")
+        .bind(FILE_HASH)
+        .execute(pool)
+        .await;
+}
+
+/// One blob "manifest read" — exactly the queries the production code issues.
+async fn one_op(pool: &PgPool, mode: Mode) {
+    match mode {
+        Mode::Old => {
+            let _total: i64 = sqlx::query_scalar(
+                "SELECT total_size FROM storage.chunk_manifests WHERE file_hash = $1",
+            )
+            .bind(FILE_HASH)
+            .fetch_one(pool)
+            .await
+            .expect("old total_size query");
+            let _chunks: Vec<String> = sqlx::query_scalar(
+                "SELECT chunk_hashes FROM storage.chunk_manifests WHERE file_hash = $1",
+            )
+            .bind(FILE_HASH)
+            .fetch_one(pool)
+            .await
+            .expect("old chunk_hashes query");
+        }
+        Mode::New => {
+            let _row: (Vec<String>, i64) = sqlx::query_as(
+                "SELECT chunk_hashes, total_size FROM storage.chunk_manifests WHERE file_hash = $1",
+            )
+            .bind(FILE_HASH)
+            .fetch_one(pool)
+            .await
+            .expect("new combined query");
+        }
+    }
+}
+
+struct Stats {
+    count: usize,
+    rps: f64,
+    p50: f64,
+    p95: f64,
+    p99: f64,
+    max: f64,
+}
+
+fn summarize(mut lats: Vec<f64>, secs: u64) -> Stats {
+    lats.sort_by(|a, b| a.partial_cmp(b).unwrap());
+    let n = lats.len();
+    let pct = |p: f64| {
+        if n == 0 {
+            0.0
+        } else {
+            lats[((n as f64 * p) as usize).min(n - 1)]
+        }
+    };
+    Stats {
+        count: n,
+        rps: n as f64 / secs as f64,
+        p50: pct(0.50),
+        p95: pct(0.95),
+        p99: pct(0.99),
+        max: lats.last().copied().unwrap_or(0.0),
+    }
+}
+
+async fn run_window(pool: Arc<PgPool>, concurrency: usize, secs: u64, mode: Mode) -> Stats {
+    let deadline = Instant::now() + Duration::from_secs(secs);
+    let mut handles = Vec::with_capacity(concurrency);
+    for _ in 0..concurrency {
+        let pool = pool.clone();
+        handles.push(tokio::spawn(async move {
+            let mut lats = Vec::new();
+            while Instant::now() < deadline {
+                let t = Instant::now();
+                one_op(&pool, mode).await;
+                lats.push(t.elapsed().as_secs_f64() * 1000.0);
+            }
+            lats
+        }));
+    }
+    let mut all = Vec::new();
+    for h in handles {
+        all.extend(h.await.unwrap());
+    }
+    summarize(all, secs)
+}
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() {
+    dotenvy::dotenv().ok();
+    let url = env::var("DATABASE_URL")
+        .or_else(|_| env::var("OXICLOUD_DB_CONNECTION_STRING"))
+        .expect("set DATABASE_URL (or OXICLOUD_DB_CONNECTION_STRING) — the dev Postgres URL");
+
+    let pool_size: u32 = env_or("BENCH_POOL", 20);
+    let secs: u64 = env_or("BENCH_SECONDS", 4);
+    let n_chunks: usize = env_or("BENCH_CHUNKS", 64);
+    let concurrencies: Vec<usize> = env::var("BENCH_CONCURRENCIES")
+        .ok()
+        .map(|s| s.split(',').filter_map(|x| x.trim().parse().ok()).collect())
+        .unwrap_or_else(|| vec![4, 64]);
+
+    let pool = Arc::new(
+        PgPoolOptions::new()
+            .max_connections(pool_size)
+            .min_connections(pool_size) // pre-warm: don't time connection setup
+            .acquire_timeout(Duration::from_secs(10))
+            .connect(&url)
+            .await
+            .expect("connect dev Postgres"),
+    );
+
+    seed(&pool, n_chunks).await;
+
+    println!("\n###########################################################");
+    println!("# read_blob_bytes manifest round-trips: OLD (2 queries) vs NEW (1)");
+    println!("# pool={pool_size} window={secs}s/run chunks/manifest={n_chunks}");
+    println!("# latency = acquire-wait + manifest query/queries per blob read");
+    println!("###########################################################\n");
+    println!(
+        "| {:>5} | {:<4} | {:>9} | {:>9} | {:>7} | {:>7} | {:>7} | {:>7} |",
+        "conc", "mode", "ops", "ops/s", "p50 ms", "p95 ms", "p99 ms", "max ms"
+    );
+    println!(
+        "|{:-<7}|{:-<6}|{:-<11}|{:-<11}|{:-<9}|{:-<9}|{:-<9}|{:-<9}|",
+        "", "", "", "", "", "", "", ""
+    );
+
+    for &conc in &concurrencies {
+        // Warm-up (discarded) so the first real window isn't skewed.
+        let _ = run_window(pool.clone(), conc, 1, Mode::New).await;
+
+        let old = run_window(pool.clone(), conc, secs, Mode::Old).await;
+        let new = run_window(pool.clone(), conc, secs, Mode::New).await;
+        let row = |label: &str, s: &Stats| {
+            println!(
+                "| {:>5} | {:<4} | {:>9} | {:>9.0} | {:>7.3} | {:>7.3} | {:>7.3} | {:>7.3} |",
+                conc, label, s.count, s.rps, s.p50, s.p95, s.p99, s.max
+            );
+        };
+        row("OLD", &old);
+        row("NEW", &new);
+        let thr = if old.rps > 0.0 {
+            new.rps / old.rps
+        } else {
+            0.0
+        };
+        let p99 = if new.p99 > 0.0 {
+            old.p99 / new.p99
+        } else {
+            0.0
+        };
+        println!(
+            "|       |  →   | {:>9} | {:>7.2}× | {:>7} | {:>7} | {:>6.2}× | {:>7} |",
+            "throughput", thr, "", "", p99, ""
+        );
+    }
+
+    cleanup(&pool).await;
+    println!("\n(ops = blob-manifest reads completed; NEW issues 1 query/op, OLD issues 2.)");
+}
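
The percentile pick inside `summarize` can be tried in isolation. The following is a minimal sketch, not code from the commit: `percentile` is a hypothetical standalone version of the `pct` closure, and the sort uses `f64::total_cmp` in place of the benchmark's `partial_cmp(..).unwrap()`, a swap made here because `total_cmp` cannot panic on NaN.

```rust
/// Pick the value at index floor(n * p), clamped to the last element.
/// Returns 0.0 for an empty sample, mirroring the benchmark's `pct` closure.
/// The input slice must already be sorted in ascending order.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    let n = sorted.len();
    if n == 0 {
        return 0.0;
    }
    sorted[((n as f64 * p) as usize).min(n - 1)]
}

fn main() {
    let mut lats = vec![4.0, 1.0, 3.0, 2.0, 8.0, 6.0, 5.0, 7.0];
    // total_cmp defines a total order over f64, so the sort cannot panic.
    lats.sort_by(f64::total_cmp);
    println!("p25  = {}", percentile(&lats, 0.25));
    println!("p50  = {}", percentile(&lats, 0.50));
    println!("p100 = {}", percentile(&lats, 1.00));
}
```

Note that with an even-length sample this rule selects the upper of the two middle values for p50 rather than interpolating between them, which is a common and adequate choice for benchmark reporting.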

src/infrastructure/services/dedup_service.rs

Lines changed: 55 additions & 25 deletions
@@ -1475,6 +1475,33 @@ impl DedupService {
 
     // ── Read operations ──────────────────────────────────────────
 
+    /// Build an in-order, prefetched byte stream over a CDC file's chunks.
+    ///
+    /// Read-ahead depth is the backend's hint (1 for local disk, higher for
+    /// remote object stores where overlapping fetches hide per-chunk latency).
+    /// Shared by [`Self::read_blob_stream`] and [`Self::read_blob_bytes`] so both
+    /// build the chunk stream identically from a manifest's `chunk_hashes`.
+    fn stream_chunks(
+        &self,
+        chunk_hashes: Vec<String>,
+    ) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> {
+        let prefetch = self.backend.read_prefetch().max(1);
+        let backend = self.backend.clone();
+        let chunk_stream = stream::iter(chunk_hashes)
+            .map(move |chunk_hash| {
+                let backend = backend.clone();
+                async move {
+                    backend
+                        .get_blob_stream(&chunk_hash)
+                        .await
+                        .map_err(|e| std::io::Error::other(e.to_string()))
+                }
+            })
+            .buffered(prefetch)
+            .try_flatten();
+        Box::pin(chunk_stream)
+    }
+
     /// Stream blob content — CDC-aware with legacy fallback.
     ///
     /// For CDC files: looks up the manifest, then streams chunks in order,
@@ -1494,29 +1521,10 @@ impl DedupService {
             .await
             .map_err(|e| DomainError::internal_error("Dedup", format!("Manifest lookup: {}", e)))?;
 
-        if let Some(chunk_hashes) = manifest {
-            // CDC file: stream chunks in order. Read-ahead depth is the
-            // backend's hint (1 for local disk, higher for remote object
-            // stores where overlapping fetches hide per-chunk latency).
-            let prefetch = self.backend.read_prefetch().max(1);
-            let backend = self.backend.clone();
-            let chunk_stream = stream::iter(chunk_hashes)
-                .map(move |chunk_hash| {
-                    let backend = backend.clone();
-                    async move {
-                        backend
-                            .get_blob_stream(&chunk_hash)
-                            .await
-                            .map_err(|e| std::io::Error::other(e.to_string()))
-                    }
-                })
-                .buffered(prefetch)
-                .try_flatten();
-
-            Ok(Box::pin(chunk_stream))
-        } else {
+        match manifest {
+            Some(chunk_hashes) => Ok(self.stream_chunks(chunk_hashes)),
             // Legacy whole-file blob
-            self.backend.get_blob_stream(hash).await
+            None => self.backend.get_blob_stream(hash).await,
         }
     }
 
@@ -1525,11 +1533,33 @@ impl DedupService {
     /// This is intended for image-oriented workflows such as thumbnail
     /// generation where the downstream library already requires the full
     /// payload in memory to decode the image.
+    ///
+    /// A single manifest query fetches BOTH the size hint (for the buffer
+    /// pre-allocation) and the chunk list — they live in the same
+    /// `chunk_manifests` PK row, so reading them separately (the old
+    /// `blob_size` + `read_blob_stream`) doubled the manifest round-trips on
+    /// every full-blob read (e.g. 2N queries for an N-image gallery cold load).
     pub async fn read_blob_bytes(&self, hash: &str) -> Result<Bytes, DomainError> {
-        let expected_size = self.blob_size(hash).await? as usize;
-        let mut data = Vec::with_capacity(expected_size);
-        let mut stream = self.read_blob_stream(hash).await?;
+        let manifest = sqlx::query_as::<_, (Vec<String>, i64)>(
+            "SELECT chunk_hashes, total_size FROM storage.chunk_manifests WHERE file_hash = $1",
+        )
+        .bind(hash)
+        .fetch_optional(self.pool.as_ref())
+        .await
+        .map_err(|e| DomainError::internal_error("Dedup", format!("Manifest lookup: {}", e)))?;
+
+        let (mut stream, expected_size) = match manifest {
+            Some((chunk_hashes, total_size)) => {
+                (self.stream_chunks(chunk_hashes), total_size.max(0) as usize)
+            }
+            None => {
+                // Legacy whole-file blob: size + stream straight from the backend.
+                let size = self.backend.blob_size(hash).await? as usize;
+                (self.backend.get_blob_stream(hash).await?, size)
+            }
+        };
 
+        let mut data = Vec::with_capacity(expected_size);
         while let Some(chunk) = stream.next().await {
             let chunk = chunk.map_err(|e| {
                 DomainError::internal_error("Dedup", format!("Failed to read blob chunk: {}", e))
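
The `total_size.max(0) as usize` conversion in the `read_blob_bytes` hunk is worth a closer look. The sketch below is a standalone illustration, not code from the commit: `capacity_hint` is a hypothetical name, and whether `total_size` can ever be negative in the real table is not stated in the source, so the negative input is an assumed worst case.

```rust
/// Convert a signed size read from the database into a `Vec` capacity hint.
/// Negative values clamp to 0 instead of wrapping to a huge usize.
fn capacity_hint(total_size: i64) -> usize {
    total_size.max(0) as usize
}

fn main() {
    // A plain `as` cast of a negative i64 wraps around rather than failing.
    println!("unclamped -1 -> {}", (-1i64) as usize);
    println!("clamped   -1 -> {}", capacity_hint(-1));

    // 64 chunks of 65,536 bytes, the shape the benchmark seeds.
    let buf: Vec<u8> = Vec::with_capacity(capacity_hint(64 * 65_536));
    println!("capacity >= {}", buf.capacity());
}
```

Without the clamp, a wrapped value passed to `Vec::with_capacity` would exceed the `isize::MAX` byte limit and panic, so the clamp turns a corrupt size into a harmless zero-capacity start.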
