Skip to content

Commit

Permalink
Remove atime references from FilesystemStore
Browse files Browse the repository at this point in the history
FilesystemStore will no longer keep the atime of files up to date. Doing
so was expensive: every time any CAS/AC item was touched, the store
updated the file's atime, which caused a lot of syscalls.

While the system is running, this PR has no effect. When the program is
restarted, however, items are now inserted in order of atime (falling
back to mtime when atime is unavailable), so the ordering relies on the
filesystem itself to keep atime up to date.
This means that .has() calls do not "refresh" the atime on disk.
  • Loading branch information
allada committed Feb 13, 2025
1 parent 79c2357 commit ab7de2a
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 198 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ pub enum StoreSpec {
/// Stores the data on the filesystem. This store is designed for
/// local persistent storage. Restarts of this program should restore
/// the previous state, meaning anything uploaded will be persistent
/// as long as the filesystem integrity holds. This store uses the
/// filesystem's `atime` (access time) to hold the last touched time
/// of the file(s).
/// as long as the filesystem integrity holds.
///
/// **Example JSON Config:**
/// ```json
Expand Down
1 change: 0 additions & 1 deletion nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ rust_library(
"@crates//:bytes",
"@crates//:bytes-utils",
"@crates//:const_format",
"@crates//:filetime",
"@crates//:fred",
"@crates//:futures",
"@crates//:hex",
Expand Down
1 change: 0 additions & 1 deletion nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ byteorder = { version = "1.5.0", default-features = false }
bytes = { version = "1.9.0", default-features = false }
bytes-utils = { version = "0.1.4", default-features = false }
const_format = { version = "0.2.34", default-features = false }
filetime = "0.2.25"
fred = { version = "10.0.3", default-features = false, features = [
"i-std",
"i-scripts",
Expand Down
65 changes: 21 additions & 44 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::time::SystemTime;
use async_lock::RwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, TryFutureExt};
use nativelink_config::stores::FilesystemSpec;
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::background_spawn;
use nativelink_util::buf_channel::{
make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
};
Expand All @@ -38,7 +38,6 @@ use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthS
use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{event, Level};
Expand Down Expand Up @@ -338,33 +337,6 @@ impl LenEntry for FileEntryImpl {
self.data_size == 0
}

#[inline]
async fn touch(&self) -> bool {
let result = self
.get_file_path_locked(move |full_content_path| async move {
let full_content_path = full_content_path.clone();
spawn_blocking!("filesystem_touch_set_mtime", move || {
set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
format!("Failed to touch file in filesystem store {full_content_path:?}")
})
})
.await
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to change atime of file due to spawn failing {:?}",
e
)
})?
})
.await;
if let Err(err) = result {
event!(Level::ERROR, ?err, "Failed to touch file",);
return false;
}
true
}

// unref() only triggers when an item is removed from the eviction_map. It is possible
// that another place in code has a reference to `FileEntryImpl` and may later read the
// file. To support this edge case, we first move the file to a temp file and point
Expand Down Expand Up @@ -499,20 +471,13 @@ async fn add_files_to_cache<Fe: FileEntry>(
// We need to filter out folders - we do not want to try to cache the s and d folders.
let is_file =
metadata.is_file() || !(file_name == STR_FOLDER || file_name == DIGEST_FOLDER);
let atime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
}
};

// Using access time is not perfect, but it is better than random. We do not update
// the atime when a file is actually "touched"; instead, we rely on whatever the
// filesystem does when we read the file (usually update on read).
let atime = metadata
.accessed()
.or_else(|_| metadata.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
Result::<(String, SystemTime, u64, bool), Error>::Ok((
file_name,
atime,
Expand Down Expand Up @@ -966,7 +931,19 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut temp_file = entry.read_file_part(offset, read_limit).await?;
let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
// If the file is not found, we need to remove it from the eviction map.
if err.code == Code::NotFound {
event!(
Level::ERROR,
?err,
?key,
"Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
);
self.evicting_map.remove(&key).await;
}
Err(err)
}).await?;

loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
Expand Down
113 changes: 5 additions & 108 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::marker::PhantomData;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};
use std::time::{Duration, SystemTime};
use std::time::Duration;

use async_lock::RwLock;
use bytes::Bytes;
use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future, FutureExt};
Expand Down Expand Up @@ -142,10 +141,6 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
self.inner.as_ref().unwrap().is_empty()
}

async fn touch(&self) -> bool {
self.inner.as_ref().unwrap().touch().await
}

async fn unref(&self) {
Hooks::on_unref(self);
self.inner.as_ref().unwrap().unref().await;
Expand Down Expand Up @@ -570,98 +565,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
check_temp_empty(&temp_path).await
}

#[nativelink_test]
async fn atime_updates_on_get_part_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
})
.await?,
);
// Insert data into store.
store.update_oneshot(digest1, VALUE1.into()).await?;

