Skip to content

Commit 935ef64

Browse files
committed
refactor(wassette): Refactor LifecycleManager and related components
- Introduced LifecycleConfig / LifecycleBuilder for LifecycleManager creation - Component state is centralized in ComponentRegistry / ComponentStorage - Implemented PolicyManager to handle policy attachment and management. - Updated LifecycleManager methods to utilize new abstractions - The loading pipline is split into resolve/stage/compile/register steps Signed-off-by: Jiaxiao Zhou <duibao55328@gmail.com>
1 parent ff117d9 commit 935ef64

20 files changed

+1555
-1286
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,12 @@ Wassette comes with several built-in tools for managing components and their per
118118
```json
119119
{
120120
"status": "component loaded successfully",
121-
"id": "component-unique-id"
121+
"id": "component-unique-id",
122+
"tools": ["tool-one", "tool-two"]
122123
}
123124
```
125+
When an existing component is replaced, the `status` value becomes
126+
`component reloaded successfully`.
124127

125128
### unload-component
126129
**Parameters:**

crates/mcp-server/src/components.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use rmcp::model::{CallToolRequestParam, CallToolResult, Content, Tool};
1010
use rmcp::{Peer, RoleServer};
1111
use serde_json::{json, Map, Value};
1212
use tracing::{debug, error, info, instrument};
13-
use wassette::LifecycleManager;
13+
use wassette::{ComponentLoadOutcome, LifecycleManager, LoadResult};
1414

