Skip to content

Commit 37a7d1c

Browse files
[storage] invalidate rewinded pages in page cache (#3653)
1 parent 1c8a5d5 commit 37a7d1c

2 files changed

Lines changed: 146 additions & 11 deletions

File tree

runtime/src/utils/buffer/paged/append.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -857,12 +857,6 @@ impl<B: Blob> Append<B> {
857857
return Ok(());
858858
}
859859

860-
// Implementation note: rewinding the blob across a page boundary potentially results in
861-
// stale data remaining in the page cache. We don't proactively purge the data
862-
// within this function since it would be inaccessible anyway. Instead we ensure it is
863-
// always updated should the blob grow back to the point where we have new data for the same
864-
// page, if any old data hasn't expired naturally by then.
865-
866860
let logical_page_size = self.cache_ref.page_size();
867861
let physical_page_size = logical_page_size + CHECKSUM_SIZE;
868862

@@ -889,6 +883,12 @@ impl<B: Blob> Append<B> {
889883
blob_guard.blob.resize(new_physical_size).await?;
890884
blob_guard.partial_page_state = None;
891885

886+
// Evict cached pages at or beyond the new full-page boundary. The page at `full_pages` (if
887+
// partial) is now owned by the tip buffer, and anything above is beyond the new logical
888+
// size. Leaving their pre-resize contents in the cache lets `try_read_sync` (which bypasses
889+
// the tip buffer) observe stale bytes once the tip is repopulated.
890+
self.cache_ref.invalidate_from(self.id, full_pages);
891+
892892
// Update blob state and buffer based on the desired logical size. The partial page data is
893893
// read with CRC validation; the validated length may exceed partial_bytes (reflecting the
894894
// old data length), but we only load the prefix we need. The next sync will write the
@@ -2456,6 +2456,54 @@ mod tests {
24562456
});
24572457
}
24582458