let file_entry = store.get_file_entry_for_digest(&digest1).await?;
file_entry
.get_file_path_locked(move |path| async move {
// Set atime to a long time ago.
set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?;

// Check to ensure it was set to zero from previous command.
assert_eq!(
fs::metadata(&path).await?.accessed()?,
SystemTime::UNIX_EPOCH
);
Ok(())
})
.await?;

// Now touch digest1.
let data = store.get_part_unchunked(digest1, 0, None).await?;
assert_eq!(data, VALUE1.as_bytes());

file_entry
.get_file_path_locked(move |path| async move {
// Ensure it was updated.
assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH);
Ok(())
})
.await?;

Ok(())
}

#[nativelink_test]
async fn eviction_drops_file_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
})
.await?,
);
// Insert data into store.
store.update_oneshot(digest1, VALUE1.into()).await?;

let file_entry = store.get_file_entry_for_digest(&digest1).await?;
file_entry
.get_file_path_locked(move |path| async move {
// Set atime to a long time ago.
set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?;

// Check to ensure it was set to zero from previous command.
assert_eq!(
fs::metadata(&path).await?.accessed()?,
SystemTime::UNIX_EPOCH
);
Ok(())
})
.await?;

// Now touch digest1.
let data = store.get_part_unchunked(digest1, 0, None).await?;
assert_eq!(data, VALUE1.as_bytes());

file_entry
.get_file_path_locked(move |path| async move {
// Ensure it was updated.
assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH);
Ok(())
})
.await?;

Ok(())
}

// Test to ensure that if we are holding a reference to `FileEntry` and the contents are
// replaced, the `FileEntry` continues to use the old data.
// `FileEntry` file contents should be immutable for the lifetime of the object.
Expand Down Expand Up @@ -1150,11 +1053,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
std::fs::remove_file(stored_file_path)?;

let digest_result = store
.has(digest)
.await
.err_tip(|| "Failed to execute has")?;
assert!(digest_result.is_none());
let get_part_res = store.get_part_unchunked(digest, 0, None).await;
assert_eq!(get_part_res.unwrap_err().code, Code::NotFound);

// Repeat with a string typed key.

Expand All @@ -1168,11 +1068,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
std::fs::remove_file(stored_file_path)?;

let string_result = store
.has(string_key)
.await
.err_tip(|| "Failed to execute has")?;
assert!(string_result.is_none());
let string_digest_get_part_res = store.get_part_unchunked(string_key, 0, None).await;
assert_eq!(string_digest_get_part_res.unwrap_err().code, Code::NotFound);

Ok(())
}
Expand Down
45 changes: 10 additions & 35 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ pub trait LenEntry: 'static {
/// Returns `true` if `self` has zero length.
fn is_empty(&self) -> bool;

/// Called when an entry is touched. On failure, will remove the entry
/// from the map.
#[inline]
fn touch(&self) -> impl Future<Output = bool> + Send {
std::future::ready(true)
}

/// This will be called when object is removed from map.
/// Note: There may still be a reference to it held somewhere else, which
/// is why it can't be mutable. This is a good place to mark the item
Expand Down Expand Up @@ -86,11 +79,6 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
T::is_empty(self.as_ref())
}

#[inline]
async fn touch(&self) -> bool {
self.as_ref().touch().await
}

#[inline]
async fn unref(&self) {
self.as_ref().unref().await;
Expand Down Expand Up @@ -347,27 +335,21 @@ where
};
match maybe_entry {
Some(entry) => {
// Since we are not inserting anything we don't need to evict based
// on the size of the store.
// Note: We need to check eviction because the item might be expired
// based on the current time. In such case, we remove the item while
// we are here.
let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX);
if !should_evict && peek {
*result = Some(entry.data.len());
} else if !should_evict && entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
*result = Some(entry.data.len());
} else {
if self.should_evict(lru_len, entry, 0, u64::MAX) {
*result = None;
if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
if should_evict {
event!(Level::INFO, ?key, "Item expired, evicting");
} else {
event!(Level::INFO, ?key, "Touch failed, evicting");
}
event!(Level::INFO, ?key, "Item expired, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
}
} else {
if !peek {
entry.seconds_since_anchor =
self.anchor_time.elapsed().as_secs() as i32;
}
*result = Some(entry.data.len());
}
}
None => *result = None,
Expand All @@ -385,15 +367,8 @@ where

let entry = state.lru.get_mut(key.borrow())?;

if entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
return Some(entry.data.clone());
}

let (key, eviction_item) = state.lru.pop_entry(key.borrow())?;
event!(Level::INFO, ?key, "Touch failed, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
None
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
Some(entry.data.clone())
}

/// Returns the replaced item if any.
Expand Down
5 changes: 0 additions & 5 deletions nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,6 @@ async fn unref_called_on_replace() -> Result<(), Error> {
unreachable!("We are not testing this functionality");
}

async fn touch(&self) -> bool {
// Do nothing. We are not testing this functionality.
true
}

async fn unref(&self) {
self.unref_called.store(true, Ordering::Relaxed);
}
Expand Down

0 comments on commit ab7de2a

Please sign in to comment.