Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use nativelink_util::store_trait::{
};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio::sync::Semaphore;
use tokio::time::timeout;
use tokio_stream::wrappers::ReadDirStream;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1367,7 +1368,47 @@ impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
"FilesystemStore"
}

async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
StoreDriver::check_health(Pin::new(self), namespace).await
/// Lightweight probe: `stat()` the `content_path` directory. No
/// write-semaphore / eviction-map contention with production
/// traffic, and bounded so a hung NFS / EBS mount can't wedge the
/// indicator.
async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2);

let content_path = &self.shared_context.content_path;
let stat = tokio::fs::metadata(&content_path);
match timeout(HEALTH_PROBE_TIMEOUT, stat).await {
Ok(Ok(meta)) if meta.is_dir() => {
HealthStatus::new_ok(self, "FilesystemStore::check_health: ok".into())
}
Ok(Ok(_)) => HealthStatus::new_failed(
self,
format!(
"FilesystemStore::check_health: content_path {content_path} is not a directory"
)
.into(),
),
Ok(Err(e)) => {
warn!(
?e,
%content_path,
"FilesystemStore::check_health: stat errored",
);
HealthStatus::new_failed(
self,
format!("FilesystemStore::check_health: stat errored: {e}").into(),
)
}
Err(_) => {
warn!(
%content_path,
timeout_secs = HEALTH_PROBE_TIMEOUT.as_secs(),
"FilesystemStore::check_health: stat timed out",
);
HealthStatus::Timeout {
struct_name: self.struct_name(),
}
}
}
}
}
39 changes: 35 additions & 4 deletions nativelink-store/src/gcs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use core::fmt::Debug;
use core::pin::Pin;
use core::time::Duration;
use std::borrow::Cow;
use std::sync::Arc;

Expand All @@ -32,7 +33,8 @@ use nativelink_util::store_trait::{
RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo,
};
use rand::Rng;
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tracing::warn;

use crate::cas_utils::is_zero_digest;
use crate::gcs_client::client::{GcsClient, GcsOperations};
Expand Down Expand Up @@ -92,7 +94,7 @@ where
.unwrap_or(DEFAULT_CONCURRENT_UPLOADS);

let jitter_amt = spec.common.retry.jitter;
let jitter_fn = Arc::new(move |delay: tokio::time::Duration| {
let jitter_fn = Arc::new(move |delay: Duration| {
if jitter_amt == 0.0 {
return delay;
}
Expand Down Expand Up @@ -486,7 +488,36 @@ where
"GcsStore"
}

async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
StoreDriver::check_health(Pin::new(self), namespace).await
/// Lightweight probe: a single `object_exists` against a fixed
/// never-existing path. Shares no resources with production traffic
/// and stays well under the `HealthServer` per-indicator budget.
async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2);

let probe_path = ObjectPath::new(
self.bucket.clone(),
"__nativelink_health_probe__/does-not-exist",
);

let probe = self.client.object_exists(&probe_path);
match timeout(HEALTH_PROBE_TIMEOUT, probe).await {
Ok(Ok(_)) => HealthStatus::new_ok(self, "GcsStore::check_health: ok".into()),
Ok(Err(e)) => {
warn!(?e, "GcsStore::check_health: object_exists errored");
HealthStatus::new_failed(
self,
format!("GcsStore::check_health: object_exists errored: {e}").into(),
)
}
Err(_) => {
warn!(
timeout_secs = HEALTH_PROBE_TIMEOUT.as_secs(),
"GcsStore::check_health: probe timed out",
);
HealthStatus::Timeout {
struct_name: self.struct_name(),
}
}
}
}
}
52 changes: 52 additions & 0 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use core::fmt::{Debug, Formatter};
use core::marker::PhantomData;
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use core::time::Duration;
use std::borrow::Cow;
use std::ffi::{OsStr, OsString};
use std::path::Path;
use std::sync::{Arc, LazyLock};
Expand All @@ -36,6 +37,7 @@ use nativelink_store::filesystem_store::{
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::{DigestInfo, fs, make_temp_path};
use nativelink_util::evicting_map::LenEntry;
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
use nativelink_util::store_trait::{Store, StoreKey, StoreLike, UploadSizeInfo};
use nativelink_util::{background_spawn, spawn};
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
Expand Down Expand Up @@ -1506,6 +1508,56 @@ async fn add_too_early_files() -> Result<(), Error> {
Ok(())
}

#[nativelink_test]
async fn check_health_ok_when_content_path_is_a_directory() -> Result<(), Error> {
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");
fs::create_dir_all(&content_path).await?;
fs::create_dir_all(&temp_path).await?;

let store = FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: content_path.clone(),
temp_path,
eviction_policy: None,
..Default::default()
})
.await?;

