Skip to content

Commit b8bf6c7

Browse files
danlaine and clabby authored
[storage/qmdb] Optimize Db::apply_batch snapshot merge (#3730)
Co-authored-by: clabby <ben@clab.by>
1 parent 0be9cae commit b8bf6c7

3 files changed

Lines changed: 516 additions & 64 deletions

File tree

storage/src/qmdb/any/batch.rs

Lines changed: 273 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ use crate::{
2121
},
2222
Context,
2323
};
24-
use ahash::{AHashMap, AHashSet};
24+
use ahash::AHashSet;
2525
use commonware_codec::Codec;
2626
use commonware_cryptography::{Digest, Hasher};
2727
use commonware_parallel::{Sequential, Strategy};
@@ -253,7 +253,7 @@ pub struct MerkleizedBatch<
253253

254254
/// Each ancestor's `total_size` (operation count after that ancestor).
255255
/// 1:1 with `ancestor_diffs`: `ancestor_diff_ends[i]` is the boundary for
256-
/// `ancestor_diffs[i]`. A batch is committed when `ancestor_diff_ends[i] <= db_size`.
256+
/// `ancestor_diffs[i]`. A batch is applied when `ancestor_diff_ends[i] <= db_size`.
257257
pub(crate) ancestor_diff_ends: Vec<u64>,
258258
}
259259

@@ -324,6 +324,83 @@ fn apply_diff<F: Family, V, I: UnorderedIndex<Value = Location<F>>, const N: usi
324324
}
325325
}
326326

