Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ either = { workspace = true }
etcd-client = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
oneshot = { workspace = true }
parking_lot = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions lib/runtime/src/discovery/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_register_endpoint() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);

Expand All @@ -625,7 +625,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_list() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = KVStoreDiscovery::new(store, cancel_token);

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {

#[tokio::test]
async fn test_kv_store_discovery_watch() {
let store = kv::Manager::memory();
let store = kv::Manager::default();
let cancel_token = CancellationToken::new();
let client = Arc::new(KVStoreDiscovery::new(store, cancel_token.clone()));

Expand Down
14 changes: 1 addition & 13 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,7 @@ impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let (selected_kv_store, nats_config, request_plane) = config.dissolve();

let runtime_clone = runtime.clone();

let store = match selected_kv_store {
kv::Selector::Etcd(etcd_config) => {
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
// The returned error doesn't show because of a dropped runtime error, so
// log it first.
tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you lost this part. That was a hard earned logging statement and comment.

kv::Manager::etcd(etcd_client)
}
kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
kv::Selector::Memory => kv::Manager::memory(),
};
let store = kv::Manager::new(selected_kv_store, Some(runtime.clone()));

let nats_client = match nats_config {
Some(nc) => Some(nc.connect().await?),
Expand Down
98 changes: 75 additions & 23 deletions lib/runtime/src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use std::time::Duration;
use std::{collections::HashMap, path::PathBuf};
use std::{env, fmt};

use crate::CancellationToken;
use crate::transports::etcd as etcd_transport;
use crate::{CancellationToken, Runtime};
use async_trait::async_trait;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use oneshot;
use percent_encoding::{NON_ALPHANUMERIC, percent_decode_str, percent_encode};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -139,6 +141,47 @@ pub enum Selector {
// Nats not listed because likely we want to remove that impl. It is not currently used and not well tested.
}

impl Selector {
fn build(self, runtime: Option<Runtime>) -> Result<KeyValueStoreEnum, StoreError> {
match self {
Selector::Etcd(opts) => {
if let Some(runtime) = runtime {
let (tx, rx) = oneshot::channel();
runtime.primary().spawn(async move {
let etcd_client = etcd_transport::Client::new(*opts, runtime.clone())
.await
.map_err(StoreError::from);
tx.send(etcd_client).unwrap();
});

// We block our async task a tiny bit here, but not a big deal since we only ever do this once.
Ok(KeyValueStoreEnum::Etcd(EtcdStore::new(
rx.recv()
.map_err(|x| StoreError::from(anyhow::anyhow!(x)))??,
)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create the transport in a different task? Doesn't this resolve to creating it inline?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, it's the async. Thats' a pain.

} else {
Err(StoreError::BuildError(anyhow::anyhow!(
"Runtime is required for Etcd selector"
)))
}
}
Selector::File(path) => {
if let Some(runtime) = runtime {
Ok(KeyValueStoreEnum::File(FileStore::new(
runtime.primary_token(),
path,
)))
} else {
Err(StoreError::BuildError(anyhow::anyhow!(
"Runtime is required for File selector"
)))
}
}
Selector::Memory => Ok(KeyValueStoreEnum::Memory(MemoryStore::new())),
}
}
}

impl fmt::Display for Selector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -247,30 +290,31 @@ impl KeyValueStoreEnum {
}

#[derive(Clone)]
pub struct Manager(Arc<KeyValueStoreEnum>);
pub struct Manager {
selector: Selector,
kv_store: Arc<OnceCell<KeyValueStoreEnum>>,
runtime: Option<Runtime>,
}

impl Default for Manager {
fn default() -> Self {
Manager::memory()
Manager::new(Selector::Memory, None)
}
}

impl Manager {
/// In-memory KeyValueStoreManager for testing
pub fn memory() -> Self {
Self::new(KeyValueStoreEnum::Memory(MemoryStore::new()))
}

pub fn etcd(etcd_client: crate::transports::etcd::Client) -> Self {
Self::new(KeyValueStoreEnum::Etcd(EtcdStore::new(etcd_client)))
}

pub fn file<P: Into<PathBuf>>(cancel_token: CancellationToken, root: P) -> Self {
Self::new(KeyValueStoreEnum::File(FileStore::new(cancel_token, root)))
pub fn new(s: Selector, runtime: Option<Runtime>) -> Manager {
Manager {
selector: s,
kv_store: Arc::new(OnceCell::new()),
runtime,
}
}

fn new(s: KeyValueStoreEnum) -> Manager {
Manager(Arc::new(s))
fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Result is unfortunate, because it will always be Ok after it's initialize. I wonder if you can design it without?

let selector = self.selector.clone();
Copy link
Contributor

@grahamking grahamking Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are cloning the selector every time, but only using it on the very first call.

  1. You could move the clone into the function.
  2. If you put it in an Option you can do selector.take() to get it with no copying.

self.kv_store
.get_or_try_init(|| selector.build(self.runtime.clone()))
}

pub async fn get_or_create_bucket(
Expand All @@ -279,26 +323,28 @@ impl Manager {
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn Bucket>, StoreError> {
self.0.get_or_create_bucket(bucket_name, ttl).await
self.get_kv_store()?
.get_or_create_bucket(bucket_name, ttl)
.await
}

pub async fn get_bucket(
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn Bucket>>, StoreError> {
self.0.get_bucket(bucket_name).await
self.get_kv_store()?.get_bucket(bucket_name).await
}

pub fn connection_id(&self) -> u64 {
self.0.connection_id()
self.get_kv_store().unwrap().connection_id()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't have any unwrap() in production code, unless they really can never happen. In that case add a comment: // Safety: why it can't happen.

}
Comment on lines 338 to 340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential panic on misconfigured Manager.

connection_id() uses .unwrap() which will panic if get_kv_store() fails. This can occur if:

  • Manager was created with Selector::Etcd or Selector::File but runtime is None
  • The ETCD connection fails during lazy initialization

Consider returning Result<u64, StoreError> or documenting this as a precondition. The same applies to shutdown() on line 440.

💡 Suggested alternatives

Option 1: Return Result

-    pub fn connection_id(&self) -> u64 {
-        self.get_kv_store().unwrap().connection_id()
+    pub fn connection_id(&self) -> Result<u64, StoreError> {
+        Ok(self.get_kv_store()?.connection_id())
     }

Option 2: Add expect with context

-        self.get_kv_store().unwrap().connection_id()
+        self.get_kv_store()
+            .expect("KV store must be initialized before calling connection_id")
+            .connection_id()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn connection_id(&self) -> u64 {
self.0.connection_id()
self.get_kv_store().unwrap().connection_id()
}
pub fn connection_id(&self) -> u64 {
self.get_kv_store()
.expect("KV store must be initialized before calling connection_id")
.connection_id()
}
🤖 Prompt for AI Agents
In `@lib/runtime/src/storage/kv.rs` around lines 338 - 340, The current
connection_id() method calls self.get_kv_store().unwrap() and can panic on
misconfigured Manager or failed lazy init; change connection_id() to return
Result<u64, StoreError> (or the crate's appropriate error type) and propagate
the error from get_kv_store() instead of unwrapping, updating its signature and
all callers to handle the Result; do the same for shutdown() (remove unwraps,
return Result<(), StoreError>) or alternatively replace unwrap() with expect()
that includes clear context only if you decide to keep panics — locate and
update the connection_id(), shutdown(), and get_kv_store() call sites to
propagate/handle the new Result types.


pub async fn load<T: for<'a> Deserialize<'a>>(
&self,
bucket: &str,
key: &Key,
) -> Result<Option<T>, StoreError> {
let Some(bucket) = self.0.get_bucket(bucket).await? else {
let Some(bucket) = self.get_kv_store()?.get_bucket(bucket).await? else {
// No bucket means no cards
return Ok(None);
};
Expand Down Expand Up @@ -328,7 +374,7 @@ impl Manager {
let watch_task = tokio::spawn(async move {
// Start listening for changes but don't poll this yet
let bucket = self
.0
.get_kv_store()?
.get_or_create_bucket(&bucket_name, bucket_ttl)
.await?;
let mut stream = bucket.watch().await?;
Expand Down Expand Up @@ -373,7 +419,10 @@ impl Manager {
obj: &mut T,
) -> anyhow::Result<StoreOutcome> {
let obj_json = serde_json::to_vec(obj)?;
let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?;
let bucket = self
.get_kv_store()?
.get_or_create_bucket(bucket_name, bucket_ttl)
.await?;

let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?;

Expand All @@ -388,7 +437,7 @@ impl Manager {
/// Cleanup any temporary state.
/// TODO: Should this be async? Take &mut self?
pub fn shutdown(&self) {
self.0.shutdown()
self.get_kv_store().unwrap().shutdown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for unwrap. A panic here would mask any shutdown errors.

}
}

Expand Down Expand Up @@ -471,6 +520,9 @@ pub enum StoreError {

#[error("Race condition, retry the call")]
Retry,

#[error("Error building key-value store: {0}")]
BuildError(#[from] anyhow::Error),
}

/// A trait allowing to get/set a revision on an object.
Expand Down
Loading