feat: Lazy ETCD initialization #5690
Conversation
Signed-off-by: jthomson04 <jwillthomson19@gmail.com>
Walkthrough

The changes introduce a lazy-initialization pattern for the key-value store with runtime-aware configuration. The Manager struct is refactored to defer KV store creation via Selector.build(), the oneshot dependency is added, and initialization logic across multiple modules is simplified to use the new Manager API.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@lib/runtime/src/storage/kv.rs`:
- Around line 338-340: The current connection_id() method calls
self.get_kv_store().unwrap() and can panic on misconfigured Manager or failed
lazy init; change connection_id() to return Result<u64, StoreError> (or the
crate's appropriate error type) and propagate the error from get_kv_store()
instead of unwrapping, updating its signature and all callers to handle the
Result; do the same for shutdown() (remove unwraps, return Result<(),
StoreError>) or alternatively replace unwrap() with expect() that includes clear
context only if you decide to keep panics — locate and update the
connection_id(), shutdown(), and get_kv_store() call sites to propagate/handle
the new Result types.
🧹 Nitpick comments (1)

lib/runtime/src/storage/kv.rs (1)

144-183: Consider using `block_on` or restructuring to avoid blocking an async thread.

The `rx.recv()` call on line 159 is a synchronous blocking operation that will block the entire OS thread. While the comment acknowledges this is a one-time operation, blocking an async runtime's worker thread can cause issues if:

- The spawned task takes longer than expected (network delays during ETCD connection)
- Other tasks on the same worker thread are starved

Since `Selector::build` is called from `get_kv_store`, which is synchronous (returns `Result<&KeyValueStoreEnum, StoreError>`), you may need to reconsider the API design. Alternatives include:

- Making `get_kv_store` async and using `.await` instead of blocking
- Using `tokio::task::block_in_place` to signal this is intentional blocking

That said, given the PR objective is to defer ETCD initialization until after model loading (when the runtime is less busy), this trade-off may be acceptable.
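The spawn-a-worker-and-block-on-a-channel pattern the comment describes can be sketched with std types only (all names here are stand-ins, not the PR's real types; in the actual code the worker is a spawned tokio task and the blocking `recv()` sits on an async worker thread, which is what the reviewer flags):

```rust
use std::sync::mpsc;
use std::thread;

// A worker performs the connection setup and sends the result back over a
// channel; the caller blocks on recv() until it arrives.
fn build_store_blocking() -> Result<String, String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Stand-in for the async `etcd::Client::new(...)` connection attempt.
        let client: Result<String, String> = Ok("etcd-client".to_string());
        let _ = tx.send(client);
    });
    // Blocking here is fine on a plain OS thread; on a tokio worker thread,
    // wrapping this call in tokio::task::block_in_place would at least make
    // the blocking explicit to the runtime.
    rx.recv().map_err(|e| e.to_string())?
}
```

The outer `map_err(...)?` handles the channel error (sender dropped); the inner `Result` is the connection outcome itself, mirroring the double-`?` in the PR's hunk.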
```diff
  pub fn connection_id(&self) -> u64 {
-     self.0.connection_id()
+     self.get_kv_store().unwrap().connection_id()
  }
```
Potential panic on misconfigured Manager.

`connection_id()` uses `.unwrap()`, which will panic if `get_kv_store()` fails. This can occur if:

- `Manager` was created with `Selector::Etcd` or `Selector::File` but `runtime` is `None`
- The ETCD connection fails during lazy initialization

Consider returning `Result<u64, StoreError>` or documenting this as a precondition. The same applies to `shutdown()` on line 440.
💡 Suggested alternatives

Option 1: Return Result

```diff
- pub fn connection_id(&self) -> u64 {
-     self.get_kv_store().unwrap().connection_id()
+ pub fn connection_id(&self) -> Result<u64, StoreError> {
+     Ok(self.get_kv_store()?.connection_id())
  }
```

Option 2: Add expect with context

```diff
- self.get_kv_store().unwrap().connection_id()
+ self.get_kv_store()
+     .expect("KV store must be initialized before calling connection_id")
+     .connection_id()
```
+ .connection_id()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
  pub fn connection_id(&self) -> u64 {
-     self.get_kv_store().unwrap().connection_id()
+     self.get_kv_store()
+         .expect("KV store must be initialized before calling connection_id")
+         .connection_id()
  }
```
```rust
Ok(KeyValueStoreEnum::Etcd(EtcdStore::new(
    rx.recv()
        .map_err(|x| StoreError::from(anyhow::anyhow!(x)))??,
)))
```
Why create the transport in a different task? Doesn't this resolve to creating it inline?
Oh, I see, it's the async. That's a pain.
```diff
- fn new(s: KeyValueStoreEnum) -> Manager {
-     Manager(Arc::new(s))
+ fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> {
+     let selector = self.selector.clone();
```
You are cloning the selector every time, but only using it on the very first call.

- You could move the clone into the function.
- If you put it in an `Option` you can do `selector.take()` to get it with no copying.
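The `Option` + `take()` suggestion might look like the following sketch, using hypothetical stand-ins for the PR's `Selector` and store types (the real `Manager` differs):

```rust
use std::sync::{Mutex, OnceLock};

enum Selector {
    Memory,
}

struct KvStore {
    connection_id: u64,
}

struct Manager {
    // The selector lives in an Option so the first call can take() it by
    // value instead of cloning it on every call.
    selector: Mutex<Option<Selector>>,
    store: OnceLock<KvStore>,
}

impl Manager {
    fn new(selector: Selector) -> Self {
        Manager {
            selector: Mutex::new(Some(selector)),
            store: OnceLock::new(),
        }
    }

    fn get_kv_store(&self) -> Result<&KvStore, String> {
        // Fast path: already built, no locking or cloning.
        if let Some(store) = self.store.get() {
            return Ok(store);
        }
        let mut guard = self.selector.lock().map_err(|e| e.to_string())?;
        // Another caller may have finished the build while we waited.
        if let Some(store) = self.store.get() {
            return Ok(store);
        }
        // take() moves the selector out; it is only needed once.
        let selector = guard.take().ok_or("selector already consumed")?;
        let built = match selector {
            Selector::Memory => KvStore { connection_id: 42 },
        };
        let _ = self.store.set(built);
        Ok(self.store.get().expect("store was just set"))
    }
}
```

The second `store.get()` check under the lock handles the race where two callers both miss the fast path; without it, the loser's `take()` would find `None` even though initialization succeeded.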
```diff
  pub fn connection_id(&self) -> u64 {
-     self.0.connection_id()
+     self.get_kv_store().unwrap().connection_id()
```
We can't have any unwrap() in production code, unless they really can never happen. In that case add a comment: // Safety: why it can't happen.
```diff
  /// TODO: Should this be async? Take &mut self?
  pub fn shutdown(&self) {
-     self.0.shutdown()
+     self.get_kv_store().unwrap().shutdown()
```
Same for unwrap. A panic here would mask any shutdown errors.
```diff
- fn new(s: KeyValueStoreEnum) -> Manager {
-     Manager(Arc::new(s))
+ fn get_kv_store(&self) -> Result<&KeyValueStoreEnum, StoreError> {
```
This Result is unfortunate, because it will always be Ok after it's initialized. I wonder if you can design it without?
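One way the always-Ok `Result` could be avoided, sketched with hypothetical stand-in types (not the PR's design): make the fallible build an explicit `init()` step that callers run once and handle errors from, so later accessors can be infallible.

```rust
use std::sync::OnceLock;

struct KvStore {
    connection_id: u64,
}

struct Manager {
    store: OnceLock<KvStore>,
}

impl Manager {
    fn new() -> Self {
        Manager { store: OnceLock::new() }
    }

    // All fallible work happens here, once, where the caller can handle it.
    fn init(&self) -> Result<(), String> {
        if self.store.get().is_some() {
            return Ok(());
        }
        let built = KvStore { connection_id: 7 }; // pretend this can fail
        let _ = self.store.set(built);
        Ok(())
    }

    // After a successful init(), accessors no longer need to return Result.
    fn connection_id(&self) -> u64 {
        // Precondition: init() must have succeeded first; the expect message
        // documents that, in the "// Safety:" spirit requested above.
        self.store
            .get()
            .expect("Manager::init must be called before connection_id")
            .connection_id
    }
}
```

The trade-off is that laziness becomes the caller's responsibility: something must call `init()` at the right moment (here, after model loading) rather than the first accessor doing it implicitly.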
```rust
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
    // The returned error doesn't show because of a dropped runtime error, so
    // log it first.
    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
```
I think you lost this part. That was a hard earned logging statement and comment.
It feels like a hack to get around an environment issue. Could you investigate more and find the root cause? I'd prefer we figure it out upstream. This complicates the code.
ETCD seems to be very sensitive to network and memory pressure. During the network and memory-intensive model loading process (especially when loading from ETCD), pings from the ETCD server can take several seconds to be delivered. To get around this, we lazily initialize ETCD, and only create our lease after the model loading has completed.
Summary by CodeRabbit
Refactor