From 194e7e4de618b938925c41a9ab9e6191d4155129 Mon Sep 17 00:00:00 2001 From: Chris Staite Date: Sat, 8 Feb 2025 08:54:19 +0000 Subject: [PATCH] Fast slow store directions There are multiple use cases where we don't want a fast-slow store to persist to one of the stores in some direction. For example, worker nodes do not want to store build results on the local filesystem, just with the upstream CAS. Another case would be the re-use of prod action cache in a dev environment, but not vice-versa. This PR introduces options to the fast-slow store which default to the existing behaviour, but allow customisation of each side of the fast slow store to either persist in the case of get operations, put operations or to make them read only. Fixes #1577 --- .../docker-compose/worker.json5 | 1 + kubernetes/components/worker/worker.json5 | 1 + nativelink-config/src/stores.rs | 30 +++++ nativelink-store/src/fast_slow_store.rs | 73 ++++++++++-- .../tests/fast_slow_store_test.rs | 111 +++++++++++++++++- .../tests/filesystem_store_test.rs | 6 +- nativelink-worker/tests/local_worker_test.rs | 8 +- .../tests/running_actions_manager_test.rs | 6 +- 8 files changed, 223 insertions(+), 13 deletions(-) diff --git a/deployment-examples/docker-compose/worker.json5 b/deployment-examples/docker-compose/worker.json5 index 57cc5ad1d..658dd160f 100644 --- a/deployment-examples/docker-compose/worker.json5 +++ b/deployment-examples/docker-compose/worker.json5 @@ -33,6 +33,7 @@ } } }, + "fast_direction": "get", "slow": { "ref_store": { "name": "GRPC_LOCAL_STORE" diff --git a/kubernetes/components/worker/worker.json5 b/kubernetes/components/worker/worker.json5 index 3a452b1aa..4cd7883f2 100644 --- a/kubernetes/components/worker/worker.json5 +++ b/kubernetes/components/worker/worker.json5 @@ -32,6 +32,7 @@ } } }, + "fast_direction": "get", "slow": { "ref_store": { "name": "GRPC_LOCAL_STORE" diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..f19828cff 
100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -514,6 +514,26 @@ pub struct FilesystemSpec { pub block_size: u64, } +#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum StoreDirection { + /// The store operates normally and all get and put operations are + /// handled by it. + #[default] + Both, + /// Update operations will cause persistence to this store, but Get + /// operations will be ignored. + /// This only makes sense on the fast store as the slow store will + /// never get written to on Get anyway. + Update, + /// Get operations will cause persistence to this store, but Update + /// operations will be ignored. + Get, + /// Operate as a read only store, only really makes sense if there's + /// another way to write to it. + ReadOnly, +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct FastSlowSpec { @@ -521,9 +541,19 @@ pub struct FastSlowSpec { /// out to the `slow` store. pub fast: StoreSpec, + /// How to handle the fast store. This can be useful to set to Get for + /// worker nodes such that results are persisted to the slow store only. + #[serde(default)] + pub fast_direction: StoreDirection, + /// If the object does not exist in the `fast` store it will try to /// get it from this store. pub slow: StoreSpec, + + /// How to handle the slow store. This can be useful if creating a diode + /// and you wish to have an upstream read only store. 
+ #[serde(default)] + pub slow_direction: StoreDirection, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 60789e3a5..b47c0a59a 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; use futures::{join, FutureExt}; -use nativelink_config::stores::FastSlowSpec; +use nativelink_config::stores::{FastSlowSpec, StoreDirection}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -45,18 +45,22 @@ use nativelink_util::store_trait::{ pub struct FastSlowStore { #[metric(group = "fast_store")] fast_store: Store, + fast_direction: StoreDirection, #[metric(group = "slow_store")] slow_store: Store, + slow_direction: StoreDirection, weak_self: Weak, #[metric] metrics: FastSlowStoreMetrics, } impl FastSlowStore { - pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc { + pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc { Arc::new_cyclic(|weak_self| Self { fast_store, + fast_direction: spec.fast_direction.clone(), slow_store, + slow_direction: spec.slow_direction.clone(), weak_self: weak_self.clone(), metrics: FastSlowStoreMetrics::default(), }) @@ -155,12 +159,31 @@ impl StoreDriver for FastSlowStore { ) -> Result<(), Error> { // If either one of our stores is a noop store, bypass the multiplexing // and just use the store that is not a noop store. 
- let slow_store = self.slow_store.inner_store(Some(key.borrow())); - if slow_store.optimized_for(StoreOptimizations::NoopUpdates) { + let ignore_slow = self + .slow_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.slow_direction == StoreDirection::ReadOnly + || self.slow_direction == StoreDirection::Get; + let ignore_fast = self + .fast_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get; + if ignore_slow && ignore_fast { + // We need to drain the reader to avoid the writer complaining that we dropped + // the connection prematurely. + reader + .drain() + .await + .err_tip(|| "In FastFlowStore::update")?; + return Ok(()); + } + if ignore_slow { return self.fast_store.update(key, reader, size_info).await; } - let fast_store = self.fast_store.inner_store(Some(key.borrow())); - if fast_store.optimized_for(StoreOptimizations::NoopUpdates) { + if ignore_fast { return self.slow_store.update(key, reader, size_info).await; } @@ -229,11 +252,15 @@ impl StoreDriver for FastSlowStore { ) -> Result, Error> { if self .fast_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::FileUpdates) { if !self .slow_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::NoopUpdates) + && self.slow_direction != StoreDirection::ReadOnly + && self.slow_direction != StoreDirection::Get { slow_update_store_with_file( self.slow_store.as_store_driver_pin(), @@ -244,6 +271,11 @@ impl StoreDriver for FastSlowStore { .await .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; } + if self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get + { + return Ok(Some(file)); + } return self .fast_store .update_with_whole_file(key, file, upload_size) @@ -252,12 +284,16 @@ impl StoreDriver for FastSlowStore { if self 
.slow_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::FileUpdates) { - if !self + let ignore_fast = self .fast_store + .inner_store(Some(key.borrow())) .optimized_for(StoreOptimizations::NoopUpdates) - { + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Get; + if !ignore_fast { slow_update_store_with_file( self.fast_store.as_store_driver_pin(), key.borrow(), @@ -267,6 +303,11 @@ impl StoreDriver for FastSlowStore { .await .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; } + let ignore_slow = self.slow_direction == StoreDirection::ReadOnly + || self.slow_direction == StoreDirection::Get; + if ignore_slow { + return Ok(Some(file)); + } return self .slow_store .update_with_whole_file(key, file, upload_size) @@ -317,6 +358,22 @@ impl StoreDriver for FastSlowStore { .slow_store_hit_count .fetch_add(1, Ordering::Acquire); + if self + .fast_store + .inner_store(Some(key.borrow())) + .optimized_for(StoreOptimizations::NoopUpdates) + || self.fast_direction == StoreDirection::ReadOnly + || self.fast_direction == StoreDirection::Update + { + self.slow_store + .get_part(key, writer.borrow_mut(), offset, length) + .await?; + self.metrics + .slow_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + let send_range = offset..length.map_or(u64::MAX, |length| length + offset); let mut bytes_received: u64 = 0; diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 74b65f749..135e5c863 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use bytes::Bytes; -use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec}; +use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec}; use 
nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_metric::MetricsComponent; @@ -35,13 +35,18 @@ use rand::{Rng, SeedableRng}; const MEGABYTE_SZ: usize = 1024 * 1024; -fn make_stores() -> (Store, Store, Store) { +fn make_stores_direction( + fast_direction: StoreDirection, + slow_direction: StoreDirection, +) -> (Store, Store, Store) { let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); let slow_store = Store::new(MemoryStore::new(&MemorySpec::default())); let fast_slow_store = Store::new(FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction, slow: StoreSpec::memory(MemorySpec::default()), + slow_direction, }, fast_store.clone(), slow_store.clone(), @@ -49,6 +54,10 @@ fn make_stores() -> (Store, Store, Store) { (fast_slow_store, fast_store, slow_store) } +fn make_stores() -> (Store, Store, Store) { + make_stores_direction(StoreDirection::default(), StoreDirection::default()) +} + fn make_random_data(sz: usize) -> Vec { let mut value = vec![0u8; sz]; let mut rng = SmallRng::seed_from_u64(1); @@ -331,7 +340,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { let fast_slow_store = FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, fast_store, slow_store, @@ -372,7 +383,9 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { let fast_slow_store = Arc::new(FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, fast_store.clone(), slow_store, @@ -395,7 +408,9 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { let slow_store = Store::new(NoopStore::new()); let 
fast_slow_store_config = FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::noop(NoopSpec::default()), + slow_direction: StoreDirection::default(), }; let fast_slow_store = Arc::new(FastSlowStore::new( &fast_slow_store_config, @@ -430,3 +445,95 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { ); Ok(()) } + +#[nativelink_test] +async fn fast_get_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Get, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_some(), + "Expected data in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn fast_readonly_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::ReadOnly, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_some(), + "Expected data in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn slow_readonly_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Both, StoreDirection::ReadOnly); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_some(), + "Expected data to be in the fast store" + ); + assert!( 
+ slow_store.has(digest).await?.is_none(), + "Expected data to not be in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn slow_get_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Both, StoreDirection::Get); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + fast_slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + assert!( + fast_store.has(digest).await?.is_some(), + "Expected data to be in the fast store" + ); + assert!( + slow_store.has(digest).await?.is_none(), + "Expected data to not be in the slow store" + ); + Ok(()) +} + +#[nativelink_test] +async fn fast_put_only_not_updated() -> Result<(), Error> { + let (fast_slow_store, fast_store, slow_store) = + make_stores_direction(StoreDirection::Update, StoreDirection::Both); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); + slow_store + .update_oneshot(digest, make_random_data(100).into()) + .await?; + let _ = fast_slow_store.get_part_unchunked(digest, 0, None).await; + assert!( + fast_store.has(digest).await?.is_none(), + "Expected data to not be in the fast store" + ); + Ok(()) +} diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index f02b32e82..130bc9bce 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -27,7 +27,9 @@ use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; use futures::{poll, Future, FutureExt}; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::fast_slow_store::FastSlowStore; @@ -1333,7 +1335,9 @@ async fn 
update_with_whole_file_slow_path_when_low_file_descriptors() -> Result< // Note: The config is not needed for this test, so use dummy data. &FastSlowSpec { fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, Store::new( FilesystemStore::::new(&FilesystemSpec { diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 7d0350c94..f6ea26926 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -31,7 +31,9 @@ mod utils { use hyper::body::Frame; use nativelink_config::cas_server::{LocalWorkerConfig, WorkerProperty}; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_err, make_input_err, Code, Error}; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::platform::Property; @@ -416,7 +418,9 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Box::new(&FilesystemSpec { @@ -456,7 +460,9 @@ async fn new_local_worker_removes_work_directory_before_start_test( &FastSlowSpec { // Note: These are not needed for this test, so we put dummy memory stores here. 
fast: StoreSpec::memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(MemorySpec::default()), + slow_direction: StoreDirection::default(), }, Store::new( ::new(&FilesystemSpec { diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index 647e38e2c..0dff177e8 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -27,7 +27,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use nativelink_config::cas_server::EnvironmentSource; -use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; +use nativelink_config::stores::{ + FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec, +}; use nativelink_error::{make_input_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable; @@ -103,7 +105,9 @@ async fn setup_stores() -> Result< let cas_store = FastSlowStore::new( &FastSlowSpec { fast: StoreSpec::filesystem(fast_config), + fast_direction: StoreDirection::default(), slow: StoreSpec::memory(slow_config), + slow_direction: StoreDirection::default(), }, Store::new(fast_store.clone()), Store::new(slow_store.clone()),