diff --git a/deployment-examples/docker-compose/worker.json5 b/deployment-examples/docker-compose/worker.json5 index 57cc5ad1d..658dd160f 100644 --- a/deployment-examples/docker-compose/worker.json5 +++ b/deployment-examples/docker-compose/worker.json5 @@ -33,6 +33,7 @@ } } }, + "fast_direction": "get", "slow": { "ref_store": { "name": "GRPC_LOCAL_STORE" diff --git a/kubernetes/components/worker/worker.json5 b/kubernetes/components/worker/worker.json5 index 3a452b1aa..4cd7883f2 100644 --- a/kubernetes/components/worker/worker.json5 +++ b/kubernetes/components/worker/worker.json5 @@ -32,6 +32,7 @@ } } }, + "fast_direction": "get", "slow": { "ref_store": { "name": "GRPC_LOCAL_STORE" diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..f19828cff 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -514,6 +514,26 @@ pub struct FilesystemSpec { pub block_size: u64, } +#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum StoreDirection { + /// The store operates normally and all get and put operations are + /// handled by it. + #[default] + Both, + /// Update operations will cause persistence to this store, but Get + /// operations will be ignored. + /// This only makes sense on the fast store as the slow store will + /// never get written to on Get anyway. + Update, + /// Get operations will cause persistence to this store, but Update + /// operations will be ignored. + Get, + /// Operate as a read only store, only really makes sense if there's + /// another way to write to it. + ReadOnly, +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct FastSlowSpec { @@ -521,9 +541,19 @@ pub struct FastSlowSpec { /// out to the `slow` store. pub fast: StoreSpec, + /// How to handle the fast store. 
This can be useful to set to Get for + /// worker nodes such that results are persisted to the slow store only. + #[serde(default)] + pub fast_direction: StoreDirection, + /// If the object does not exist in the `fast` store it will try to /// get it from this store. pub slow: StoreSpec, + + /// How to handle the slow store. This can be useful if creating a diode + /// and you wish to have an upstream read only store. + #[serde(default)] + pub slow_direction: StoreDirection, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 60789e3a5..cc73807c7 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; use futures::{join, FutureExt}; -use nativelink_config::stores::FastSlowSpec; +use nativelink_config::stores::{FastSlowSpec, StoreDirection}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -45,18 +45,22 @@ use nativelink_util::store_trait::{ pub struct FastSlowStore { #[metric(group = "fast_store")] fast_store: Store, + fast_direction: StoreDirection, #[metric(group = "slow_store")] slow_store: Store, + slow_direction: StoreDirection, weak_self: Weak, #[metric] metrics: FastSlowStoreMetrics, } impl FastSlowStore { - pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc { + pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc { Arc::new_cyclic(|weak_self| Self { fast_store, + fast_direction: spec.fast_direction.clone(), slow_store, + slow_direction: spec.slow_direction.clone(), weak_self: weak_self.clone(), metrics: FastSlowStoreMetrics::default(), }) @@ -155,12 +159,31 @@ impl StoreDriver for FastSlowStore { ) -> Result<(), Error> { // If either one of our stores is a noop store, bypass the multiplexing // 
and just use the store that is not a noop store. - let slow_store = self.slow_store.inner_store(Some(key.borrow())); - if slow_store.optimized_for(StoreOptimizations::NoopUpdates) { + let ignore_slow = self + .slow_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.slow_direction == StoreDirection::ReadOnly + || self.slow_direction == StoreDirection::Get; + let ignore_fast = self + .fast_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get; + if ignore_slow && ignore_fast { + // We need to drain the reader to avoid the writer complaining that we dropped + // the connection prematurely. + reader + .drain() + .await + .err_tip(|| "In FastSlowStore::update")?; + return Ok(()); + } + if ignore_slow { return self.fast_store.update(key, reader, size_info).await; } - let fast_store = self.fast_store.inner_store(Some(key.borrow())); - if fast_store.optimized_for(StoreOptimizations::NoopUpdates) { + if ignore_fast { return self.slow_store.update(key, reader, size_info).await; } @@ -233,7 +256,10 @@ impl StoreDriver for FastSlowStore { { if !self .slow_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::NoopUpdates) + && self.slow_direction != StoreDirection::ReadOnly + && self.slow_direction != StoreDirection::Get { slow_update_store_with_file( self.slow_store.as_store_driver_pin(), @@ -244,6 +270,11 @@ impl StoreDriver for FastSlowStore { .await .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; } + if self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get + { + return Ok(Some(file)); + } return self .fast_store .update_with_whole_file(key, file, upload_size) @@ -254,10 +285,13 @@ impl StoreDriver for FastSlowStore { .slow_store .optimized_for(StoreOptimizations::FileUpdates) { - if !self + let ignore_fast
= self .fast_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::NoopUpdates) - { + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get; + if !ignore_fast { slow_update_store_with_file( self.fast_store.as_store_driver_pin(), key.borrow(), @@ -267,6 +301,11 @@ impl StoreDriver for FastSlowStore { .await .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; } + let ignore_slow = self.slow_direction == StoreDirection::ReadOnly + || self.slow_direction == StoreDirection::Get; + if ignore_slow { + return Ok(Some(file)); + } return self .slow_store .update_with_whole_file(key, file, upload_size) @@ -317,6 +356,22 @@ impl StoreDriver for FastSlowStore { .slow_store_hit_count .fetch_add(1, Ordering::Acquire); + if self + .fast_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Update + { + self.slow_store + .get_part(key, writer.borrow_mut(), offset, length) + .await?; + self.metrics + .slow_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + let send_range = offset..length.map_or(u64::MAX, |length| length + offset); let mut bytes_received: u64 = 0; diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 74b65f749..135e5c863 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use bytes::Bytes; -use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec}; +use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_metric::MetricsComponent; @@ 
-35,13 +35,18 @@ use rand::{Rng, SeedableRng}; const MEGABYTE_SZ: usize = 1024 * 1024; -fn make_stores() -> (Store, Store, Store) { +fn make_stores_direction( + fast_direction: StoreDirection, + slow_direction: StoreDirection, +) -> (Store, Store, Store) { let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); let slow_store = Store::new(MemoryStore::new(&MemorySpec::default())); let fast_slow_store = Store::new(FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction, slow: StoreSpec::memory(MemorySpec::default()), + slow_direction, }, fast_store.clone(), slow_store.clone(), @@ -49,6 +54,10 @@ fn make_stores() -> (Store, Store, Store) { (fast_slow_store, fast_store, slow_store) } +fn make_stores() -> (Store, Store, Store) { + make_stores_direction(StoreDirection::default(), StoreDirection::default()) +} + fn make_random_data(sz: usize) -> Vec { let mut value = vec![0u8; sz]; let mut rng = SmallRng::seed_from_u64(1); @@ -331,7 +340,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { let fast_slow_store = FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, fast_store, slow_store, @@ -372,7 +383,9 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { let fast_slow_store = Arc::new(FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, fast_store.clone(), slow_store, @@ -395,7 +408,9 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { let slow_store = Store::new(NoopStore::new()); let fast_slow_store_config = FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: 
StoreSpec::noop(NoopSpec::default()), + slow_direction: StoreDirection::default(), }; let fast_slow_store = Arc::new(FastSlowStore::new( &fast_slow_store_config, @@ -430,3 +445,95 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { ); Ok(()) } + +#[nativelink_test] +async fn fast_get_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Get, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_some(), + "Expected data in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn fast_readonly_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::ReadOnly, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_some(), + "Expected data in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn slow_readonly_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Both, StoreDirection::ReadOnly); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_some(), + "Expected data to be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_none(), + "Expected data to not be in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn 
slow_get_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Both, StoreDirection::Get); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_some(), + "Expected data to be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_none(), + "Expected data to not be in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn fast_put_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Update, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + let _ = fast_slow_store.get_part_unchunked(digest, 0, None).await; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + Ok(()) +} diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index f02b32e82..130bc9bce 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -27,7 +27,9 @@ use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; use futures::{poll, Future, FutureExt}; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::fast_slow_store::FastSlowStore; @@ -1333,7 +1335,9 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result< // Note: The config is not needed for this test, so use dummy data. 
&FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, Store::new( FilesystemStore::::new(&FilesystemSpec { diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 7d0350c94..f6ea26926 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -31,7 +31,9 @@ mod utils { use hyper::body::Frame; use nativelink_config::cas_server::{LocalWorkerConfig, WorkerProperty}; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_err, make_input_err, Code, Error}; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::platform::Property; @@ -416,7 +418,9 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Box::new(&FilesystemSpec { @@ -456,7 +460,9 @@ async fn new_local_worker_removes_work_directory_before_start_test( &FastSlowSpec { // Note: These are not needed for this test, so we put dummy memory stores here. 
fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, Store::new( ::new(&FilesystemSpec { diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 647e38e2c..0dff177e8 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -27,7 +27,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use nativelink_config::cas_server::EnvironmentSource; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_input_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable; @@ -103,7 +105,9 @@ async fn setup_stores() -> Result< let cas_store = FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::filesystem(fast_config), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(slow_config), + slow_direction: StoreDirection::default(), }, Store::new(fast_store.clone()), Store::new(slow_store.clone()),