2459+
#[test]
2460+
fn test_resize_invalidates_cache() {
2461+
// Regression: shrinking a blob across a page boundary must drop cached pages for the
2462+
// truncated region. Before the fix, `try_read_sync` (which bypasses the tip buffer)
2463+
// would observe pre-resize bytes at offsets later reclaimed by new appends.
//
// Shape of the test: cache a full page of 0xAA, rewind to length 0, append 0xBB without
// syncing, then check that the cache-only read path never yields 0xAA at offset 0.
2464+
let executor = deterministic::Runner::default();
2465+
executor.start(|context: deterministic::Context| async move {
2466+
// Cache handle that is handed to the `Append` under test.
let cache_ref = CacheRef::from_pooler(
2467+
&context.with_label("cache"),
2468+
PAGE_SIZE,
2469+
NZUsize!(BUFFER_SIZE),
2470+
);
2471+
let (blob, blob_size) = context
2472+
.open("test_partition", b"resize_invalidates_cache")
2473+
.await
2474+
.unwrap();
2475+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
2476+
.await
2477+
.unwrap();
2478+
2479+
// Write and sync exactly one full page so it lands in the page cache. Use a distinct
2480+
// byte pattern (0xAA) so a stale cache read would be obvious.
2481+
let page_size = PAGE_SIZE.get() as usize;
2482+
let old_bytes = vec![0xAAu8; page_size];
2483+
append.append(&old_bytes).await.unwrap();
2484+
append.sync().await.unwrap();
2485+
2486+
// Precondition: page 0 is reachable via the cache-only fast path. Without this check the
// final assertion could pass simply because the page was never cached in the first place.
2487+
let mut probe = vec![0u8; 16];
2488+
assert!(append.try_read_sync(0, &mut probe));
2489+
assert_eq!(probe, vec![0xAAu8; 16]);
2490+
2491+
// Rewind to 0 (crossing the page boundary) and append a new, distinct pattern (0xBB).
// The new bytes are not synced, so presumably they are held only by the tip buffer.
2492+
append.resize(0).await.unwrap();
2493+
let new_bytes = vec![0xBBu8; 16];
2494+
append.append(&new_bytes).await.unwrap();
2495+
2496+
// The cache must not serve pre-resize bytes. Either try_read_sync misses (cache
2497+
// was invalidated) or it returns the new pattern; it must never return 0xAA.
2498+
let mut probe = vec![0u8; 16];
2499+
let hit = append.try_read_sync(0, &mut probe);
2500+
assert!(
2501+
!hit || probe == new_bytes,
2502+
"try_read_sync served stale pre-resize bytes: {probe:?}"
2503+
);
2504+
});
2505+
}
2506+
24592507
#[test]
24602508
fn test_reopen_partial_tail_append_and_resize() {
24612509
let executor = deterministic::Runner::default();

runtime/src/utils/buffer/paged/cache.rs

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,15 @@ struct Cache {
107107
/// # Invariants
108108
///
109109
/// Each `index` entry maps to exactly one `entries` slot, and that entry always has a
110-
/// matching key.
110+
/// matching key. (The converse is not true: after [Self::invalidate_from] a slot may retain
111+
/// a stale key that is no longer present in `index`.)
111112
index: HashMap<(u64, u64), usize>,
112113

113114
/// Metadata for each cache slot.
114115
///
115-
/// Each `entries` slot has exactly one corresponding `index` entry.
116+
/// Every entry reachable via `index` has a matching key here. Slots that were invalidated by
117+
/// [Self::invalidate_from] retain their stale key but are unreachable from `index` and will
118+
/// be reclaimed by the Clock evictor on the next sweep.
116119
entries: Vec<CacheEntry>,
117120

118121
/// Per-slot page buffers allocated from the pool.
@@ -137,6 +140,12 @@ struct Cache {
137140
/// Metadata for a single cache entry (page data stored in per-slot buffers).
138141
struct CacheEntry {
139142
/// The cache key which is composed of the blob id and page number of the page.
143+
///
144+
/// # Invariant
145+
///
146+
/// Every live cache slot has a matching entry in `index`. Slots that have been invalidated (see
147+
/// [Cache::invalidate_from]) retain their stale key here but are no longer reachable via
148+
/// `index` and will be reclaimed by the Clock evictor once its hand reaches them.
140149
key: (u64, u64),
141150

142151
/// A bit indicating whether this page was recently referenced.
@@ -425,6 +434,13 @@ impl CacheRef {
425434

426435
buf.len()
427436
}
437+
438+
/// Drop any cached pages for `blob_id` at `page_num >= start_page`. Used after a blob is
439+
/// truncated so subsequent reads can't observe pre-truncation bytes in a page that the tip
440+
/// buffer (or future writes) now owns.
///
/// `start_page` is inclusive: the page with that number is dropped as well. Pages cached for
/// other blobs, and pages of `blob_id` below `start_page`, are left in place.
///
/// Obtains write access to the shared cache via `write()` and delegates to
/// [Cache::invalidate_from].
441+
pub(super) fn invalidate_from(&self, blob_id: u64, start_page: u64) {
442+
self.cache.write().invalidate_from(blob_id, start_page);
443+
}
428444
}
429445

430446
impl Cache {
@@ -525,18 +541,24 @@ impl Cache {
525541
return;
526542
}
527543

528-
// Cache full: find slot to evict using Clock algorithm
544+
// Cache full: find slot to evict using Clock algorithm. Invalidated slots (`referenced =
545+
// false`, `index` no longer maps `entry.key` to them) are reclaimed on the first sweep.
529546
while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
530547
self.entries[self.clock]
531548
.referenced
532549
.store(false, Ordering::Relaxed);
533550
self.clock = (self.clock + 1) % self.entries.len();
534551
}
535552

536-
// Evict and replace
553+
// Evict and replace. Only drop the old `entry.key` from `index` when it still points
554+
// to this slot: after `invalidate_from` a slot may hold a stale key that has since
555+
// been re-cached at a different slot, and an unconditional `remove` would orphan
556+
// that live entry.
537557
let slot = self.clock;
538558
let entry = &mut self.entries[slot];
539-
assert!(self.index.remove(&entry.key).is_some());
559+
if self.index.get(&entry.key) == Some(&slot) {
560+
self.index.remove(&entry.key);
561+
}
540562
self.index.insert(key, slot);
541563
entry.key = key;
542564
entry.referenced.store(true, Ordering::Relaxed);
@@ -545,6 +567,21 @@ impl Cache {
545567
// Move the clock forward.
546568
self.clock = (self.clock + 1) % self.entries.len();
547569
}
570+
571+
/// Drop any cached pages for `blob_id` at `page_num >= start_page`. The slots keep their
572+
/// (now stale) `entry.key` so the Clock evictor can reclaim them; `read_at` and the
573+
/// duplicate-update path never reach them because `index` no longer maps to them.
///
/// Only `index` and the per-slot `referenced` bit are modified; the slot's key is left as it
/// was. Every `index` entry is visited (for all blobs), so the cost grows with the number of
/// cached pages rather than with the number of pages dropped.
574+
fn invalidate_from(&mut self, blob_id: u64, start_page: u64) {
575+
self.index.retain(|&(bid, page_num), &mut slot| {
576+
// Keep mappings for other blobs and for pages below the (inclusive) cut-off.
if bid != blob_id || page_num < start_page {
577+
return true;
578+
}
579+
// Clear the reference bit so the eviction loop stops on this slot instead of skipping it.
self.entries[slot]
580+
.referenced
581+
.store(false, Ordering::Relaxed);
582+
// Returning `false` removes this mapping from `index`.
false
583+
});
584+
}
548585
}
549586

550587
/// Fetch one logical page for insertion into the page cache, rejecting partial pages because cache
@@ -755,6 +792,56 @@ mod tests {
755792
);
756793
}
757794

795+
#[test_traced]
796+
fn test_invalidate_from_does_not_orphan_re_cached_page() {
797+
// Regression: when the Clock evictor lands on an invalidated slot whose stale key has
798+
// since been re-cached at a different slot, the old index entry (pointing to the
799+
// live slot) must not be removed.
800+
let mut registry = Registry::default();
801+
let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
802+
// Capacity of two slots keeps the slot/clock bookkeeping below easy to follow by hand.
let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(2));
803+
let blob_id = 0u64;
804+
let page_size = PAGE_SIZE.get() as usize;
805+
806+
// Fill both slots, then invalidate them so both carry stale keys with referenced=false.
// After this, `index` is empty while slot 0 still holds key (blob, 0) and slot 1 (blob, 1).
807+
cache.cache(blob_id, &vec![0xAA; page_size], 0);
808+
cache.cache(blob_id, &vec![0xBB; page_size], 1);
809+
cache.invalidate_from(blob_id, 0);
810+
811+
// Re-cache page 1. Clock sits at slot 0, which is referenced=false, so the insert
812+
// lands at slot 0 (slot 1 still holds its stale (blob, 1) key).
// NOTE(review): "clock sits at slot 0" assumes inserts into a not-yet-full cache do not
// advance the clock hand — confirm against `Cache::cache`.
813+
cache.cache(blob_id, &vec![0xCC; page_size], 1);
814+
let mut buf = vec![0u8; page_size];
815+
assert_eq!(
816+
cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
817+
page_size,
818+
"page 1 should be readable after re-cache"
819+
);
820+
assert_eq!(buf, vec![0xCC; page_size]);
821+
822+
// Cache a new page. Clock now advances to slot 1 (still referenced=false), evicts it.
823+
// With the buggy unconditional `index.remove(entry.key)` this would remove the live
824+
// (blob, 1) -> slot 0 mapping, orphaning slot 0.
825+
cache.cache(blob_id, &vec![0xDD; page_size], 2);
826+
827+
// Slot 0 must still be reachable via its live index entry.
828+
let mut buf = vec![0u8; page_size];
829+
assert_eq!(
830+
cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
831+
page_size,
832+
"live page 1 was orphaned by stale-slot eviction"
833+
);
834+
assert_eq!(buf, vec![0xCC; page_size]);
835+
836+
// And the newly cached page 2 is also reachable.
837+
let mut buf = vec![0u8; page_size];
838+
assert_eq!(
839+
cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64 * 2),
840+
page_size
841+
);
842+
assert_eq!(buf, vec![0xDD; page_size]);
843+
}
844+
758845
#[test_traced]
759846
fn test_cache_read_with_blob() {
760847
// Initialize the deterministic context

0 commit comments

Comments
 (0)