327+
/// k-way sorted merge over diff slices in priority order. On equal keys, the lowest-indexed
328+
/// stream wins and all tied cursors are advanced. Each input slice must be sorted by key.
329+
struct DiffMerge<'a, K, F: Family, V> {
330+
// One `(slice, next unread index)` pair per input stream, stored in the order the streams
// were handed to `new`; that order is the tie-break priority (index 0 is highest).
cursors: Vec<(&'a DiffSlice<K, F, V>, usize)>,
331+
}
332+
333+
impl<'a, K: Ord, F: Family, V> DiffMerge<'a, K, F, V> {
334+
/// Builds a merge over `streams`, with every cursor starting at index 0.
///
/// The iteration order of `streams` is preserved in `cursors`, so the first stream yielded
/// becomes the highest-priority stream when keys collide.
fn new(streams: impl IntoIterator<Item = &'a DiffSlice<K, F, V>>) -> Self {
335+
Self {
336+
cursors: streams.into_iter().map(|s| (s, 0)).collect(),
337+
}
338+
}
339+
}
340+
341+
impl<'a, K: Ord, F: Family, V> Iterator for DiffMerge<'a, K, F, V> {
342+
type Item = (&'a K, &'a DiffEntry<F, V>);
343+
344+
/// Yields the entry with the smallest key among all cursor heads, or `None` once every
/// stream is exhausted. Each call does two linear passes over the streams: one to pick the
/// winner and one to advance every cursor positioned on the winning key.
fn next(&mut self) -> Option<Self::Item> {
345+
let n = self.cursors.len();
346+
let mut winner: Option<usize> = None;
347+
// Pass 1: find the stream whose head has the smallest key.
for level in 0..n {
348+
let (slice, pos) = self.cursors[level];
349+
// A cursor at or past the end of its slice has nothing left to contribute.
let Some((k, _)) = slice.get(pos) else {
350+
continue;
351+
};
352+
let better = match winner {
353+
None => true,
354+
Some(w) => {
355+
let (ws, wpos) = self.cursors[w];
356+
// Strict `<`: on equal keys the earlier (lower-indexed) stream stays the winner.
*k < ws[wpos].0
357+
}
358+
};
359+
if better {
360+
winner = Some(level);
361+
}
362+
}
363+
// No stream had a head entry: the merge is finished.
let level = winner?;
364+
// Copy the winner's `(slice, pos)` before pass 2 bumps its cursor, so `slice[pos]`
// keeps referring to the winning entry below.
let (slice, pos) = self.cursors[level];
365+
// Pass 2: advance every cursor (winner included) whose head equals the winning key, so
// shadowed entries from lower-priority streams are dropped instead of yielded later.
for inner in 0..n {
366+
let (s, p) = self.cursors[inner];
367+
if s.get(p).is_some_and(|(k, _)| *k == slice[pos].0) {
368+
self.cursors[inner].1 += 1;
369+
}
370+
}
371+
Some((&slice[pos].0, &slice[pos].1))
372+
}
373+
}
374+
375+
/// Resolves a key's `base_old_loc` by walking parallel cursors over already-applied
376+
/// ancestor diffs (parent-first). Lookups must be issued in ascending key order because
377+
/// cursors only advance forward. Returns `Some(Some(loc))` for an active entry,
378+
/// `Some(None)` for a deletion, and `None` when no already-applied ancestor touched the
379+
/// key.
380+
struct AppliedAncestorResolver<'a, K, F: Family, V> {
381+
// One `(slice, next unread index)` pair per applied ancestor diff, in the order supplied
// to `new`; `lookup` consults them in this order and returns the first match.
cursors: Vec<(&'a DiffSlice<K, F, V>, usize)>,
382+
}
383+
384+
impl<'a, K: Ord, F: Family, V> AppliedAncestorResolver<'a, K, F, V> {
385+
/// Builds a resolver over `applied`, with every cursor starting at index 0. The order of
/// `applied` is the order in which `lookup` consults the slices.
fn new(applied: impl IntoIterator<Item = &'a DiffSlice<K, F, V>>) -> Self {
386+
Self {
387+
cursors: applied.into_iter().map(|s| (s, 0)).collect(),
388+
}
389+
}
390+
391+
/// Returns the `loc()` of the entry for `key` in the first slice (in construction order)
/// that contains it, wrapped in `Some`; returns `None` if no slice contains `key`.
///
/// Entries smaller than `key` are skipped permanently, so a later lookup for a smaller key
/// cannot find them.
fn lookup(&mut self, key: &K) -> Option<Option<Location<F>>> {
392+
for (slice, idx) in self.cursors.iter_mut() {
393+
// Skip entries below `key`; the cursor never moves backwards.
while *idx < slice.len() && slice[*idx].0 < *key {
394+
*idx += 1;
395+
}
396+
// The cursor is left on the match rather than advanced past it. Slices after the
// matching one are not touched on this call; they catch up on later lookups.
if *idx < slice.len() && slice[*idx].0 == *key {
397+
return Some(slice[*idx].1.loc());
398+
}
399+
}
400+
None
401+
}
402+
}
403+
327404
/// Return the next floor-raise candidate in `[floor, tip)`.
328405
///
329406
/// The committed prefix is indexed by `bitmap`, so unset bits can be skipped without reading
@@ -1600,8 +1677,8 @@ where
16001677
let _timer = self.metrics.operations.apply_batch_timer();
16011678
self.metrics.operations.apply_batch_calls.inc();
16021679
let db_size = *self.last_commit_loc + 1;
1603-
// Valid db_size values: batch.db_size (nothing committed), batch.base_size
1604-
// (all ancestors committed), or any ancestor_diff_ends[i] (partial commit).
1680+
// Valid db_size values: batch.db_size (nothing applied), batch.base_size
1681+
// (all ancestors applied), or any ancestor_diff_ends[i] (partial apply).
16051682
let valid = db_size == batch.db_size
16061683
|| db_size == batch.base_size
16071684
|| batch.ancestor_diff_ends.contains(&db_size);
@@ -1617,73 +1694,42 @@ where
16171694
// Apply journal (handles its own partial ancestor skipping).
16181695
self.log.apply_batch(&batch.journal_batch).await?;
16191696

