Commit 2f42265

erneestoc, claude, and MarcusSorealheis authored
fast_slow_store: has_with_results consults fast store and in-flight slow writes (#2343)

* fast_slow_store: make has_with_results aware of fast store and in-flight slow writes

  `FastSlowStore::has_with_results` previously only consulted the slow store, which caused two known bugs:

  1. Fast-only writes (and the brief window between fast-store insert and slow-store write start) were invisible: callers got NotFound for blobs that were locally present, triggering redundant fetches.
  2. Concurrent writers racing on the same digest could not see each other's in-flight slow-store writes, so the second writer re-uploaded what the first had nearly finished pushing.

  The fix layers three consultations in order: slow store first (authoritative for downstream consumers), then an `in_flight_slow_writes: Mutex<HashMap>` tracking active slow writes, then the fast store as a final fallback for fast-only / pre-slow-write hits.

  In-flight tracking uses a cancel-safe RAII guard (`InFlightSlowWriteGuard`) registered at the start of each `update`/`update_with_whole_file` path that writes to the slow store, so a cancelled write future correctly removes itself from the map on Drop.

  Tests cover all four reachable cases: slow-hit short-circuit, fast-only hit, in-flight slow write visibility, and noop-slow-store fall-through.

  Provenance: equivalent to upstream commits f69aaf8 and 2d770d9 from TraceMachina/nativelink PR #2243, ported atomically to current main.

* fast_slow_store_test: add cancel-safety and mixed-key has_with_results tests

  The previous commit added tests for the four reachable cases of the fast/slow/in-flight has() lookup, but two correctness properties were asserted only indirectly:

  1. Cancel safety of `InFlightSlowWriteGuard`. The fix's central claim is that aborting an in-progress update() removes the key from the in_flight_slow_writes map via Drop. The previous in-flight test only verified the happy path (writer completes normally), and the fast-store fallback masked any leak.
  2. Per-key independence in batched has_with_results. With multiple keys spanning different storage tiers, an off-by-one in the missing_indices fallback or an over-broad in-flight match would silently corrupt results without any existing test noticing.

  New tests:

  * dropping_update_future_cleans_up_in_flight_entry: uses a gated slow store with NoopStore as fast (so the fast-store fallback cannot mask a leak), spawns a writer, waits on a oneshot started signal, aborts the writer mid-update, then asserts has() returns None, proving the guard's Drop ran.
  * has_with_results_handles_mixed_key_sources: builds a request with four keys (slow-only, in-flight, fast-only, and missing) and asserts each result independently matches its source. Catches index-mapping regressions in the batched fallback path.

  Both tests are deterministic (oneshot channels, no sleeps). Verified that reverting the fix in fast_slow_store.rs causes both new tests plus the existing has_sees_in_flight_slow_writes and fast_store_only_value_is_reported_by_has to fail.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fmt: apply rustfmt to nativelink-store

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fast_slow_store_test: address clippy items_after_statements and cast_possible_truncation

  Hoist the `MapBackedSlow` test helper (struct, StoreDriver impl, and `default_health_status_indicator!` macro invocation) out of `has_with_results_handles_mixed_key_sources` to module scope so that items are declared before statements, satisfying `clippy::items_after_statements`.

  Replace `as usize` casts of the per-key `u64` sizes with `usize::try_from(...).unwrap()` to satisfy `clippy::cast_possible_truncation`, matching the existing conversion pattern elsewhere in this test file.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ci: retrigger after GitHub 502 fetching hermetic_cc_toolchain tarball

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Marcus Eagan <marcuseagan@gmail.com>
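The three-tier lookup order described in the commit message can be sketched in isolation. The following is a simplified, synchronous model and not the NativeLink implementation: `LayeredStore`, the `Tier` alias, the string keys, and `demo` are hypothetical names introduced only for illustration, and plain `HashMap`s stand in for the real stores.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for a store tier: maps a key to a blob size.
type Tier = HashMap<String, u64>;

/// Simplified model of a fast/slow store with in-flight write tracking.
struct LayeredStore {
    slow: Tier,
    fast: Tier,
    in_flight_slow_writes: Mutex<HashMap<String, u64>>,
}

impl LayeredStore {
    /// Fills `results[i]` with `Some(size)` if `keys[i]` is found in any tier.
    fn has_with_results(&self, keys: &[&str], results: &mut [Option<u64>]) {
        // 1. The slow store is consulted first (authoritative tier).
        for (k, r) in keys.iter().zip(results.iter_mut()) {
            *r = self.slow.get(*k).copied();
        }
        // 2. Fill in keys whose slow-store write is still in flight.
        {
            let in_flight = self.in_flight_slow_writes.lock().unwrap();
            if !in_flight.is_empty() {
                for (k, r) in keys.iter().zip(results.iter_mut()) {
                    if r.is_none() {
                        *r = in_flight.get(*k).copied();
                    }
                }
            }
        }
        // 3. Fall back to the fast store for anything still missing,
        //    writing each hit back at its original index.
        let missing: Vec<usize> = results
            .iter()
            .enumerate()
            .filter(|(_, r)| r.is_none())
            .map(|(i, _)| i)
            .collect();
        for i in missing {
            if let Some(size) = self.fast.get(keys[i]) {
                results[i] = Some(*size);
            }
        }
    }
}

/// Builds one key per source (slow-only, in-flight, fast-only, missing).
fn demo() -> Vec<Option<u64>> {
    let store = LayeredStore {
        slow: HashMap::from([("slow-only".to_string(), 10)]),
        fast: HashMap::from([("fast-only".to_string(), 30)]),
        in_flight_slow_writes: Mutex::new(HashMap::from([("in-flight".to_string(), 20)])),
    };
    let keys = ["slow-only", "in-flight", "fast-only", "missing"];
    let mut results = vec![None; keys.len()];
    store.has_with_results(&keys, &mut results);
    results
}

fn main() {
    println!("{:?}", demo());
}
```

Each later tier only touches entries that are still `None`, so a slow-store hit is never overwritten, which is the per-key independence the mixed-key test is described as checking.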
1 parent 2410a08 commit 2f42265

2 files changed

Lines changed: 681 additions & 9 deletions


nativelink-store/src/fast_slow_store.rs

Lines changed: 118 additions & 5 deletions
@@ -65,6 +65,12 @@ pub struct FastSlowStore {
     // actually it's faster because we're not downloading the file multiple
     // times are doing loads of duplicate IO.
     populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
+    // Tracks keys whose slow-store write is currently in flight, along with
+    // the best-known size. Consulted by `has_with_results` so that a
+    // concurrent writer that has not yet finished pushing to the slow store
+    // is still visible to a concurrent existence check, preventing redundant
+    // duplicate uploads of the same blob.
+    in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, u64>>,
 }

 // This guard ensures that the populating_digests is cleared even if the future
@@ -91,6 +97,27 @@ impl LoaderGuard<'_> {
     }
 }

+/// Cancel-safe RAII guard that removes an entry from
+/// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the
+/// map does not leak entries if the surrounding `update()` future is
+/// cancelled before the slow-store write completes.
+struct InFlightSlowWriteGuard {
+    weak_store: Weak<FastSlowStore>,
+    key: Option<StoreKey<'static>>,
+}
+
+impl Drop for InFlightSlowWriteGuard {
+    fn drop(&mut self) {
+        let Some(store) = self.weak_store.upgrade() else {
+            return;
+        };
+        let Some(key) = self.key.take() else {
+            return;
+        };
+        store.in_flight_slow_writes.lock().remove(&key);
+    }
+}
+
 impl Drop for LoaderGuard<'_> {
     fn drop(&mut self) {
         let Some(store) = self.weak_store.upgrade() else {
@@ -126,9 +153,40 @@ impl FastSlowStore {
             weak_self: weak_self.clone(),
             metrics: FastSlowStoreMetrics::default(),
             populating_digests: Mutex::new(HashMap::new()),
+            in_flight_slow_writes: Mutex::new(HashMap::new()),
         })
     }

+    /// Best-effort size for tracking in-flight slow writes. Falls back to 0
+    /// when neither the upload size nor the key carries an exact size
+    /// (e.g. `MaxSize` for a string key). Callers of `has_with_results` only
+    /// rely on `Some(_)` vs `None`, so a size of 0 still correctly signals
+    /// "this blob exists".
+    const fn track_size(key: &StoreKey<'_>, size_info: UploadSizeInfo) -> u64 {
+        match size_info {
+            UploadSizeInfo::ExactSize(s) => s,
+            UploadSizeInfo::MaxSize(_) => match key {
+                StoreKey::Digest(d) => d.size_bytes(),
+                StoreKey::Str(_) => 0,
+            },
+        }
+    }
+
+    fn register_in_flight_slow_write(
+        &self,
+        key: StoreKey<'_>,
+        size: u64,
+    ) -> InFlightSlowWriteGuard {
+        let owned = key.into_owned();
+        self.in_flight_slow_writes
+            .lock()
+            .insert(owned.borrow().into_owned(), size);
+        InFlightSlowWriteGuard {
+            weak_store: self.weak_self.clone(),
+            key: Some(owned),
+        }
+    }
+
     pub const fn fast_store(&self) -> &Store {
         &self.fast_store
     }
@@ -365,11 +423,55 @@ impl StoreDriver for FastSlowStore {
         if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
             return self.fast_store.has_with_results(key, results).await;
         }
-        // Only check the slow store because if it's not there, then something
-        // down stream might be unable to get it. This should not affect
-        // workers as they only use get() and a CAS can use an
-        // ExistenceCacheStore to avoid the bottleneck.
-        self.slow_store.has_with_results(key, results).await
+        // Primary lookup is the slow store because that's authoritative for
+        // downstream consumers that fetch from there. But the slow store
+        // alone can miss two important cases:
+        //   1. A concurrent writer's slow-store write is still in flight.
+        //   2. The blob is present in the fast (local) store — either
+        //      fast-only by configuration, or because the slow write has
+        //      not yet started/completed.
+        // Reporting NotFound in those cases causes redundant duplicate
+        // uploads or unnecessary slow-store fetches.
+        self.slow_store.has_with_results(key, results).await?;
+
+        // Fill in any blobs whose slow-store write is currently in flight.
+        // Cheap when the map is empty (the common case).
+        {
+            let in_flight = self.in_flight_slow_writes.lock();
+            if !in_flight.is_empty() {
+                for (k, result) in key.iter().zip(results.iter_mut()) {
+                    if result.is_none() {
+                        let owned = k.borrow().into_owned();
+                        if let Some(size) = in_flight.get(&owned) {
+                            *result = Some(*size);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Fall back to the fast store for anything still missing. This
+        // covers fast-only writes and the brief window between fast-store
+        // insertion and slow-store write start.
+        let missing_indices: Vec<usize> = results
+            .iter()
+            .enumerate()
+            .filter_map(|(i, r)| if r.is_none() { Some(i) } else { None })
+            .collect();
+        if !missing_indices.is_empty() {
+            let missing_keys: Vec<StoreKey<'_>> =
+                missing_indices.iter().map(|&i| key[i].borrow()).collect();
+            let mut fast_results = vec![None; missing_keys.len()];
+            self.fast_store
+                .has_with_results(&missing_keys, &mut fast_results)
+                .await?;
+            for (j, &orig_idx) in missing_indices.iter().enumerate() {
+                if fast_results[j].is_some() {
+                    results[orig_idx] = fast_results[j];
+                }
+            }
+        }
+        Ok(())
     }

     async fn update(
@@ -405,9 +507,14 @@ impl StoreDriver for FastSlowStore {
             return self.fast_store.update(key, reader, size_info).await;
         }
         if ignore_fast {
+            let _guard =
+                self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));
             return self.slow_store.update(key, reader, size_info).await;
         }

+        let _slow_in_flight_guard =
+            self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));
+
         let (mut fast_tx, fast_rx) = make_buf_channel_pair();
         let (mut slow_tx, slow_rx) = make_buf_channel_pair();

@@ -549,6 +656,10 @@ impl StoreDriver for FastSlowStore {
             {
                 trace!("FastSlowStore::update_with_whole_file: uploading to slow_store");
                 let slow_start = std::time::Instant::now();
+                let _guard = self.register_in_flight_slow_write(
+                    key.borrow(),
+                    Self::track_size(&key, upload_size),
+                );
                 slow_update_store_with_file(
                     self.slow_store.as_store_driver_pin(),
                     key.borrow(),
@@ -598,6 +709,8 @@ impl StoreDriver for FastSlowStore {
             if ignore_slow {
                 return Ok(Some(file));
             }
+            let _guard = self
+                .register_in_flight_slow_write(key.borrow(), Self::track_size(&key, upload_size));
             return self
                 .slow_store
                 .update_with_whole_file(key, path, file, upload_size)
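
The cancel-safety property that `InFlightSlowWriteGuard` relies on (dropping a future drops the locals it holds at its suspension point) can be demonstrated with the standard library alone. This is a minimal sketch under stated assumptions, not the test added in this commit: `Store`, `InFlightGuard`, `register`, `NoopWake`, and `cancelled_write_leaves_no_entry` are hypothetical names, `std::sync::Mutex` replaces the lock type used in the diff, and a never-completing future stands in for the slow-store write.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical store holding only the in-flight map.
struct Store {
    in_flight: Mutex<HashMap<String, u64>>,
}

/// RAII guard: removes its key from the map when dropped.
struct InFlightGuard {
    weak_store: Weak<Store>,
    key: Option<String>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // If the store is already gone there is nothing to clean up.
        let Some(store) = self.weak_store.upgrade() else {
            return;
        };
        let Some(key) = self.key.take() else {
            return;
        };
        store.in_flight.lock().unwrap().remove(&key);
    }
}

/// Inserts the key and returns the guard that will remove it again.
fn register(store: &Arc<Store>, key: &str, size: u64) -> InFlightGuard {
    store.in_flight.lock().unwrap().insert(key.to_string(), size);
    InFlightGuard {
        weak_store: Arc::downgrade(store),
        key: Some(key.to_string()),
    }
}

/// Waker that does nothing; enough to poll a future once by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (entry visible while the write is pending, entry visible after cancel).
fn cancelled_write_leaves_no_entry() -> (bool, bool) {
    let store = Arc::new(Store {
        in_flight: Mutex::new(HashMap::new()),
    });
    let writer_store = Arc::clone(&store);
    // Stand-in for `update()`: register, then wait forever on the "slow write".
    let mut fut = Box::pin(async move {
        let _guard = register(&writer_store, "blob", 5);
        std::future::pending::<()>().await;
    });
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Poll once so the future runs up to its suspension point.
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending));
    let during = store.in_flight.lock().unwrap().contains_key("blob");
    // Cancelling a future means dropping it; the guard's Drop runs here.
    drop(fut);
    let after = store.in_flight.lock().unwrap().contains_key("blob");
    (during, after)
}

fn main() {
    let (during, after) = cancelled_write_leaves_no_entry();
    println!("visible while pending: {during}, visible after cancel: {after}");
}
```

Holding a `Weak` reference rather than an `Arc` in the guard means an outstanding write does not keep the store alive, at the cost of skipping cleanup when the store has already been dropped.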
