From 7f7a0c8eeaafc80c20e6057843eff26154edbf4f Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Mon, 26 Jan 2026 12:34:28 -0800 Subject: [PATCH 1/6] Defer kv store creation Signed-off-by: jthomson04 --- lib/runtime/Cargo.toml | 1 + lib/runtime/src/distributed.rs | 16 +------ lib/runtime/src/storage/kv.rs | 79 +++++++++++++++++++++++----------- 3 files changed, 58 insertions(+), 38 deletions(-) diff --git a/lib/runtime/Cargo.toml b/lib/runtime/Cargo.toml index b84d9d6cc6f..bb2fb43a596 100644 --- a/lib/runtime/Cargo.toml +++ b/lib/runtime/Cargo.toml @@ -39,6 +39,7 @@ either = { workspace = true } etcd-client = { workspace = true } futures = { workspace = true } humantime = { workspace = true } +oneshot = { workspace = true } parking_lot = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index fa22165c0dc..69bfc1dd6a1 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -103,19 +103,7 @@ impl DistributedRuntime { pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result { let (selected_kv_store, nats_config, request_plane) = config.dissolve(); - let runtime_clone = runtime.clone(); - - let store = match selected_kv_store { - kv::Selector::Etcd(etcd_config) => { - let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err| - // The returned error doesn't show because of a dropped runtime error, so - // log it first. - tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?; - kv::Manager::etcd(etcd_client) - } - kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root), - kv::Selector::Memory => kv::Manager::memory(), - }; + let store = kv::Manager::new(selected_kv_store, Some(runtime.clone())); let nats_client = match nats_config { Some(nc) => Some(nc.connect().await?), @@ -577,7 +565,7 @@ impl DistributedConfig { let nats_enabled = request_plane.is_nats() || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok(); DistributedConfig { - store_backend: kv::Selector::Etcd(Box::new(etcd_config)), + store_backend: kv::Selector::Etcd(etcd_config), nats_config: if nats_enabled { Some(nats::ClientOptions::default()) } else { diff --git a/lib/runtime/src/storage/kv.rs b/lib/runtime/src/storage/kv.rs index 695474f67bd..4bff738a1b1 100644 --- a/lib/runtime/src/storage/kv.rs +++ b/lib/runtime/src/storage/kv.rs @@ -12,9 +12,11 @@ use std::time::Duration; use std::{collections::HashMap, path::PathBuf}; use std::{env, fmt}; -use crate::CancellationToken; +use crate::{CancellationToken, Runtime}; use crate::transports::etcd as etcd_transport; use async_trait::async_trait; +use once_cell::sync::OnceCell; +use oneshot; use futures::StreamExt; use percent_encoding::{NON_ALPHANUMERIC, percent_decode_str, percent_encode}; use serde::{Deserialize, Serialize}; @@ -132,13 +134,42 @@ pub trait Store: Send + Sync { #[derive(Clone, Debug, Default)] pub enum Selector { // Box it because it is significantly bigger than the other variants - Etcd(Box), + Etcd(etcd_transport::ClientOptions), File(PathBuf), #[default] Memory, // Nats not listed because likely we want to remove that impl. It is not currently used and not well tested. } +impl Selector { + fn build(self, runtime: Option) -> Result { + match self { + Selector::Etcd(opts) => { + if let Some(runtime) = runtime { + let (tx, rx) = oneshot::channel(); + runtime.primary().spawn(async move { + let etcd_client = etcd_transport::Client::new(opts.clone(), runtime.clone()).await.map_err(StoreError::from); + tx.send(etcd_client).unwrap(); + }); + + // We block our async task a tiny bit here, but not a big deal since we only ever do this once. + Ok(KeyValueStoreEnum::Etcd(EtcdStore::new(rx.recv().map_err(|x| StoreError::from(anyhow::anyhow!(x)))??))) + } else { + Err(StoreError::BuildError(anyhow::anyhow!("Runtime is required for Etcd selector"))) + } + } + Selector::File(path) => { + if let Some(runtime) = runtime { + Ok(KeyValueStoreEnum::File(FileStore::new(runtime.primary_token(), path))) + } else { + Err(StoreError::BuildError(anyhow::anyhow!("Runtime is required for File selector"))) + } + }, + Selector::Memory => Ok(KeyValueStoreEnum::Memory(MemoryStore::new())), + } + } +} + impl fmt::Display for Selector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -157,7 +188,7 @@ impl FromStr for Selector { fn from_str(s: &str) -> anyhow::Result { match s { - "etcd" => Ok(Self::Etcd(Box::default())), + "etcd" => Ok(Self::Etcd(etcd_transport::ClientOptions::default())), "file" => { let root = env::var("DYN_FILE_KV") .map(PathBuf::from) @@ -247,30 +278,27 @@ impl KeyValueStoreEnum { } #[derive(Clone)] -pub struct Manager(Arc); +pub struct Manager { + selector: Selector, + kv_store: Arc>, + runtime: Option +} impl Default for Manager { fn default() -> Self { - Manager::memory() + Manager::new(Selector::Memory, None) } } impl Manager { - /// In-memory KeyValueStoreManager for testing - pub fn memory() -> Self { - Self::new(KeyValueStoreEnum::Memory(MemoryStore::new())) - } - pub fn etcd(etcd_client: crate::transports::etcd::Client) -> Self { - Self::new(KeyValueStoreEnum::Etcd(EtcdStore::new(etcd_client))) + pub fn new(s: Selector, runtime: Option) -> Manager { + Manager { selector: s, kv_store: Arc::new(OnceCell::new()), runtime } } - pub fn file>(cancel_token: CancellationToken, root: P) -> Self { - Self::new(KeyValueStoreEnum::File(FileStore::new(cancel_token, root))) - } - - fn new(s: KeyValueStoreEnum) -> Manager { - Manager(Arc::new(s)) + fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> { + let selector = self.selector.clone(); + self.kv_store.get_or_try_init(|| { selector.build(self.runtime.clone()).map_err(StoreError::from) }) } pub async fn get_or_create_bucket( @@ -279,18 +307,18 @@ impl Manager { // auto-delete items older than this ttl: Option, ) -> Result, StoreError> { - self.0.get_or_create_bucket(bucket_name, ttl).await + self.get_kv_store()?.get_or_create_bucket(bucket_name, ttl).await } pub async fn get_bucket( &self, bucket_name: &str, ) -> Result>, StoreError> { - self.0.get_bucket(bucket_name).await + self.get_kv_store()?.get_bucket(bucket_name).await } pub fn connection_id(&self) -> u64 { - self.0.connection_id() + self.get_kv_store().unwrap().connection_id() } pub async fn load Deserialize<'a>>( @@ -298,7 +326,7 @@ impl Manager { bucket: &str, key: &Key, ) -> Result, StoreError> { - let Some(bucket) = self.0.get_bucket(bucket).await? else { + let Some(bucket) = self.get_kv_store()?.get_bucket(bucket).await? else { // No bucket means no cards return Ok(None); }; @@ -328,7 +356,7 @@ impl Manager { let watch_task = tokio::spawn(async move { // Start listening for changes but don't poll this yet let bucket = self - .0 + .get_kv_store()? .get_or_create_bucket(&bucket_name, bucket_ttl) .await?; let mut stream = bucket.watch().await?; @@ -373,7 +401,7 @@ impl Manager { obj: &mut T, ) -> anyhow::Result { let obj_json = serde_json::to_vec(obj)?; - let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?; + let bucket = self.get_kv_store()?.get_or_create_bucket(bucket_name, bucket_ttl).await?; let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?; @@ -388,7 +416,7 @@ impl Manager { /// Cleanup any temporary state. /// TODO: Should this be async? Take &mut self? pub fn shutdown(&self) { - self.0.shutdown() + self.get_kv_store().unwrap().shutdown() } } @@ -471,6 +499,9 @@ pub enum StoreError { #[error("Race condition, retry the call")] Retry, + + #[error("Error building key-value store: {0}")] + BuildError(#[from] anyhow::Error), } /// A trait allowing to get/set a revision on an object. From ef40c05d11ba53c61064154b8264517bf1793cb9 Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Mon, 26 Jan 2026 14:52:44 -0800 Subject: [PATCH 2/6] fmt and clippy Signed-off-by: jthomson04 --- lib/runtime/src/storage/kv.rs | 54 +++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/lib/runtime/src/storage/kv.rs b/lib/runtime/src/storage/kv.rs index 4bff738a1b1..a5d606e604e 100644 --- a/lib/runtime/src/storage/kv.rs +++ b/lib/runtime/src/storage/kv.rs @@ -12,12 +12,12 @@ use std::time::Duration; use std::{collections::HashMap, path::PathBuf}; use std::{env, fmt}; -use crate::{CancellationToken, Runtime}; use crate::transports::etcd as etcd_transport; +use crate::{CancellationToken, Runtime}; use async_trait::async_trait; +use futures::StreamExt; use once_cell::sync::OnceCell; use oneshot; -use futures::StreamExt; use percent_encoding::{NON_ALPHANUMERIC, percent_decode_str, percent_encode}; use serde::{Deserialize, Serialize}; @@ -132,6 +132,7 @@ pub trait Store: Send + Sync { } #[derive(Clone, Debug, Default)] +#[allow(clippy::large_enum_variant)] pub enum Selector { // Box it because it is significantly bigger than the other variants Etcd(etcd_transport::ClientOptions), @@ -148,23 +149,36 @@ impl Selector { if let Some(runtime) = runtime { let (tx, rx) = oneshot::channel(); runtime.primary().spawn(async move { - let etcd_client = etcd_transport::Client::new(opts.clone(), runtime.clone()).await.map_err(StoreError::from); + let etcd_client = + etcd_transport::Client::new(opts.clone(), runtime.clone()) + .await + .map_err(StoreError::from); tx.send(etcd_client).unwrap(); }); - + // We block our async task a tiny bit here, but not a big deal since we only ever do this once. - Ok(KeyValueStoreEnum::Etcd(EtcdStore::new(rx.recv().map_err(|x| StoreError::from(anyhow::anyhow!(x)))??))) + Ok(KeyValueStoreEnum::Etcd(EtcdStore::new( + rx.recv() + .map_err(|x| StoreError::from(anyhow::anyhow!(x)))??, + ))) } else { - Err(StoreError::BuildError(anyhow::anyhow!("Runtime is required for Etcd selector"))) + Err(StoreError::BuildError(anyhow::anyhow!( + "Runtime is required for Etcd selector" + ))) } } Selector::File(path) => { if let Some(runtime) = runtime { - Ok(KeyValueStoreEnum::File(FileStore::new(runtime.primary_token(), path))) + Ok(KeyValueStoreEnum::File(FileStore::new( + runtime.primary_token(), + path, + ))) } else { - Err(StoreError::BuildError(anyhow::anyhow!("Runtime is required for File selector"))) + Err(StoreError::BuildError(anyhow::anyhow!( + "Runtime is required for File selector" + ))) } - }, + } Selector::Memory => Ok(KeyValueStoreEnum::Memory(MemoryStore::new())), } } @@ -281,7 +295,7 @@ impl KeyValueStoreEnum { pub struct Manager { selector: Selector, kv_store: Arc>, - runtime: Option + runtime: Option, } impl Default for Manager { @@ -291,14 +305,19 @@ impl Default for Manager { } impl Manager { - pub fn new(s: Selector, runtime: Option) -> Manager { - Manager { selector: s, kv_store: Arc::new(OnceCell::new()), runtime } + Manager { + selector: s, + kv_store: Arc::new(OnceCell::new()), + runtime, + } } fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> { let selector = self.selector.clone(); - self.kv_store.get_or_try_init(|| { selector.build(self.runtime.clone()).map_err(StoreError::from) }) + self.kv_store.get_or_try_init(|| { + selector + .build(self.runtime.clone())}) } pub async fn get_or_create_bucket( @@ -307,7 +326,9 @@ impl Manager { // auto-delete items older than this ttl: Option, ) -> Result, StoreError> { - self.get_kv_store()?.get_or_create_bucket(bucket_name, ttl).await + self.get_kv_store()? + .get_or_create_bucket(bucket_name, ttl) + .await } pub async fn get_bucket( @@ -401,7 +422,10 @@ impl Manager { obj: &mut T, ) -> anyhow::Result { let obj_json = serde_json::to_vec(obj)?; - let bucket = self.get_kv_store()?.get_or_create_bucket(bucket_name, bucket_ttl).await?; + let bucket = self + .get_kv_store()? + .get_or_create_bucket(bucket_name, bucket_ttl) + .await?; let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?; From 57fb3ee0111b4616f83391389b6bf20128624429 Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Mon, 26 Jan 2026 22:59:18 -0800 Subject: [PATCH 3/6] More fmt Signed-off-by: jthomson04 --- lib/runtime/src/storage/kv.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/runtime/src/storage/kv.rs b/lib/runtime/src/storage/kv.rs index a5d606e604e..51cf48ecf8c 100644 --- a/lib/runtime/src/storage/kv.rs +++ b/lib/runtime/src/storage/kv.rs @@ -315,9 +315,8 @@ impl Manager { fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> { let selector = self.selector.clone(); - self.kv_store.get_or_try_init(|| { - selector - .build(self.runtime.clone())}) + self.kv_store + .get_or_try_init(|| selector.build(self.runtime.clone())) } pub async fn get_or_create_bucket( From dec2950a677120f9943246357370931f766f4ac9 Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Tue, 27 Jan 2026 10:47:38 -0800 Subject: [PATCH 4/6] check in cargo lock Signed-off-by: jthomson04 --- Cargo.lock | 1 + lib/bindings/python/Cargo.lock | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6000decaf7..538bcc3ece2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2507,6 +2507,7 @@ dependencies = [ "notify", "nuid", "once_cell", + "oneshot", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index fbf6cc1c5cd..405989a16cc 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -1587,7 +1587,6 @@ dependencies = [ "tokio-util", "tracing", "url", - "utoipa", "uuid", ] @@ -1800,6 +1799,7 @@ dependencies = [ "notify", "nuid", "once_cell", + "oneshot", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -7795,8 +7795,6 @@ dependencies = [ "quote", "regex", "syn 2.0.110", - "url", - "uuid", ] [[package]] From 14bd34651bcfb399af79983c3e234274a9fb8f4f Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Tue, 27 Jan 2026 11:02:42 -0800 Subject: [PATCH 5/6] fix tests Signed-off-by: jthomson04 --- lib/runtime/src/discovery/kv_store.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/runtime/src/discovery/kv_store.rs b/lib/runtime/src/discovery/kv_store.rs index 16456a20d79..b02ec2d827e 100644 --- a/lib/runtime/src/discovery/kv_store.rs +++ b/lib/runtime/src/discovery/kv_store.rs @@ -600,7 +600,7 @@ mod tests { #[tokio::test] async fn test_kv_store_discovery_register_endpoint() { - let store = kv::Manager::memory(); + let store = kv::Manager::default(); let cancel_token = CancellationToken::new(); let client = KVStoreDiscovery::new(store, cancel_token); @@ -625,7 +625,7 @@ mod tests { #[tokio::test] async fn test_kv_store_discovery_list() { - let store = kv::Manager::memory(); + let store = kv::Manager::default(); let cancel_token = CancellationToken::new(); let client = KVStoreDiscovery::new(store, cancel_token); @@ -680,7 +680,7 @@ mod tests { #[tokio::test] async fn test_kv_store_discovery_watch() { - let store = kv::Manager::memory(); + let store = kv::Manager::default(); let cancel_token = CancellationToken::new(); let client = Arc::new(KVStoreDiscovery::new(store, cancel_token.clone())); From 0e47dba7cff60d7fd2133266bb2182c2f6e0ce03 Mon Sep 17 00:00:00 2001 From: jthomson04 Date: Tue, 27 Jan 2026 11:14:57 -0800 Subject: [PATCH 6/6] more clippy fixes Signed-off-by: jthomson04 --- lib/runtime/src/distributed.rs | 2 +- lib/runtime/src/storage/kv.rs | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 69bfc1dd6a1..2f60337862b 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -565,7 +565,7 @@ impl DistributedConfig { let nats_enabled = request_plane.is_nats() || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok(); DistributedConfig { - store_backend: kv::Selector::Etcd(etcd_config), + store_backend: kv::Selector::Etcd(Box::new(etcd_config)), nats_config: if nats_enabled { Some(nats::ClientOptions::default()) } else { diff --git a/lib/runtime/src/storage/kv.rs b/lib/runtime/src/storage/kv.rs index 51cf48ecf8c..e2220bd4477 100644 --- a/lib/runtime/src/storage/kv.rs +++ b/lib/runtime/src/storage/kv.rs @@ -132,10 +132,9 @@ pub trait Store: Send + Sync { } #[derive(Clone, Debug, Default)] -#[allow(clippy::large_enum_variant)] pub enum Selector { // Box it because it is significantly bigger than the other variants - Etcd(etcd_transport::ClientOptions), + Etcd(Box), File(PathBuf), #[default] Memory, @@ -149,10 +148,9 @@ impl Selector { if let Some(runtime) = runtime { let (tx, rx) = oneshot::channel(); runtime.primary().spawn(async move { - let etcd_client = - etcd_transport::Client::new(opts.clone(), runtime.clone()) - .await - .map_err(StoreError::from); + let etcd_client = etcd_transport::Client::new(*opts, runtime.clone()) + .await + .map_err(StoreError::from); tx.send(etcd_client).unwrap(); }); @@ -202,7 +200,7 @@ impl FromStr for Selector { fn from_str(s: &str) -> anyhow::Result { match s { - "etcd" => Ok(Self::Etcd(etcd_transport::ClientOptions::default())), + "etcd" => Ok(Self::Etcd(Box::default())), "file" => { let root = env::var("DYN_FILE_KV") .map(PathBuf::from)