1620-
// Pre-size the two hash collections used below in one pass over ancestors. Each
1621-
// ancestor's diff lands in exactly one of:
1622-
// - `committed_locs`, if the ancestor's ops are already in the DB (`end <= db_size`)
1623-
// - the `seen` set, if the ancestor's ops still need to be applied
1624-
let mut committed_diff_total = 0usize;
1625-
let mut uncommitted_diff_total = 0usize;
1626-
for (ancestor_diff, &ancestor_end) in
1627-
batch.ancestor_diffs.iter().zip(&batch.ancestor_diff_ends)
1628-
{
1629-
if ancestor_end <= db_size {
1630-
committed_diff_total += ancestor_diff.len();
1631-
} else {
1632-
uncommitted_diff_total += ancestor_diff.len();
1633-
}
1634-
}
1635-
1636-
// Build committed_locs: for each key in a committed ancestor batch, record the nearest
1637-
// (to child) committed ancestor's final state. Some(loc) = Active at loc, None =
1638-
// Deleted. It's safe to use a hashmap here since we don't rely on iteration order.
1639-
let mut committed_locs: AHashMap<&U::Key, Option<Location<F>>> =
1640-
AHashMap::with_capacity(committed_diff_total);
1641-
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
1642-
if batch.ancestor_diff_ends[i] <= db_size {
1643-
for (key, entry) in ancestor_diff.iter() {
1644-
// parent-first order: .or_insert keeps the nearest committed.
1645-
committed_locs.entry(key).or_insert(entry.loc());
1646-
}
1647-
}
1648-
}
1649-
1650-
// Apply diffs to snapshot and bitmap under one write lock (sync, no await).
1697+
// Scoped so the bitmap guard drops before later `.await`s (guard is `!Send`).
16511698
{
16521699
let mut bitmap = self.bitmap.write();
16531700
bitmap.extend_to(*batch.new_last_commit_loc + 1);
16541701

1655-
// Apply child's diff (child wins via seen set). Safe to use an AHashSet here since
1656-
// we don't rely on iteration order.
1657-
let mut seen =
1658-
AHashSet::<&U::Key>::with_capacity(batch.diff.len() + uncommitted_diff_total);
1659-
for (key, entry) in batch.diff.iter() {
1660-
seen.insert(key);
1661-
let base_old_loc = committed_locs
1662-
.get(key)
1663-
.copied()
1664-
.unwrap_or_else(|| entry.base_old_loc());
1665-
apply_diff(&mut self.snapshot, &mut bitmap, key, entry, base_old_loc);
1666-
}
1667-
1668-
// Apply uncommitted ancestor diffs (skip committed batches, skip seen keys).
1669-
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
1670-
if batch.ancestor_diff_ends[i] <= db_size {
1671-
continue;
1702+
if batch.ancestor_diffs.is_empty() {
1703+
// Fast path: no ancestors to merge, no fixups to look up.
1704+
for (key, entry) in batch.diff.iter() {
1705+
apply_diff(
1706+
&mut self.snapshot,
1707+
&mut bitmap,
1708+
key,
1709+
entry,
1710+
entry.base_old_loc(),
1711+
);
16721712
}
1673-
for (key, entry) in ancestor_diff.iter() {
1674-
if seen.insert(key) {
1675-
let base_old_loc = committed_locs
1676-
.get(key)
1677-
.copied()
1678-
.unwrap_or_else(|| entry.base_old_loc());
1679-
apply_diff(&mut self.snapshot, &mut bitmap, key, entry, base_old_loc);
1680-
} else if let Some(loc) = entry.loc() {
1681-
debug_assert!(
1682-
!bitmap.get_bit(*loc),
1683-
"farther ancestor location should remain inactive",
1684-
);
1713+
} else {
1714+
// Partition ancestor diffs into already-applied (provide `base_old_loc` fixups)
1715+
// and pending (still to be applied; merged with the child).
1716+
let mut applied = Vec::with_capacity(batch.ancestor_diffs.len());
1717+
let mut pending = Vec::with_capacity(batch.ancestor_diffs.len());
1718+
for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
1719+
if batch.ancestor_diff_ends[i] <= db_size {
1720+
applied.push(ancestor_diff.as_slice());
1721+
} else {
1722+
pending.push(ancestor_diff.as_slice());
16851723
}
16861724
}
1725+
let mut resolver = AppliedAncestorResolver::new(applied);
1726+
let merge = DiffMerge::new(
1727+
iter::once(batch.diff.as_slice()).chain(pending.iter().copied()),
1728+
);
1729+
for (key, entry) in merge {
1730+
let old = resolver.lookup(key).unwrap_or_else(|| entry.base_old_loc());
1731+
apply_diff(&mut self.snapshot, &mut bitmap, key, entry, old);
1732+
}
16871733
}
16881734

16891735
// CommitFloor: bit = 1 only on the current last commit. Demote the previous and
@@ -1937,6 +1983,76 @@ mod tests {
19371983
Shared::new(bm)
19381984
}
19391985

1986+
// Test helper: builds a `DiffEntry::Active` carrying `value` at `loc(location)` with no
// `base_old_loc`.
fn active(value: u64, location: u64) -> DiffEntry<mmr::Family, u64> {
1987+
DiffEntry::Active {
1988+
value,
1989+
loc: loc(location),
1990+
base_old_loc: None,
1991+
}
1992+
}
1993+
1994+
// Test helper: builds a `DiffEntry::Deleted`, mapping the optional raw `base_old_loc` through
// `loc`.
fn deleted(base_old_loc: Option<u64>) -> DiffEntry<mmr::Family, u64> {
1995+
DiffEntry::Deleted {
1996+
base_old_loc: base_old_loc.map(loc),
1997+
}
1998+
}
1999+
2000+
// Checks that `DiffMerge` yields each key once, in ascending key order, taking the entry from
// the earliest stream that contains the key.
#[test]
2001+
fn diff_merge_returns_sorted_newest_entries() {
2002+
let child = vec![(2, active(20, 20)), (5, active(50, 50))];
2003+
let parent = vec![
2004+
(1, active(11, 11)),
2005+
(2, active(12, 12)),
2006+
(4, deleted(Some(4))),
2007+
(7, active(17, 17)),
2008+
];
2009+
let grandparent = vec![
2010+
(2, active(102, 102)),
2011+
(3, active(103, 103)),
2012+
(4, active(104, 104)),
2013+
(6, active(106, 106)),
2014+
];
2015+
2016+
// Streams are priority ordered: child, parent, then grandparent. Equal keys should
2017+
// yield only the newest entry while preserving ascending key order for resolver lookups.
2018+
let merged: Vec<_> =
2019+
DiffMerge::new([child.as_slice(), parent.as_slice(), grandparent.as_slice()])
2020+
.map(|(key, entry)| (*key, entry.value().copied(), entry.loc()))
2021+
.collect();
2022+
2023+
// Key 2 appears in all three streams (child wins); key 4 appears in parent and grandparent
// (the parent's deletion wins, hence `None` for both value and location).
assert_eq!(
2024+
merged,
2025+
vec![
2026+
(1, Some(11), Some(loc(11))),
2027+
(2, Some(20), Some(loc(20))),
2028+
(3, Some(103), Some(loc(103))),
2029+
(4, None, None),
2030+
(5, Some(50), Some(loc(50))),
2031+
(6, Some(106), Some(loc(106))),
2032+
(7, Some(17), Some(loc(17))),
2033+
]
2034+
);
2035+
}
2036+
2037+
// Checks that `AppliedAncestorResolver::lookup` answers from the first slice containing the
// key (parent before grandparent) and returns `None` for keys in neither slice.
#[test]
2038+
fn applied_ancestor_resolver_uses_nearest_touch() {
2039+
let parent = vec![(2, active(20, 20)), (5, deleted(Some(5)))];
2040+
let grandparent = vec![
2041+
(2, active(200, 200)),
2042+
(4, active(40, 40)),
2043+
(5, active(50, 50)),
2044+
];
2045+
let mut resolver =
2046+
AppliedAncestorResolver::new([parent.as_slice(), grandparent.as_slice()]);
2047+
2048+
// Lookups are issued in ascending order, as they are from DiffMerge in apply_batch.
2049+
// Key 1: below every entry. Key 2: in both, parent wins. Key 4: grandparent only.
// Key 5: parent's deletion shadows the grandparent's active entry. Key 9: past every entry.
assert_eq!(resolver.lookup(&1), None);
2050+
assert_eq!(resolver.lookup(&2), Some(Some(loc(20))));
2051+
assert_eq!(resolver.lookup(&4), Some(Some(loc(40))));
2052+
assert_eq!(resolver.lookup(&5), Some(None));
2053+
assert_eq!(resolver.lookup(&9), None);
2054+
}
2055+
19402056
#[test]
19412057
fn bitmap_scan_empty() {
19422058
let bitmap = shared_with(|_| {});
@@ -2097,6 +2213,99 @@ mod tests {
20972213
assert!(mutations.contains_key(&1));
20982214
}
20992215

2216+
// End-to-end check of `apply_batch` on a three-deep batch chain (`applied` -> `pending` ->
// `child`) where only `applied` and `child` are passed to `apply_batch`. `pending` is never
// applied on its own, so its writes must reach the database through `child`.
#[test]
2217+
fn apply_batch_merges_committed_and_uncommitted_overlaps() {
2218+
let runner = deterministic::Runner::default();
2219+
runner.start(|context| async move {
2220+
type TestDb = UnorderedFixedDb<
2221+
mmr::Family,
2222+
deterministic::Context,
2223+
sha256::Digest,
2224+
sha256::Digest,
2225+
Sha256,
2226+
OneCap,
2227+
>;
2228+
2229+
let config = fixed_db_config::<OneCap>("mixed-ancestor-overlaps", &context);
2230+
let mut db = TestDb::init(context, config).await.unwrap();
2231+
2232+
// Four keys, each exercising a different overlap pattern across the layers below.
let key_update = Sha256::hash(b"update-through-all-layers");
2233+
let key_recreate_then_delete = Sha256::hash(b"recreate-then-delete");
2234+
let key_delete_from_uncommitted = Sha256::hash(b"delete-from-uncommitted");
2235+
let key_uncommitted_create = Sha256::hash(b"uncommitted-create");
2236+
2237+
// Seed batch: gives three of the keys an initial value and is applied immediately.
let seed = db
2238+
.new_batch()
2239+
.write(key_update, Some(Sha256::hash(b"seed-update")))
2240+
.write(
2241+
key_recreate_then_delete,
2242+
Some(Sha256::hash(b"seed-recreate")),
2243+
)
2244+
.write(
2245+
key_delete_from_uncommitted,
2246+
Some(Sha256::hash(b"seed-delete")),
2247+
)
2248+
.merkleize(&db, None)
2249+
.await
2250+
.unwrap();
2251+
db.apply_batch(seed).await.unwrap();
2252+
2253+
// First ancestor: updates two keys and deletes one. Applied to the db further down.
let applied = db
2254+
.new_batch()
2255+
.write(key_update, Some(Sha256::hash(b"committed-update")))
2256+
.write(key_recreate_then_delete, None)
2257+
.write(
2258+
key_delete_from_uncommitted,
2259+
Some(Sha256::hash(b"committed-delete-base")),
2260+
)
2261+
.merkleize(&db, None)
2262+
.await
2263+
.unwrap();
2264+
2265+
// Second ancestor, built on `applied`: touches the same three keys and creates a fourth.
let pending = applied
2266+
.new_batch::<Sha256>()
2267+
.write(key_update, Some(Sha256::hash(b"uncommitted-update")))
2268+
.write(
2269+
key_recreate_then_delete,
2270+
Some(Sha256::hash(b"uncommitted-recreate")),
2271+
)
2272+
.write(key_delete_from_uncommitted, None)
2273+
.write(
2274+
key_uncommitted_create,
2275+
Some(Sha256::hash(b"uncommitted-create")),
2276+
)
2277+
.merkleize(&db, None)
2278+
.await
2279+
.unwrap();
2280+
2281+
// Child, built on `pending`: overrides two of the keys `pending` wrote.
let final_update = Sha256::hash(b"child-update");
2282+
let child = pending
2283+
.new_batch::<Sha256>()
2284+
.write(key_update, Some(final_update))
2285+
.write(key_recreate_then_delete, None)
2286+
.merkleize(&db, None)
2287+
.await
2288+
.unwrap();
2289+
let expected_root = child.root();
2290+
2291+
// Apply only the first ancestor. Applying the child must combine applied
2292+
// fixups from that ancestor with the still-pending parent diff.
2293+
db.apply_batch(applied).await.unwrap();
2294+
db.apply_batch(child).await.unwrap();
2295+
2296+
// The db root must match the child's root, and each key must show the newest write
// across `child` and `pending`.
assert_eq!(db.root(), expected_root);
2297+
assert_eq!(db.get(&key_update).await.unwrap(), Some(final_update));
2298+
assert_eq!(db.get(&key_recreate_then_delete).await.unwrap(), None);
2299+
assert_eq!(db.get(&key_delete_from_uncommitted).await.unwrap(), None);
2300+
assert_eq!(
2301+
db.get(&key_uncommitted_create).await.unwrap(),
2302+
Some(Sha256::hash(b"uncommitted-create"))
2303+
);
2304+
2305+
db.destroy().await.unwrap();
2306+
});
2307+
}
2308+
21002309
#[test]
21012310
fn read_ops_resolves_committed_ancestor_and_current_sources() {
21022311
let runner = deterministic::Runner::default();

0 commit comments

Comments
 (0)