Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ either = { workspace = true }
etcd-client = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
oneshot = { workspace = true }
parking_lot = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/discovery/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_register_endpoint() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);

Expand All @@ -625,7 +625,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_list() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_watch() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = Arc::new(KVStoreDiscovery::new(store, cancel_token.clone()));

Expand Down
14 changes: 1 addition & 13 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,7 @@ impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let (selected_kv_store, nats_config, request_plane) = config.dissolve();

let runtime_clone = runtime.clone();

let store = match selected_kv_store {
kv::Selector::Etcd(etcd_config) => {
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
// The returned error doesn't show because of a dropped runtime error, so
// log it first.
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
kv::Manager::etcd(etcd_client)
}
kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
kv::Selector::Memory => kv::Manager::memory(),
};
let store = kv::Manager::new(selected_kv_store, Some(runtime.clone()));

let nats_client = match nats_config {
Some(nc) => Some(nc.connect().await?),
Expand Down
98 changes: 75 additions & 23 deletions lib/runtime/src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use std::time::Duration;
use std::{collections::HashMap, path::PathBuf};
use std::{env, fmt};

use crate::CancellationToken;
use crate::transports::etcd as etcd_transport;
use crate::{CancellationToken, Runtime};
use async_trait::async_trait;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use oneshot;
use percent_encoding::{NON_ALPHANUMERIC, percent_decode_str, percent_encode};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -139,6 +141,47 @@ pub enum Selector {
// Nats not listed because likely we want to remove that impl. It is not currently used and not well tested.
}

impl Selector {
fn build(self, runtime: Option<Runtime>) -> Result<KeyValueStoreEnum, StoreError> {
match self {
Selector::Etcd(opts) => {
if let Some(runtime) = runtime {
let (tx, rx) = oneshot::channel();
runtime.primary().spawn(async move {
let etcd_client = etcd_transport::Client::new(*opts, runtime.clone())
.await
.map_err(StoreError::from);
tx.send(etcd_client).unwrap();
});

// We block our async task a tiny bit here, but not a big deal since we only ever do this once.
Ok(KeyValueStoreEnum::Etcd(EtcdStore::new(
rx.recv()
.map_err(|x| StoreError::from(anyhow::anyhow!(x)))??,
)))
} else {
Err(StoreError::BuildError(anyhow::anyhow!(
"Runtime is required for Etcd selector"
)))
}
}
Selector::File(path) => {
if let Some(runtime) = runtime {
Ok(KeyValueStoreEnum::File(FileStore::new(
runtime.primary_token(),
path,
)))
} else {
Err(StoreError::BuildError(anyhow::anyhow!(
"Runtime is required for File selector"
)))
}
}
Selector::Memory => Ok(KeyValueStoreEnum::Memory(MemoryStore::new())),
}
}
}

impl fmt::Display for Selector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -247,30 +290,31 @@ impl KeyValueStoreEnum {
}

#[derive(Clone)]
pub struct Manager(Arc<KeyValueStoreEnum>);
pub struct Manager {
selector: Selector,
kv_store: Arc<OnceCell<KeyValueStoreEnum>>,
runtime: Option<Runtime>,
}

impl Default for Manager {
fn default() -> Self {
Manager::memory()
Manager::new(Selector::Memory, None)
}
}

impl Manager {
/// In-memory KeyValueStoreManager for testing
pub fn memory() -> Self {
Self::new(KeyValueStoreEnum::Memory(MemoryStore::new()))
}

pub fn etcd(etcd_client: crate::transports::etcd::Client) -> Self {
Self::new(KeyValueStoreEnum::Etcd(EtcdStore::new(etcd_client)))
}

pub fn file<P: Into<PathBuf>>(cancel_token: CancellationToken, root: P) -> Self {
Self::new(KeyValueStoreEnum::File(FileStore::new(cancel_token, root)))
pub fn new(s: Selector, runtime: Option<Runtime>) -> Manager {
Manager {
selector: s,
kv_store: Arc::new(OnceCell::new()),
runtime,
}
}

fn new(s: KeyValueStoreEnum) -> Manager {
Manager(Arc::new(s))
fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> {
let selector = self.selector.clone();
self.kv_store
.get_or_try_init(|| selector.build(self.runtime.clone()))
}

pub async fn get_or_create_bucket(
Expand All @@ -279,26 +323,28 @@ impl Manager {
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn Bucket>, StoreError> {
self.0.get_or_create_bucket(bucket_name, ttl).await
self.get_kv_store()?
.get_or_create_bucket(bucket_name, ttl)
.await
}

pub async fn get_bucket(
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn Bucket>>, StoreError> {
self.0.get_bucket(bucket_name).await
self.get_kv_store()?.get_bucket(bucket_name).await
}

pub fn connection_id(&self) -> u64 {
self.0.connection_id()
self.get_kv_store().unwrap().connection_id()
}

pub async fn load<T: for<'a> Deserialize<'a>>(
&self,
bucket: &str,
key: &Key,
) -> Result<Option<T>, StoreError> {
let Some(bucket) = self.0.get_bucket(bucket).await? else {
let Some(bucket) = self.get_kv_store()?.get_bucket(bucket).await? else {
// No bucket means no cards
return Ok(None);
};
Expand Down Expand Up @@ -328,7 +374,7 @@ impl Manager {
let watch_task = tokio::spawn(async move {
// Start listening for changes but don't poll this yet
let bucket = self
.0
.get_kv_store()?
.get_or_create_bucket(&bucket_name, bucket_ttl)
.await?;
let mut stream = bucket.watch().await?;
Expand Down Expand Up @@ -373,7 +419,10 @@ impl Manager {
obj: &mut T,
) -> anyhow::Result<StoreOutcome> {
let obj_json = serde_json::to_vec(obj)?;
let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?;
let bucket = self
.get_kv_store()?
.get_or_create_bucket(bucket_name, bucket_ttl)
.await?;

let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?;

Expand All @@ -388,7 +437,7 @@ impl Manager {
/// Cleanup any temporary state.
/// TODO: Should this be async? Take &mut self?
pub fn shutdown(&self) {
self.0.shutdown()
self.get_kv_store().unwrap().shutdown()
}
}

Expand Down Expand Up @@ -471,6 +520,9 @@ pub enum StoreError {

#[error("Race condition, retry the call")]
Retry,

#[error("Error building key-value store: {0}")]
BuildError(#[from] anyhow::Error),
}

/// A trait allowing to get/set a revision on an object.
Expand Down
Loading