-
Notifications
You must be signed in to change notification settings - Fork 60
Implement parallel component loading with graceful error handling for Wassette MCP server startup optimization #69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a4b868f
1b66ad9
c1b2bd7
85869ad
e43d157
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,7 @@ use component2json::{ | |||||
| component_exports_to_json_schema, component_exports_to_tools, create_placeholder_results, | ||||||
| json_to_vals, vals_to_json, FunctionIdentifier, ToolMetadata, | ||||||
| }; | ||||||
| use futures::TryStreamExt; | ||||||
| use futures::{future, TryStreamExt}; | ||||||
| use policy::{ | ||||||
| AccessType, EnvironmentPermission, NetworkHostPermission, NetworkPermission, PolicyParser, | ||||||
| StoragePermission, | ||||||
|
|
@@ -543,52 +543,108 @@ impl LifecycleManager { | |||||
| let mut components = HashMap::new(); | ||||||
| let mut policy_registry = PolicyRegistry::default(); | ||||||
|
|
||||||
| let loaded_components = | ||||||
| // Scan for component files | ||||||
| let scanned_components = | ||||||
| tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&plugin_dir).await?) | ||||||
| .map_err(anyhow::Error::from) | ||||||
| .try_filter_map(|entry| { | ||||||
| let value = engine.clone(); | ||||||
| async move { load_component_from_entry(value, entry).await } | ||||||
| }) | ||||||
| .try_filter_map(|entry| async move { scan_component_from_entry(entry).await }) | ||||||
| .try_collect::<Vec<_>>() | ||||||
| .await?; | ||||||
|
|
||||||
| for (component, name) in loaded_components.into_iter() { | ||||||
| let tool_metadata = component_exports_to_tools(&component, &engine, true); | ||||||
| registry | ||||||
| .register_tools(&name, tool_metadata) | ||||||
| .context("unable to insert component into registry")?; | ||||||
| components.insert(name.clone(), Arc::new(component)); | ||||||
|
|
||||||
| // Check for co-located policy file and restore policy association | ||||||
| let policy_path = plugin_dir.as_ref().join(format!("{name}.policy.yaml")); | ||||||
| if policy_path.exists() { | ||||||
| match tokio::fs::read_to_string(&policy_path).await { | ||||||
| Ok(policy_content) => match PolicyParser::parse_str(&policy_content) { | ||||||
| Ok(policy) => { | ||||||
| match wasistate::create_wasi_state_template_from_policy( | ||||||
| &policy, | ||||||
| plugin_dir.as_ref(), | ||||||
| ) { | ||||||
| Ok(wasi_template) => { | ||||||
| policy_registry | ||||||
| .component_policies | ||||||
| .insert(name.clone(), Arc::new(wasi_template)); | ||||||
| info!(component_id = %name, "Restored policy association from co-located file"); | ||||||
| } | ||||||
| info!( | ||||||
| "Found {} components to load in parallel", | ||||||
| scanned_components.len() | ||||||
| ); | ||||||
|
|
||||||
| // Load all components in parallel for faster startup with parallelization | ||||||
| let component_loading_tasks = scanned_components | ||||||
| .into_iter() | ||||||
| .map(|(component_path, component_id)| { | ||||||
| let engine = engine.clone(); | ||||||
| let plugin_dir = plugin_dir.as_ref().to_path_buf(); | ||||||
| async move { | ||||||
| let start_time = Instant::now(); | ||||||
|
|
||||||
| // Load and compile the component in a blocking task | ||||||
| let component = { | ||||||
| let engine = engine.clone(); | ||||||
| let path = component_path.clone(); | ||||||
| tokio::task::spawn_blocking(move || Component::from_file(&engine, path)).await?? | ||||||
| }; | ||||||
|
|
||||||
| // Generate tool metadata | ||||||
| let tool_metadata = component_exports_to_tools(&component, &engine, true); | ||||||
|
|
||||||
| // Load co-located policy if it exists | ||||||
| let policy_template = { | ||||||
| let policy_path = plugin_dir.join(format!("{component_id}.policy.yaml")); | ||||||
| if policy_path.exists() { | ||||||
| match tokio::fs::read_to_string(&policy_path).await { | ||||||
| Ok(policy_content) => match PolicyParser::parse_str(&policy_content) { | ||||||
| Ok(policy) => { | ||||||
| match wasistate::create_wasi_state_template_from_policy( | ||||||
| &policy, | ||||||
| &plugin_dir, | ||||||
| ) { | ||||||
| Ok(wasi_template) => { | ||||||
| info!(component_id = %component_id, "Loaded policy association from co-located file"); | ||||||
| Some(Arc::new(wasi_template)) | ||||||
| } | ||||||
| Err(e) => { | ||||||
| warn!(component_id = %component_id, error = %e, "Failed to create WASI template from policy"); | ||||||
| None | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| Err(e) => { | ||||||
| warn!(component_id = %component_id, error = %e, "Failed to parse co-located policy file"); | ||||||
| None | ||||||
| } | ||||||
| }, | ||||||
| Err(e) => { | ||||||
| warn!(component_id = %name, error = %e, "Failed to create WASI template from policy"); | ||||||
| warn!(component_id = %component_id, error = %e, "Failed to read co-located policy file"); | ||||||
| None | ||||||
| } | ||||||
| } | ||||||
| } else { | ||||||
| None | ||||||
| } | ||||||
| Err(e) => { | ||||||
| warn!(component_id = %name, error = %e, "Failed to parse co-located policy file"); | ||||||
| } | ||||||
| }, | ||||||
| Err(e) => { | ||||||
| warn!(component_id = %name, error = %e, "Failed to read co-located policy file"); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| info!(component_id = %component_id, elapsed = ?start_time.elapsed(), "Component loaded successfully"); | ||||||
|
|
||||||
| Ok::<_, anyhow::Error>((component_id, Arc::new(component), tool_metadata, policy_template)) | ||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| // Await all component loading tasks in parallel, filtering out failed components | ||||||
| let component_results = future::join_all(component_loading_tasks).await; | ||||||
| let loaded_components: Vec<_> = component_results | ||||||
| .into_iter() | ||||||
| .filter_map(|result| match result { | ||||||
| Ok((component_id, component, tool_metadata, policy_template)) => { | ||||||
| Some((component_id, component, tool_metadata, policy_template)) | ||||||
| } | ||||||
| Err(e) => { | ||||||
| warn!(error = %e, "Failed to load component, skipping"); | ||||||
| None | ||||||
| } | ||||||
| }) | ||||||
| .collect(); | ||||||
|
|
||||||
| // Now register all loaded components | ||||||
| for (component_id, component, tool_metadata, policy_template) in loaded_components { | ||||||
| registry | ||||||
| .register_tools(&component_id, tool_metadata) | ||||||
| .with_context(|| { | ||||||
| format!("unable to register tools for component {component_id}") | ||||||
| })?; | ||||||
| components.insert(component_id.clone(), component); | ||||||
|
|
||||||
| if let Some(policy_template) = policy_template { | ||||||
| policy_registry | ||||||
| .component_policies | ||||||
| .insert(component_id, policy_template); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -600,7 +656,10 @@ impl LifecycleManager { | |||||
| .await | ||||||
| .context("Failed to create downloads directory")?; | ||||||
|
|
||||||
| info!("LifecycleManager initialized successfully"); | ||||||
| info!( | ||||||
| "LifecycleManager initialized successfully with {} components loaded in parallel", | ||||||
| components.len() | ||||||
| ); | ||||||
| Ok(Self { | ||||||
| engine, | ||||||
| components: Arc::new(RwLock::new(components)), | ||||||
|
|
@@ -675,6 +734,7 @@ impl LifecycleManager { | |||||
| pub async fn uninstall_component(&self, id: &str) -> Result<()> { | ||||||
| debug!("Uninstalling component"); | ||||||
| self.unload_component(id).await; | ||||||
|
|
||||||
| let component_file = self.component_path(id); | ||||||
| tokio::fs::remove_file(&component_file) | ||||||
| .await | ||||||
|
|
@@ -689,40 +749,43 @@ impl LifecycleManager { | |||||
| #[instrument(skip(self))] | ||||||
| pub async fn get_component_id_for_tool(&self, tool_name: &str) -> Result<String> { | ||||||
| let registry = self.registry.read().await; | ||||||
| let tool_infos = registry | ||||||
| .get_tool_info(tool_name) | ||||||
| .context("Tool not found")?; | ||||||
|
|
||||||
| if tool_infos.len() > 1 { | ||||||
| bail!( | ||||||
| "Multiple components found for tool '{}': {}", | ||||||
| tool_name, | ||||||
| tool_infos | ||||||
| .iter() | ||||||
| .map(|info| info.component_id.as_str()) | ||||||
| .collect::<Vec<_>>() | ||||||
| .join(", ") | ||||||
| ); | ||||||
| if let Some(tool_infos) = registry.get_tool_info(tool_name) { | ||||||
| if tool_infos.len() > 1 { | ||||||
| bail!( | ||||||
| "Multiple components found for tool '{}': {}", | ||||||
| tool_name, | ||||||
| tool_infos | ||||||
| .iter() | ||||||
| .map(|info| info.component_id.as_str()) | ||||||
| .collect::<Vec<_>>() | ||||||
| .join(", ") | ||||||
| ); | ||||||
| } | ||||||
| Ok(tool_infos[0].component_id.clone()) | ||||||
| } else { | ||||||
| bail!("Tool not found: {}", tool_name); | ||||||
| } | ||||||
|
|
||||||
| Ok(tool_infos[0].component_id.clone()) | ||||||
| } | ||||||
|
|
||||||
| /// Lists all available tools across all components | ||||||
| #[instrument(skip(self))] | ||||||
| pub async fn list_tools(&self) -> Vec<Value> { | ||||||
| // All components are loaded at startup with parallel loading | ||||||
| self.registry.read().await.list_tools() | ||||||
| } | ||||||
|
|
||||||
| /// Returns the requested component. Returns `None` if the component is not found. | ||||||
| #[instrument(skip(self))] | ||||||
| pub async fn get_component(&self, component_id: &str) -> Option<Arc<Component>> { | ||||||
| // All components are loaded at startup with parallel loading | ||||||
|
||||||
| // All components are loaded at startup with parallel loading | |
| // Components are pre-loaded at startup, so this is an immediate lookup |
Copilot
AI
Aug 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The comment doesn't add value since it restates what 'parallel loading' already implies. Consider removing this comment or making it more specific about why this matters for the function's behavior.
Note: See the diff below for a potential fix:
@@ -748,20 +748,20 @@
/// Lists all available tools across all components
#[instrument(skip(self))]
pub async fn list_tools(&self) -> Vec<Value> {
- // All components are loaded at startup with parallel loading
+
self.registry.read().await.list_tools()
}
/// Returns the requested component. Returns `None` if the component is not found.
#[instrument(skip(self))]
pub async fn get_component(&self, component_id: &str) -> Option<Arc<Component>> {
- // All components are loaded at startup with parallel loading
+
self.components.read().await.get(component_id).cloned()
}
#[instrument(skip(self))]
pub async fn list_components(&self) -> Vec<String> {
- // All components are loaded at startup with parallel loading
+
let components = self.components.read().await;
components.keys().cloned().collect()
}
Copilot
AI
Aug 6, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] This comment is repetitive and doesn't add value. Consider removing it or making it more specific about the performance characteristics.
Note: See the diff below for a potential fix:
@@ -748,21 +748,21 @@
/// Lists all available tools across all components
#[instrument(skip(self))]
pub async fn list_tools(&self) -> Vec<Value> {
- // All components are loaded at startup with parallel loading
self.registry.read().await.list_tools()
+
}
/// Returns the requested component. Returns `None` if the component is not found.
#[instrument(skip(self))]
pub async fn get_component(&self, component_id: &str) -> Option<Arc<Component>> {
- // All components are loaded at startup with parallel loading
self.components.read().await.get(component_id).cloned()
+
}
#[instrument(skip(self))]
pub async fn list_components(&self) -> Vec<String> {
- // All components are loaded at startup with parallel loading
let components = self.components.read().await;
+
components.keys().cloned().collect()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The comment is redundant - 'for faster startup with parallelization' repeats the same concept as 'in parallel'. Consider simplifying to '// Load all components in parallel for faster startup'.