match HealthStatusIndicator::check_health(&*store, Cow::Borrowed("test")).await {
HealthStatus::Ok { .. } => Ok(()),
other => panic!("expected HealthStatus::Ok, got {other:?}"),
}
}

#[nativelink_test]
async fn check_health_failed_when_content_path_is_missing() -> Result<(), Error> {
// Construct the store against a real path, then delete it so the
// stat() inside check_health fails with ENOENT.
let content_path = make_temp_path("content_path");
let temp_path = make_temp_path("temp_path");
fs::create_dir_all(&content_path).await?;
fs::create_dir_all(&temp_path).await?;

let store = FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: content_path.clone(),
temp_path,
eviction_policy: None,
..Default::default()
})
.await?;

fs::remove_dir_all(&content_path).await?;

match HealthStatusIndicator::check_health(&*store, Cow::Borrowed("test")).await {
HealthStatus::Failed { message, .. } => {
assert!(
message.contains("stat") || message.contains("not a directory"),
"unexpected failure message: {message}",
);
Ok(())
}
other => panic!("expected HealthStatus::Failed, got {other:?}"),
}
/// `get_executable_hardlink_source` must return a private **0o555** inode that
/// is created exactly once: distinct from the read-only **0o444** CAS blob,
/// stable across calls (so callers hardlink-many), and leaving the CAS blob's
Expand Down
49 changes: 42 additions & 7 deletions nativelink-store/tests/gcs_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use nativelink_error::{Code, Error, make_err};
use nativelink_macro::nativelink_test;
use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
use nativelink_store::gcs_client::client::GcsOperations;
use nativelink_store::gcs_client::mocks::{MockGcsOperations, MockRequest};
use nativelink_store::gcs_client::mocks::{FailureMode, MockGcsOperations, MockRequest};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good idea, but now causing

error: unnecessary qualification
   --> nativelink-store/tests/gcs_store_test.rs:136:27
    |
136 |         .set_failure_mode(nativelink_store::gcs_client::mocks::FailureMode::NotFound)
    |                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
    = note: `-D unused-qualifications` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(unused_qualifications)]`

use nativelink_store::gcs_client::types::{DEFAULT_CONTENT_TYPE, ObjectPath};
use nativelink_store::gcs_store::GcsStore;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
use nativelink_util::instant_wrapper::MockInstantWrapped;
use nativelink_util::store_trait::{StoreKey, StoreLike, UploadSizeInfo};
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -131,9 +132,7 @@ async fn has_handles_not_found_error() -> Result<(), Error> {

// Set the mock to return a simulated NotFound error
mock_ops.set_should_fail(true);
mock_ops
.set_failure_mode(nativelink_store::gcs_client::mocks::FailureMode::NotFound)
.await;
mock_ops.set_failure_mode(FailureMode::NotFound).await;

let store = create_test_store(mock_ops.clone()).await?;

Expand All @@ -159,9 +158,7 @@ async fn get_part_handles_not_found_error() -> Result<(), Error> {

// Set the mock to return a simulated NotFound error
mock_ops.set_should_fail(true);
mock_ops
.set_failure_mode(nativelink_store::gcs_client::mocks::FailureMode::NotFound)
.await;
mock_ops.set_failure_mode(FailureMode::NotFound).await;

let store = create_test_store(mock_ops.clone()).await?;

Expand Down Expand Up @@ -736,3 +733,41 @@ fn create_object_path(key: &StoreKey) -> ObjectPath {
&format!("{}{}", KEY_PREFIX, key.as_str()),
)
}

#[nativelink_test]
async fn check_health_ok_when_bucket_reachable() -> Result<(), Error> {
let ops = Arc::new(MockGcsOperations::new());
let store = create_test_store(ops.clone()).await?;

match HealthStatusIndicator::check_health(&*store, std::borrow::Cow::Borrowed("test")).await {
HealthStatus::Ok { .. } => {}
other => panic!("expected HealthStatus::Ok, got {other:?}"),
}
// Sanity: the probe issues exactly one object_exists call.
assert_eq!(
ops.get_call_counts()
.object_exists_calls
.load(Ordering::Relaxed),
1
);
Ok(())
}

#[nativelink_test]
async fn check_health_failed_on_object_exists_error() -> Result<(), Error> {
let ops = Arc::new(MockGcsOperations::new());
ops.set_should_fail(true);
ops.set_failure_mode(FailureMode::NetworkError).await;
let store = create_test_store(ops.clone()).await?;

match HealthStatusIndicator::check_health(&*store, std::borrow::Cow::Borrowed("test")).await {
HealthStatus::Failed { message, .. } => {
assert!(
message.contains("object_exists errored"),
"unexpected failure message: {message}",
);
Ok(())
}
other => panic!("expected HealthStatus::Failed, got {other:?}"),
}
}
Loading