1515
#[instrument(skip(lifecycle_manager))]
1616
pub(crate) async fn get_component_tools(lifecycle_manager: &LifecycleManager) -> Result<Vec<Tool>> {
@@ -54,9 +54,9 @@ pub(crate) async fn handle_load_component(
5454
info!(path, "Loading component");
5555

5656
match lifecycle_manager.load_component(path).await {
57-
Ok((id, _load_result)) => {
58-
handle_tool_list_notification(Some(server_peer), &id, "load").await;
59-
create_component_success_result("load", &id)
57+
Ok(outcome) => {
58+
handle_tool_list_notification(Some(server_peer), &outcome.component_id, "load").await;
59+
create_load_component_success_result(&outcome)
6060
}
6161
Err(e) => {
6262
error!(error = %e, path, "Failed to load component");
@@ -320,6 +320,27 @@ fn create_component_success_result(
320320
})
321321
}
322322

323+
fn create_load_component_success_result(outcome: &ComponentLoadOutcome) -> Result<CallToolResult> {
324+
let status = match outcome.status {
325+
LoadResult::New => "component loaded successfully",
326+
LoadResult::Replaced => "component reloaded successfully",
327+
};
328+
329+
let status_text = serde_json::to_string(&json!({
330+
"status": status,
331+
"id": &outcome.component_id,
332+
"tools": &outcome.tool_names,
333+
}))?;
334+
335+
let contents = vec![Content::text(status_text)];
336+
337+
Ok(CallToolResult {
338+
content: Some(contents),
339+
structured_content: None,
340+
is_error: None,
341+
})
342+
}
343+
323344
/// Create error result for component operations
324345
fn create_component_error_result(
325346
operation_name: &str,
@@ -379,9 +400,9 @@ pub async fn handle_load_component_cli(
379400
info!(path, "Loading component (CLI mode)");
380401

381402
match lifecycle_manager.load_component(path).await {
382-
Ok((id, _load_result)) => {
383-
handle_tool_list_notification(None, &id, "load").await;
384-
create_component_success_result("load", &id)
403+
Ok(outcome) => {
404+
handle_tool_list_notification(None, &outcome.component_id, "load").await;
405+
create_load_component_success_result(&outcome)
385406
}
386407
Err(e) => {
387408
error!(error = %e, path, "Failed to load component");
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
use std::path::{Path, PathBuf};
2+
use std::sync::Arc;
3+
4+
use anyhow::{anyhow, Context, Result};
5+
use sha2::Digest;
6+
use tokio::sync::Semaphore;
7+
8+
use crate::{ComponentMetadata, ValidationStamp};
9+
10+
/// Handles filesystem layout and metadata persistence for components.
11+
#[derive(Clone)]
12+
pub struct ComponentStorage {
13+
root: PathBuf,
14+
downloads_dir: PathBuf,
15+
downloads_semaphore: Arc<Semaphore>,
16+
}
17+
18+
impl ComponentStorage {
19+
/// Create a new storage manager rooted at the plugin directory.
20+
pub fn new(root: PathBuf, max_concurrent_downloads: usize) -> Self {
21+
let downloads_dir = root.join(crate::DOWNLOADS_DIR);
22+
Self {
23+
root,
24+
downloads_dir,
25+
downloads_semaphore: Arc::new(Semaphore::new(max_concurrent_downloads.max(1))),
26+
}
27+
}
28+
29+
/// Ensure the directory structure exists on disk.
30+
pub async fn ensure_layout(&self) -> Result<()> {
31+
tokio::fs::create_dir_all(&self.root)
32+
.await
33+
.with_context(|| {
34+
format!(
35+
"Failed to create plugin directory at {}",
36+
self.root.display()
37+
)
38+
})?;
39+
tokio::fs::create_dir_all(&self.downloads_dir)
40+
.await
41+
.with_context(|| {
42+
format!(
43+
"Failed to create downloads directory at {}",
44+
self.downloads_dir.display()
45+
)
46+
})?;
47+
Ok(())
48+
}
49+
50+
/// Root plugin directory containing components.
51+
pub fn root(&self) -> &Path {
52+
&self.root
53+
}
54+
55+
/// Directory used for staging downloaded artifacts.
56+
#[allow(dead_code)]
57+
pub fn downloads_dir(&self) -> &Path {
58+
&self.downloads_dir
59+
}
60+
61+
/// Acquire a permit for filesystem-bound downloads.
62+
pub async fn acquire_download_permit(&self) -> tokio::sync::OwnedSemaphorePermit {
63+
self.downloads_semaphore
64+
.clone()
65+
.acquire_owned()
66+
.await
67+
.expect("Semaphore closed")
68+
}
69+
70+
/// Absolute path to the component `.wasm` file.
71+
pub fn component_path(&self, component_id: &str) -> PathBuf {
72+
self.root.join(format!("{component_id}.wasm"))
73+
}
74+
75+
/// Absolute path to the policy file associated with a component.
76+
pub fn policy_path(&self, component_id: &str) -> PathBuf {
77+
self.root.join(format!("{component_id}.policy.yaml"))
78+
}
79+
80+
/// Absolute path to the metadata JSON for a component.
81+
pub fn metadata_path(&self, component_id: &str) -> PathBuf {
82+
self.root
83+
.join(format!("{component_id}.{}", crate::METADATA_EXT))
84+
}
85+
86+
/// Absolute path to the precompiled component cache file.
87+
pub fn precompiled_path(&self, component_id: &str) -> PathBuf {
88+
self.root
89+
.join(format!("{component_id}.{}", crate::PRECOMPILED_EXT))
90+
}
91+
92+
/// Absolute path to the policy metadata JSON for a component.
93+
pub fn policy_metadata_path(&self, component_id: &str) -> PathBuf {
94+
self.root.join(format!("{component_id}.policy.meta.json"))
95+
}
96+
97+
/// Persist component metadata to disk.
98+
pub async fn write_metadata(&self, metadata: &ComponentMetadata) -> Result<()> {
99+
let path = self.metadata_path(&metadata.component_id);
100+
let json = serde_json::to_string_pretty(metadata)
101+
.context("Failed to serialize component metadata")?;
102+
tokio::fs::write(&path, json)
103+
.await
104+
.with_context(|| format!("Failed to write component metadata to {}", path.display()))
105+
}
106+
107+
/// Load component metadata from disk if present.
108+
pub async fn read_metadata(&self, component_id: &str) -> Result<Option<ComponentMetadata>> {
109+
let path = self.metadata_path(component_id);
110+
if !path.exists() {
111+
return Ok(None);
112+
}
113+
114+
let contents = tokio::fs::read_to_string(&path).await.with_context(|| {
115+
format!("Failed to read component metadata from {}", path.display())
116+
})?;
117+
118+
let metadata = serde_json::from_str(&contents).with_context(|| {
119+
format!("Failed to parse component metadata from {}", path.display())
120+
})?;
121+
Ok(Some(metadata))
122+
}
123+
124+
/// Write precompiled component bytes to disk.
125+
pub async fn write_precompiled(&self, component_id: &str, bytes: &[u8]) -> Result<()> {
126+
let path = self.precompiled_path(component_id);
127+
tokio::fs::write(&path, bytes).await.with_context(|| {
128+
format!(
129+
"Failed to write precompiled component to {}",
130+
path.display()
131+
)
132+
})
133+
}
134+
135+
/// Remove a file if it exists, translating IO errors into `anyhow`.
136+
pub async fn remove_if_exists(
137+
&self,
138+
path: &Path,
139+
description: &str,
140+
component_id: &str,
141+
) -> Result<()> {
142+
match tokio::fs::remove_file(path).await {
143+
Ok(()) => {
144+
tracing::debug!(component_id = %component_id, path = %path.display(), "Removed {}", description);
145+
}
146+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
147+
tracing::debug!(component_id = %component_id, path = %path.display(), "{} already absent", description);
148+
}
149+
Err(e) => {
150+
return Err(anyhow!(
151+
"Failed to remove {} at {}: {}",
152+
description,
153+
path.display(),
154+
e
155+
));
156+
}
157+
}
158+
Ok(())
159+
}
160+
161+
/// Create a validation stamp for a file.
162+
pub async fn create_validation_stamp(
163+
path: &Path,
164+
include_hash: bool,
165+
) -> Result<ValidationStamp> {
166+
let metadata = tokio::fs::metadata(path)
167+
.await
168+
.with_context(|| format!("Failed to read metadata for {}", path.display()))?;
169+
170+
let file_size = metadata.len();
171+
let mtime = metadata
172+
.modified()
173+
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
174+
.and_then(|t| {
175+
t.duration_since(std::time::UNIX_EPOCH)
176+
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
177+
})?
178+
.as_secs();
179+
180+
let content_hash = if include_hash {
181+
let bytes = tokio::fs::read(path)
182+
.await
183+
.with_context(|| format!("Failed to read {} for hashing", path.display()))?;
184+
let mut hasher = sha2::Sha256::new();
185+
hasher.update(&bytes);
186+
Some(format!("{:x}", hasher.finalize()))
187+
} else {
188+
None
189+
};
190+
191+
Ok(ValidationStamp {
192+
file_size,
193+
mtime,
194+
content_hash,
195+
})
196+
}
197+
198+
/// Check if the validation stamp matches the current file on disk.
199+
pub async fn validate_stamp(path: &Path, stamp: &ValidationStamp) -> bool {
200+
let Ok(metadata) = tokio::fs::metadata(path).await else {
201+
return false;
202+
};
203+
204+
if metadata.len() != stamp.file_size {
205+
return false;
206+
}
207+
208+
let Ok(mtime) = metadata
209+
.modified()
210+
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
211+
.and_then(|t| {
212+
t.duration_since(std::time::UNIX_EPOCH)
213+
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
214+
})
215+
.map(|d| d.as_secs())
216+
else {
217+
return false;
218+
};
219+
220+
if mtime != stamp.mtime {
221+
return false;
222+
}
223+
224+
if let Some(expected_hash) = &stamp.content_hash {
225+
let Ok(content) = tokio::fs::read(path).await else {
226+
return false;
227+
};
228+
let mut hasher = sha2::Sha256::new();
229+
hasher.update(&content);
230+
let actual_hash = format!("{:x}", hasher.finalize());
231+
if &actual_hash != expected_hash {
232+
return false;
233+
}
234+
}
235+
236+
true
237+
}
238+
}

0 commit comments

Comments
 (0)