Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast slow store directions #1581

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deployment-examples/docker-compose/worker.json5
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
}
}
},
"fast_direction": "get",
"slow": {
"ref_store": {
"name": "GRPC_LOCAL_STORE"
Expand Down
1 change: 1 addition & 0 deletions kubernetes/components/worker/worker.json5
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
}
}
},
"fast_direction": "get",
"slow": {
"ref_store": {
"name": "GRPC_LOCAL_STORE"
Expand Down
30 changes: 30 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,46 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

/// Selects which kinds of operations are allowed to persist data into one
/// side (`fast` or `slow`) of a fast/slow store pair.
///
/// Values are (de)serialized in `snake_case`: `"both"`, `"update"`, `"get"`
/// and `"read_only"`. When unspecified, [`StoreDirection::Both`] is used.
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StoreDirection {
    /// The store operates normally and all get and put operations are
    /// handled by it.
    #[default]
    Both,
    /// Update operations will cause persistence to this store, but data
    /// fetched by Get operations will not be written into it.
    /// This only makes sense on the fast store as the slow store will
    /// never get written to on Get anyway.
    Update,
    /// Get operations will cause persistence to this store, but data sent
    /// by Update operations will not be written into it.
    Get,
    /// Operate as a read only store: neither Update nor Get operations
    /// write into it. This only really makes sense if there's another way
    /// to write to it.
    ReadOnly,
}

/// Configuration for a store that pairs a `fast` store with a `slow` store.
///
/// Unknown fields are rejected when deserializing (`deny_unknown_fields`).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
/// Fast store that will be attempted to be contacted before reaching
/// out to the `slow` store.
pub fast: StoreSpec,

/// How to handle the fast store. This can be useful to set to `get` for
/// worker nodes such that results are persisted to the slow store only.
///
/// Optional: when omitted, the default direction (`both`) is used.
#[serde(default)]
pub fast_direction: StoreDirection,

/// If the object does not exist in the `fast` store it will try to
/// get it from this store.
pub slow: StoreSpec,

/// How to handle the slow store. This can be useful if creating a diode
/// and you wish to have an upstream read only store.
///
/// Optional: when omitted, the default direction (`both`) is used.
#[serde(default)]
pub slow_direction: StoreDirection,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
71 changes: 63 additions & 8 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::{Arc, Weak};

use async_trait::async_trait;
use futures::{join, FutureExt};
use nativelink_config::stores::FastSlowSpec;
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{
Expand All @@ -45,18 +45,22 @@ use nativelink_util::store_trait::{
// Store that multiplexes between a fast store and a slow store. The
// `*_direction` fields record which operations may persist data into each
// side.
pub struct FastSlowStore {
// The fast side of the pair.
#[metric(group = "fast_store")]
fast_store: Store,
// Copied from `FastSlowSpec::fast_direction` when the store is constructed.
fast_direction: StoreDirection,
// The slow side of the pair.
#[metric(group = "slow_store")]
slow_store: Store,
// Copied from `FastSlowSpec::slow_direction` when the store is constructed.
slow_direction: StoreDirection,
// Weak handle back to this store, populated via `Arc::new_cyclic` in `new`.
weak_self: Weak<Self>,
// Counters for this store (e.g. slow store hit count and downloaded bytes).
#[metric]
metrics: FastSlowStoreMetrics,
}

impl FastSlowStore {
pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
Arc::new_cyclic(|weak_self| Self {
fast_store,
fast_direction: spec.fast_direction.clone(),
slow_store,
slow_direction: spec.slow_direction.clone(),
weak_self: weak_self.clone(),
metrics: FastSlowStoreMetrics::default(),
})
Expand Down Expand Up @@ -155,12 +159,31 @@ impl StoreDriver for FastSlowStore {
) -> Result<(), Error> {
// If either one of our stores is a noop store, bypass the multiplexing
// and just use the store that is not a noop store.
let slow_store = self.slow_store.inner_store(Some(key.borrow()));
if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
let ignore_slow = self
.slow_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.slow_direction == StoreDirection::ReadOnly
|| self.slow_direction == StoreDirection::Get;
let ignore_fast = self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get;
if ignore_slow && ignore_fast {
// We need to drain the reader to avoid the writer complaining that we dropped
// the connection prematurely.
reader
.drain()
.await
.err_tip(|| "In FastFlowStore::update")?;
return Ok(());
}
if ignore_slow {
return self.fast_store.update(key, reader, size_info).await;
}
let fast_store = self.fast_store.inner_store(Some(key.borrow()));
if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
if ignore_fast {
return self.slow_store.update(key, reader, size_info).await;
}

Expand Down Expand Up @@ -233,7 +256,10 @@ impl StoreDriver for FastSlowStore {
{
if !self
.slow_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
&& self.slow_direction != StoreDirection::ReadOnly
&& self.slow_direction != StoreDirection::Get
{
slow_update_store_with_file(
self.slow_store.as_store_driver_pin(),
Expand All @@ -244,6 +270,11 @@ impl StoreDriver for FastSlowStore {
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
}
if self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get
{
return Ok(Some(file));
}
return self
.fast_store
.update_with_whole_file(key, file, upload_size)
Expand All @@ -254,10 +285,13 @@ impl StoreDriver for FastSlowStore {
.slow_store
.optimized_for(StoreOptimizations::FileUpdates)
{
if !self
let ignore_fast = self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
{
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get;
if !ignore_fast {
slow_update_store_with_file(
self.fast_store.as_store_driver_pin(),
key.borrow(),
Expand All @@ -267,6 +301,11 @@ impl StoreDriver for FastSlowStore {
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
}
let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
|| self.slow_direction == StoreDirection::Get;
if ignore_slow {
return Ok(Some(file));
}
return self
.slow_store
.update_with_whole_file(key, file, upload_size)
Expand Down Expand Up @@ -317,6 +356,22 @@ impl StoreDriver for FastSlowStore {
.slow_store_hit_count
.fetch_add(1, Ordering::Acquire);

if self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Update
{
self.slow_store
.get_part(key, writer.borrow_mut(), offset, length)
.await?;
self.metrics
.slow_store_downloaded_bytes
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
return Ok(());
}

let send_range = offset..length.map_or(u64::MAX, |length| length + offset);
let mut bytes_received: u64 = 0;

Expand Down
111 changes: 109 additions & 2 deletions nativelink-store/tests/fast_slow_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use bytes::Bytes;
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec};
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_metric::MetricsComponent;
Expand All @@ -35,20 +35,29 @@ use rand::{Rng, SeedableRng};

const MEGABYTE_SZ: usize = 1024 * 1024;

fn make_stores() -> (Store, Store, Store) {
// Builds a `FastSlowStore` on top of two fresh in-memory stores, applying the
// given direction to each side.
//
// Returns `(fast_slow_store, fast_store, slow_store)` so that tests can
// inspect the underlying stores directly.
fn make_stores_direction(
    fast_direction: StoreDirection,
    slow_direction: StoreDirection,
) -> (Store, Store, Store) {
    let new_memory_store = || Store::new(MemoryStore::new(&MemorySpec::default()));
    let fast_store = new_memory_store();
    let slow_store = new_memory_store();

    let spec = FastSlowSpec {
        fast: StoreSpec::memory(MemorySpec::default()),
        fast_direction,
        slow: StoreSpec::memory(MemorySpec::default()),
        slow_direction,
    };
    let combined = FastSlowStore::new(&spec, fast_store.clone(), slow_store.clone());

    (Store::new(combined), fast_store, slow_store)
}

// Builds the fast/slow store trio with the default direction on both sides.
fn make_stores() -> (Store, Store, Store) {
    let fast_direction = StoreDirection::default();
    let slow_direction = StoreDirection::default();
    make_stores_direction(fast_direction, slow_direction)
}

fn make_random_data(sz: usize) -> Vec<u8> {
let mut value = vec![0u8; sz];
let mut rng = SmallRng::seed_from_u64(1);
Expand Down Expand Up @@ -331,7 +340,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> {
let fast_slow_store = FastSlowStore::new(
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
fast_store,
slow_store,
Expand Down Expand Up @@ -372,7 +383,9 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> {
let fast_slow_store = Arc::new(FastSlowStore::new(
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
fast_store.clone(),
slow_store,
Expand All @@ -395,7 +408,9 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
let slow_store = Store::new(NoopStore::new());
let fast_slow_store_config = FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::noop(NoopSpec::default()),
slow_direction: StoreDirection::default(),
};
let fast_slow_store = Arc::new(FastSlowStore::new(
&fast_slow_store_config,
Expand Down Expand Up @@ -430,3 +445,95 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
);
Ok(())
}

// With the fast store set to `Get`, an update issued through the fast/slow
// store must land in the slow store only.
#[nativelink_test]
async fn fast_get_only_not_updated() -> Result<(), Error> {
    let (store, fast, slow) = make_stores_direction(StoreDirection::Get, StoreDirection::Both);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    store.update_oneshot(digest, payload.into()).await?;

    let in_fast = fast.has(digest).await?;
    assert!(in_fast.is_none(), "Expected data to not be in the fast store");

    let in_slow = slow.has(digest).await?;
    assert!(in_slow.is_some(), "Expected data in the slow store");
    Ok(())
}

// With the fast store set to `ReadOnly`, an update issued through the
// fast/slow store must land in the slow store only.
#[nativelink_test]
async fn fast_readonly_only_not_updated() -> Result<(), Error> {
    let directions = (StoreDirection::ReadOnly, StoreDirection::Both);
    let (store, fast, slow) = make_stores_direction(directions.0, directions.1);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    store.update_oneshot(digest, payload.into()).await?;

    let in_fast = fast.has(digest).await?;
    assert!(in_fast.is_none(), "Expected data to not be in the fast store");

    let in_slow = slow.has(digest).await?;
    assert!(in_slow.is_some(), "Expected data in the slow store");
    Ok(())
}

// With the slow store set to `ReadOnly`, an update issued through the
// fast/slow store must land in the fast store only.
#[nativelink_test]
async fn slow_readonly_only_not_updated() -> Result<(), Error> {
    let directions = (StoreDirection::Both, StoreDirection::ReadOnly);
    let (store, fast, slow) = make_stores_direction(directions.0, directions.1);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    store.update_oneshot(digest, payload.into()).await?;

    let in_fast = fast.has(digest).await?;
    assert!(in_fast.is_some(), "Expected data to be in the fast store");

    let in_slow = slow.has(digest).await?;
    assert!(in_slow.is_none(), "Expected data to not be in the slow store");
    Ok(())
}

// With the slow store set to `Get`, an update issued through the fast/slow
// store must land in the fast store only.
#[nativelink_test]
async fn slow_get_only_not_updated() -> Result<(), Error> {
    let (store, fast, slow) = make_stores_direction(StoreDirection::Both, StoreDirection::Get);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    store.update_oneshot(digest, payload.into()).await?;

    let in_fast = fast.has(digest).await?;
    assert!(in_fast.is_some(), "Expected data to be in the fast store");

    let in_slow = slow.has(digest).await?;
    assert!(in_slow.is_none(), "Expected data to not be in the slow store");
    Ok(())
}

// With the fast store set to `Update`, reading through the fast/slow store
// must serve the data from the slow store without copying it into the fast
// store.
#[nativelink_test]
async fn fast_put_only_not_updated() -> Result<(), Error> {
    let (fast_slow_store, fast_store, slow_store) =
        make_stores_direction(StoreDirection::Update, StoreDirection::Both);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let original_data = make_random_data(100);
    slow_store
        .update_oneshot(digest, original_data.clone().into())
        .await?;

    // Propagate read failures instead of discarding them: if the read never
    // succeeded, the "not in the fast store" assertion below would pass
    // vacuously and the test would prove nothing.
    let fetched = fast_slow_store
        .get_part_unchunked(digest, 0, None)
        .await
        .err_tip(|| "Failed to read through the fast_slow_store")?;
    assert_eq!(
        &fetched[..],
        original_data.as_slice(),
        "Expected the data read back to match what was stored in the slow store"
    );

    assert!(
        fast_store.has(digest).await?.is_none(),
        "Expected data to not be in the fast store"
    );
    Ok(())
}
6 changes: 5 additions & 1 deletion nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future, FutureExt};
use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
use nativelink_config::stores::{
FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec,
};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_store::fast_slow_store::FastSlowStore;
Expand Down Expand Up @@ -1333,7 +1335,9 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<
// Note: The config is not needed for this test, so use dummy data.
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
Store::new(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
Expand Down
Loading
Loading