From fbf8069c8f96a8c5c0c5d0ff9bccb6680ede99ab Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 25 Feb 2026 14:24:24 +0530 Subject: [PATCH 1/6] Fix evicting map get lock and fast slow store EOF --- nativelink-store/src/fast_slow_store.rs | 93 +++++++++++++++--------- nativelink-store/src/filesystem_store.rs | 3 +- nativelink-util/src/evicting_map.rs | 73 +++++++++++-------- 3 files changed, 102 insertions(+), 67 deletions(-) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 1a52d7577..c377da6c9 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -396,7 +396,7 @@ impl StoreDriver for FastSlowStore { return self.slow_store.update(key, reader, size_info).await; } - let (mut fast_tx, fast_rx) = make_buf_channel_pair(); + let (fast_tx, fast_rx) = make_buf_channel_pair(); let (mut slow_tx, slow_rx) = make_buf_channel_pair(); let key_debug = format!("{key:?}"); @@ -408,6 +408,7 @@ impl StoreDriver for FastSlowStore { let mut bytes_sent: u64 = 0; let data_stream_fut = async move { + let mut fast_tx = Some(fast_tx); loop { let buffer = reader .recv() @@ -415,9 +416,9 @@ impl StoreDriver for FastSlowStore { .err_tip(|| "Failed to read buffer in fastslow store")?; if buffer.is_empty() { // EOF received. - fast_tx.send_eof().err_tip( - || "Failed to write eof to fast store in fast_slow store update", - )?; + if let Some(mut ftx) = fast_tx.take() { + drop(ftx.send_eof()); + } slow_tx .send_eof() .err_tip(|| "Failed to write eof to writer in fast_slow store update")?; @@ -429,34 +430,43 @@ impl StoreDriver for FastSlowStore { } let chunk_len = buffer.len(); - let send_start = std::time::Instant::now(); - let (fast_result, slow_result) = - join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); - let send_elapsed = send_start.elapsed(); - if send_elapsed.as_secs() >= 5 { - warn!( - chunk_len, - send_elapsed_ms = send_elapsed.as_millis(), - total_bytes = bytes_sent, - "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", - ); - } - bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); - fast_result - .map_err(|e| { + if let Some(ref mut ftx) = fast_tx { + let send_start = std::time::Instant::now(); + let (fast_result, slow_result) = + join!(ftx.send(buffer.clone()), slow_tx.send(buffer)); + let send_elapsed = send_start.elapsed(); + if send_elapsed.as_secs() >= 5 { + warn!( + chunk_len, + send_elapsed_ms = send_elapsed.as_millis(), + total_bytes = bytes_sent, + "FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging", + ); + } + if fast_result.is_err() { + warn!( + total_bytes = bytes_sent, + "FastSlowStore::update: fast store channel failed, continuing with slow store only", + ); + fast_tx = None; + } + slow_result.map_err(|e| { make_err!( Code::Internal, - "Failed to send message to fast_store in fast_slow_store {:?}", + "Failed to send message to slow_store in fast_slow store {:?}", e ) - }) - .merge(slow_result.map_err(|e| { + })?; + } else { + slow_tx.send(buffer).await.map_err(|e| { make_err!( Code::Internal, "Failed to send message to slow_store in fast_slow store {:?}", e ) - }))?; + })?; + } + bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX); } }; @@ -483,7 +493,15 @@ impl StoreDriver for FastSlowStore { "FastSlowStore::update: completed successfully", ); } - data_stream_res.merge(fast_res).merge(slow_res)?; + // Slow store success is required; fast store failure is tolerated since it's a cache. + data_stream_res.merge(slow_res)?; + if let Err(err) = fast_res { + warn!( + ?err, + key = %key_debug, + "FastSlowStore::update: fast store failed during upload; data stored in slow store", + ); + } Ok(()) } @@ -591,16 +609,23 @@ impl StoreDriver for FastSlowStore { // TODO(palfrey) Investigate if we should maybe ignore errors here instead of // forwarding them up. if self.fast_store.has(key.borrow()).await?.is_some() { - self.metrics - .fast_store_hit_count - .fetch_add(1, Ordering::Acquire); - self.fast_store - .get_part(key, writer.borrow_mut(), offset, length) - .await?; - self.metrics - .fast_store_downloaded_bytes - .fetch_add(writer.get_bytes_written(), Ordering::Acquire); - return Ok(()); + match self + .fast_store + .get_part(key.borrow(), writer.borrow_mut(), offset, length) + .await + { + Ok(()) => { + self.metrics + .fast_store_hit_count + .fetch_add(1, Ordering::Acquire); + self.metrics + .fast_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + Err(err) if err.code == Code::NotFound => {} + Err(err) => return Err(err), + } } // If the fast store is noop or read only or update only then bypass it. diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 8ee6d9c0f..60e2010d5 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -816,7 +816,8 @@ impl FilesystemStore { // it still exists in there. But first, get the lock... let mut encoded_file_path = entry.get_encoded_file_path().write().await; // Then check it's still in there... - if evicting_map.get(&key).await.is_none() { + // Use size_for_key instead of get() to avoid triggering bulk eviction of other entries. + if evicting_map.size_for_key(&key).await.is_none() { info!(%key, "Got eviction while emplacing, dropping"); return Ok(()); } diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index e779f38b6..13e138697 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -435,41 +435,50 @@ where } pub async fn get(&self, key: &Q) -> Option { - // Fast path: Check if we need eviction before acquiring lock for eviction - let needs_eviction = { - let state = self.state.lock(); - if let Some((_, peek_entry)) = state.lru.peek_lru() { - self.should_evict( - state.lru.len(), - peek_entry, - state.sum_store_size, - self.max_bytes, - ) + let (result, items_to_unref, removal_futures) = { + let mut state = self.state.lock(); + // Check if the requested item is expired before promoting it. + let result = if let Some(entry) = state.lru.peek(key.borrow()) { + if self.should_evict(state.lru.len(), entry, 0, u64::MAX) { + // Item is expired, remove it. + if let Some((k, eviction_item)) = state.lru.pop_entry(key.borrow()) { + let (data, futures) = state.remove(k.borrow(), &eviction_item, false); + let (mut items, mut removals) = self.evict_items(&mut *state); + items.push(data); + removals.extend(futures); + return { + Self::unref_items(items, removals).await; + None + }; + } + None + } else { + // Item is valid. Promote it in LRU so it's safe from eviction. + state.lru.get_mut(key.borrow()).map(|entry| { + entry.seconds_since_anchor = + i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); + entry.data.clone() + }) + } } else { - false - } - }; - - // Perform eviction if needed - if needs_eviction { - let (items_to_unref, removal_futures) = { - let mut state = self.state.lock(); - self.evict_items(&mut *state) + None }; - // Unref items outside of lock - let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); - while callbacks.next().await.is_some() {} - let mut callbacks: FuturesUnordered<_> = - items_to_unref.iter().map(LenEntry::unref).collect(); - while callbacks.next().await.is_some() {} - } + // Evict other items if needed + let (items_to_unref, removal_futures) = self.evict_items(&mut *state); + (result, items_to_unref, removal_futures) + }; + // Unref items outside of lock + Self::unref_items(items_to_unref, removal_futures).await; + result + } - // Now get the item - let mut state = self.state.lock(); - let entry = state.lru.get_mut(key.borrow())?; - entry.seconds_since_anchor = - i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); - Some(entry.data.clone()) + /// Helper to unref evicted items outside of lock. + async fn unref_items(items_to_unref: Vec, removal_futures: Vec) { + let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect(); + while callbacks.next().await.is_some() {} + let mut callbacks: FuturesUnordered<_> = + items_to_unref.iter().map(LenEntry::unref).collect(); + while callbacks.next().await.is_some() {} } /// Returns the replaced item if any. From 384c923d8ccb9d87bec03332fe0d838177d3ee2a Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 25 Feb 2026 17:21:21 +0530 Subject: [PATCH 2/6] Fix blocking lock on evicting map --- nativelink-util/src/evicting_map.rs | 27 +++++++++++++------------- nativelink-util/src/instant_wrapper.rs | 2 +- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 13e138697..5726e9370 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -438,7 +438,7 @@ where let (result, items_to_unref, removal_futures) = { let mut state = self.state.lock(); // Check if the requested item is expired before promoting it. - let result = if let Some(entry) = state.lru.peek(key.borrow()) { + if let Some(entry) = state.lru.peek(key.borrow()) { if self.should_evict(state.lru.len(), entry, 0, u64::MAX) { // Item is expired, remove it. if let Some((k, eviction_item)) = state.lru.pop_entry(key.borrow()) { @@ -446,28 +446,27 @@ where let (mut items, mut removals) = self.evict_items(&mut *state); items.push(data); removals.extend(futures); - return { - Self::unref_items(items, removals).await; - None - }; + (None, items, removals) + } else { + let (items, removals) = self.evict_items(&mut *state); + (None, items, removals) } - None } else { // Item is valid. Promote it in LRU so it's safe from eviction. - state.lru.get_mut(key.borrow()).map(|entry| { + let data = state.lru.get_mut(key.borrow()).map(|entry| { entry.seconds_since_anchor = i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX); entry.data.clone() - }) + }); + let (items, removals) = self.evict_items(&mut *state); + (data, items, removals) } } else { - None - }; - // Evict other items if needed - let (items_to_unref, removal_futures) = self.evict_items(&mut *state); - (result, items_to_unref, removal_futures) + let (items, removals) = self.evict_items(&mut *state); + (None, items, removals) + } }; - // Unref items outside of lock + // Unref items outside of lock — lock is guaranteed dropped here. Self::unref_items(items_to_unref, removal_futures).await; result } diff --git a/nativelink-util/src/instant_wrapper.rs b/nativelink-util/src/instant_wrapper.rs index 81247ec13..2b882d0b1 100644 --- a/nativelink-util/src/instant_wrapper.rs +++ b/nativelink-util/src/instant_wrapper.rs @@ -88,7 +88,7 @@ impl InstantWrapper for MockInstantWrapped { let baseline = self.0.elapsed(); loop { tokio::task::yield_now().await; - if self.0.elapsed() - baseline >= duration { + if self.0.elapsed().saturating_sub(baseline) >= duration { break; } } From d73481d796041222bdb93975248871bd444928c9 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 25 Feb 2026 19:10:44 +0530 Subject: [PATCH 3/6] Reverted the fast slow store changes --- nativelink-store/src/fast_slow_store.rs | 27 +++++++++---------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index c377da6c9..318c6886e 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -609,23 +609,16 @@ impl StoreDriver for FastSlowStore { // TODO(palfrey) Investigate if we should maybe ignore errors here instead of // forwarding them up. if self.fast_store.has(key.borrow()).await?.is_some() { - match self - .fast_store - .get_part(key.borrow(), writer.borrow_mut(), offset, length) - .await - { - Ok(()) => { - self.metrics - .fast_store_hit_count - .fetch_add(1, Ordering::Acquire); - self.metrics - .fast_store_downloaded_bytes - .fetch_add(writer.get_bytes_written(), Ordering::Acquire); - return Ok(()); - } - Err(err) if err.code == Code::NotFound => {} - Err(err) => return Err(err), - } + self.metrics + .fast_store_hit_count + .fetch_add(1, Ordering::Acquire); + self.fast_store + .get_part(key, writer.borrow_mut(), offset, length) + .await?; + self.metrics + .fast_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); } // If the fast store is noop or read only or update only then bypass it. From a4e773113bcbf28ce78eb45f5f174c6d3ac13ba5 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 25 Feb 2026 21:00:06 +0530 Subject: [PATCH 4/6] Add fast slow store for update with whole file --- nativelink-store/src/fast_slow_store.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 318c6886e..3accbf838 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -556,10 +556,20 @@ impl StoreDriver for FastSlowStore { { return Ok(Some(file)); } - return self + return match self .fast_store .update_with_whole_file(key, path, file, upload_size) - .await; + .await + { + Ok(file_slot) => Ok(file_slot), + Err(err) => { + warn!( + ?err, + "FastSlowStore::update_with_whole_file: fast store failed; data stored in slow store", + ); + Ok(None) + } + }; } if self @@ -573,14 +583,19 @@ impl StoreDriver for FastSlowStore { || self.fast_direction == StoreDirection::ReadOnly || self.fast_direction == StoreDirection::Get; if !ignore_fast { - slow_update_store_with_file( + if let Err(err) = slow_update_store_with_file( self.fast_store.as_store_driver_pin(), key.borrow(), &mut file, upload_size, ) .await - .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; + { + warn!( + ?err, + "FastSlowStore::update_with_whole_file: fast store failed; continuing with slow store", + ); + } } let ignore_slow = self.slow_direction == StoreDirection::ReadOnly || self.slow_direction == StoreDirection::Get; From d93775a6c8eae12caab662a5b60be906a9102690 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Thu, 26 Feb 2026 16:13:01 +0530 Subject: [PATCH 5/6] Return Precondition in case of Not Found error, such that bazel can retry uploading results --- nativelink-worker/src/local_worker.rs | 31 ++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 0b9ff40e2..bb0eec13f 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -369,11 +369,32 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke .err_tip(|| "Error while calling execution_response")?; }, Err(e) => { - grpc_client.execution_response(ExecuteResult{ - instance_name, - operation_id, - result: Some(execute_result::Result::InternalError(e.into())), - }).await.err_tip(|| "Error calling execution_response with error")?; + if e.code == Code::NotFound { + warn!( + ?e, + "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION" + ); + let action_result = ActionResult { + error: Some(make_err!( + Code::FailedPrecondition, + "{}", + e.message_string() + )), + ..ActionResult::default() + }; + let action_stage = ActionStage::Completed(action_result); + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::ExecuteResponse(action_stage.into())), + }).await.err_tip(|| "Error calling execution_response with missing inputs")?; + } else { + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::InternalError(e.into())), + }).await.err_tip(|| "Error calling execution_response with error")?; + } }, } Ok(()) From 0b2f8efcff23ad8070d131f8de0bda2c91538d8c Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Thu, 26 Feb 2026 18:40:43 +0530 Subject: [PATCH 6/6] Return Precondition in case of only fast slow store not found errors --- nativelink-worker/src/local_worker.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index bb0eec13f..ccf53a3a4 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -369,7 +369,9 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke .err_tip(|| "Error while calling execution_response")?; }, Err(e) => { - if e.code == Code::NotFound { + let is_cas_blob_missing = e.code == Code::NotFound + && e.message_string().contains("not found in either fast or slow store"); + if is_cas_blob_missing { warn!( ?e, "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"