diff --git a/Cargo.lock b/Cargo.lock index fa1b330af..05da13a72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,7 +2186,6 @@ dependencies = [ "bytes", "bytes-utils", "const_format", - "filetime", "fred", "futures", "hex", diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..fc5d492be 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -318,9 +318,7 @@ pub enum StoreSpec { /// Stores the data on the filesystem. This store is designed for /// local persistent storage. Restarts of this program should restore /// the previous state, meaning anything uploaded will be persistent - /// as long as the filesystem integrity holds. This store uses the - /// filesystem's `atime` (access time) to hold the last touched time - /// of the file(s). + /// as long as the filesystem integrity holds. /// /// **Example JSON Config:** /// ```json diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 0441d7dba..878365d3b 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -52,7 +52,6 @@ rust_library( "@crates//:bytes", "@crates//:bytes-utils", "@crates//:const_format", - "@crates//:filetime", "@crates//:fred", "@crates//:futures", "@crates//:hex", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index 379dbfeb4..ada648629 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -28,7 +28,6 @@ byteorder = { version = "1.5.0", default-features = false } bytes = { version = "1.9.0", default-features = false } bytes-utils = { version = "0.1.4", default-features = false } const_format = { version = "0.2.34", default-features = false } -filetime = "0.2.25" fred = { version = "10.0.3", default-features = false, features = [ "i-std", "i-scripts", diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 8e6f0a1d6..74b71182a 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -23,12 +23,12 @@ use std::time::SystemTime; use async_lock::RwLock; use async_trait::async_trait; use bytes::BytesMut; -use filetime::{set_file_atime, FileTime}; use futures::stream::{StreamExt, TryStreamExt}; use futures::{Future, TryFutureExt}; use nativelink_config::stores::FilesystemSpec; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; +use nativelink_util::background_spawn; use nativelink_util::buf_channel::{ make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, }; @@ -38,7 +38,6 @@ use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthS use nativelink_util::store_trait::{ StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo, }; -use nativelink_util::{background_spawn, spawn_blocking}; use tokio::io::{AsyncReadExt, AsyncWriteExt, Take}; use tokio_stream::wrappers::ReadDirStream; use tracing::{event, Level}; @@ -338,33 +337,6 @@ impl LenEntry for FileEntryImpl { self.data_size == 0 } - #[inline] - async fn touch(&self) -> bool { - let result = self - .get_file_path_locked(move |full_content_path| async move { - let full_content_path = full_content_path.clone(); - spawn_blocking!("filesystem_touch_set_mtime", move || { - set_file_atime(&full_content_path, FileTime::now()).err_tip(|| { - format!("Failed to touch file in filesystem store {full_content_path:?}") - }) - }) - .await - .map_err(|e| { - make_err!( - Code::Internal, - "Failed to change atime of file due to spawn failing {:?}", - e - ) - })? - }) - .await; - if let Err(err) = result { - event!(Level::ERROR, ?err, "Failed to touch file",); - return false; - } - true - } - // unref() only triggers when an item is removed from the eviction_map. It is possible // that another place in code has a reference to `FileEntryImpl` and may later read the // file. To support this edge case, we first move the file to a temp file and point @@ -499,20 +471,13 @@ async fn add_files_to_cache( // We need to filter out folders - we do not want to try to cache the s and d folders. let is_file = metadata.is_file() || !(file_name == STR_FOLDER || file_name == DIGEST_FOLDER); - let atime = match metadata.accessed() { - Ok(atime) => atime, - Err(err) => { - panic!( - "{}{}{} : {} {:?}", - "It appears this filesystem does not support access time. ", - "Please configure this program to run on a drive that supports ", - "atime", - file_name, - err - ); - } - }; - + // Using access time is not perfect, but better than random. We do not update the + // atime when a file is actually "touched", we rely on whatever the filesystem does + // when we read the file (usually update on read). + let atime = metadata + .accessed() + .or_else(|_| metadata.modified()) + .unwrap_or(SystemTime::UNIX_EPOCH); Result::<(String, SystemTime, u64, bool), Error>::Ok(( file_name, atime, @@ -966,7 +931,19 @@ impl StoreDriver for FilesystemStore { ) })?; let read_limit = length.unwrap_or(u64::MAX); - let mut temp_file = entry.read_file_part(offset, read_limit).await?; + let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move { + // If the file is not found, we need to remove it from the eviction map. + if err.code == Code::NotFound { + event!( + Level::ERROR, + ?err, + ?key, + "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted." + ); + self.evicting_map.remove(&key).await; + } + Err(err) + }).await?; loop { let mut buf = BytesMut::with_capacity(self.read_buffer_size); diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index e32659610..cd04a225b 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -19,11 +19,10 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, LazyLock}; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use async_lock::RwLock; use bytes::Bytes; -use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; use futures::{poll, Future, FutureExt}; @@ -142,10 +141,6 @@ impl LenEntry for TestFileEntry bool { - self.inner.as_ref().unwrap().touch().await - } - async fn unref(&self) { Hooks::on_unref(self); self.inner.as_ref().unwrap().unref().await; @@ -570,98 +565,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { check_temp_empty(&temp_path).await } -#[nativelink_test] -async fn atime_updates_on_get_part_test() -> Result<(), Error> { - let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; - - let store = Box::pin( - FilesystemStore::::new(&FilesystemSpec { - content_path: make_temp_path("content_path"), - temp_path: make_temp_path("temp_path"), - eviction_policy: None, - ..Default::default() - }) - .await?, - ); - // Insert data into store. - store.update_oneshot(digest1, VALUE1.into()).await?; - - let file_entry = store.get_file_entry_for_digest(&digest1).await?; - file_entry - .get_file_path_locked(move |path| async move { - // Set atime to along time ago. - set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?; - - // Check to ensure it was set to zero from previous command. - assert_eq!( - fs::metadata(&path).await?.accessed()?, - SystemTime::UNIX_EPOCH - ); - Ok(()) - }) - .await?; - - // Now touch digest1. - let data = store.get_part_unchunked(digest1, 0, None).await?; - assert_eq!(data, VALUE1.as_bytes()); - - file_entry - .get_file_path_locked(move |path| async move { - // Ensure it was updated. - assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH); - Ok(()) - }) - .await?; - - Ok(()) -} - -#[nativelink_test] -async fn eviction_drops_file_test() -> Result<(), Error> { - let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; - - let store = Box::pin( - FilesystemStore::::new(&FilesystemSpec { - content_path: make_temp_path("content_path"), - temp_path: make_temp_path("temp_path"), - eviction_policy: None, - ..Default::default() - }) - .await?, - ); - // Insert data into store. - store.update_oneshot(digest1, VALUE1.into()).await?; - - let file_entry = store.get_file_entry_for_digest(&digest1).await?; - file_entry - .get_file_path_locked(move |path| async move { - // Set atime to along time ago. - set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?; - - // Check to ensure it was set to zero from previous command. - assert_eq!( - fs::metadata(&path).await?.accessed()?, - SystemTime::UNIX_EPOCH - ); - Ok(()) - }) - .await?; - - // Now touch digest1. - let data = store.get_part_unchunked(digest1, 0, None).await?; - assert_eq!(data, VALUE1.as_bytes()); - - file_entry - .get_file_path_locked(move |path| async move { - // Ensure it was updated. - assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH); - Ok(()) - }) - .await?; - - Ok(()) -} - // Test to ensure that if we are holding a reference to `FileEntry` and the contents are // replaced, the `FileEntry` continues to use the old data. // `FileEntry` file contents should be immutable for the lifetime of the object. @@ -1150,11 +1053,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> { let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}")); std::fs::remove_file(stored_file_path)?; - let digest_result = store - .has(digest) - .await - .err_tip(|| "Failed to execute has")?; - assert!(digest_result.is_none()); + let get_part_res = store.get_part_unchunked(digest, 0, None).await; + assert_eq!(get_part_res.unwrap_err().code, Code::NotFound); // Repeat with a string typed key. @@ -1168,11 +1068,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> { let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}")); std::fs::remove_file(stored_file_path)?; - let string_result = store - .has(string_key) - .await - .err_tip(|| "Failed to execute has")?; - assert!(string_result.is_none()); + let string_digest_get_part_res = store.get_part_unchunked(string_key, 0, None).await; + assert_eq!(string_digest_get_part_res.unwrap_err().code, Code::NotFound); Ok(()) } diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index af8ad4945..642af29c4 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -50,13 +50,6 @@ pub trait LenEntry: 'static { /// Returns `true` if `self` has zero length. fn is_empty(&self) -> bool; - /// Called when an entry is touched. On failure, will remove the entry - /// from the map. - #[inline] - fn touch(&self) -> impl Future + Send { - std::future::ready(true) - } - /// This will be called when object is removed from map. /// Note: There may still be a reference to it held somewhere else, which /// is why it can't be mutable. This is a good place to mark the item @@ -86,11 +79,6 @@ impl LenEntry for Arc { T::is_empty(self.as_ref()) } - #[inline] - async fn touch(&self) -> bool { - self.as_ref().touch().await - } - #[inline] async fn unref(&self) { self.as_ref().unref().await; @@ -347,27 +335,21 @@ where }; match maybe_entry { Some(entry) => { - // Since we are not inserting anythign we don't need to evict based - // on the size of the store. // Note: We need to check eviction because the item might be expired // based on the current time. In such case, we remove the item while // we are here. - let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX); - if !should_evict && peek { - *result = Some(entry.data.len()); - } else if !should_evict && entry.data.touch().await { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - *result = Some(entry.data.len()); - } else { + if self.should_evict(lru_len, entry, 0, u64::MAX) { *result = None; if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) { - if should_evict { - event!(Level::INFO, ?key, "Item expired, evicting"); - } else { - event!(Level::INFO, ?key, "Touch failed, evicting"); - } + event!(Level::INFO, ?key, "Item expired, evicting"); state.remove(key.borrow(), &eviction_item, false).await; } + } else { + if !peek { + entry.seconds_since_anchor = + self.anchor_time.elapsed().as_secs() as i32; + } + *result = Some(entry.data.len()); } } None => *result = None, @@ -385,15 +367,8 @@ where let entry = state.lru.get_mut(key.borrow())?; - if entry.data.touch().await { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - return Some(entry.data.clone()); - } - - let (key, eviction_item) = state.lru.pop_entry(key.borrow())?; - event!(Level::INFO, ?key, "Touch failed, evicting"); - state.remove(key.borrow(), &eviction_item, false).await; - None + entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; + Some(entry.data.clone()) } /// Returns the replaced item if any. diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index b4ec6ac94..ca3f10f3d 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -356,11 +356,6 @@ async fn unref_called_on_replace() -> Result<(), Error> { unreachable!("We are not testing this functionality"); } - async fn touch(&self) -> bool { - // Do nothing. We are not testing this functionality. - true - } - async fn unref(&self) { self.unref_called.store(true, Ordering::Relaxed); }