Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions crates/mcp-server/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) async fn get_component_tools(lifecycle_manager: &LifecycleManager) ->
pub(crate) async fn handle_load_component(
req: &CallToolRequestParam,
lifecycle_manager: &LifecycleManager,
server_peer: Option<Peer<RoleServer>>,
server_peer: Peer<RoleServer>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case still for having this be optional? I'm not sure I follow why we are removing the optionality here based on the PR description as we could still notify before

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

server_peer is always None as far as I understand. I remember we used to have a set_peer function but it was removed, and thus this variable is never Some(...), causing the peer.notify_tool_list_changed() to never be invoked.

I am not sure why we could still notify before though...

) -> Result<CallToolResult> {
let args = extract_args_from_request(req)?;
let path = args
Expand All @@ -60,15 +60,18 @@ pub(crate) async fn handle_load_component(

let contents = vec![Content::text(status_text)];

if let Some(peer) = server_peer {
if let Err(e) = peer.notify_tool_list_changed().await {
error!("Failed to send tool list change notification: {}", e);
} else {
info!(
"Sent tool list changed notification after loading component {}",
id
);
}
info!(
"Notifying server peer about tool list change after loading component {}",
id
);
// Notify the server peer about the tool list change
if let Err(e) = server_peer.notify_tool_list_changed().await {
error!("Failed to send tool list change notification: {}", e);
} else {
info!(
"Sent tool list changed notification after loading component {}",
id
);
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The logging is verbose with three separate log statements for a single notification operation. Consider consolidating into fewer statements or using a single log with conditional messaging based on success/failure."

Note: See the diff below for a potential fix:

@@ -60,18 +60,21 @@
 
             let contents = vec![Content::text(status_text)];
 
-            info!(
-                component_id = %id,
-                "Notifying server peer about tool list change after loading component"
-            );
             // Notify the server peer about the tool list change
-            if let Err(e) = server_peer.notify_tool_list_changed().await {
-                error!(error = %e, "Failed to send tool list change notification");
-            } else {
-                info!(
-                    component_id = %id,
-                    "Sent tool list changed notification after loading component"
-                );
+            match server_peer.notify_tool_list_changed().await {
+                Ok(_) => {
+                    info!(
+                        component_id = %id,
+                        "Tool list change notification sent to server peer after loading component"
+                    );
+                }
+                Err(e) => {
+                    error!(
+                        component_id = %id,
+                        error = %e,
+                        "Failed to send tool list change notification to server peer after loading component"
+                    );
+                }
             }
 
             Ok(CallToolResult {

Copilot uses AI. Check for mistakes.
}

Ok(CallToolResult {
Expand All @@ -91,7 +94,7 @@ pub(crate) async fn handle_load_component(
pub(crate) async fn handle_unload_component(
req: &CallToolRequestParam,
lifecycle_manager: &LifecycleManager,
server_peer: Option<Peer<RoleServer>>,
server_peer: Peer<RoleServer>,
) -> Result<CallToolResult> {
let args = extract_args_from_request(req)?;

Expand All @@ -110,15 +113,13 @@ pub(crate) async fn handle_unload_component(

let contents = vec![Content::text(status_text)];

if let Some(peer) = server_peer {
if let Err(e) = peer.notify_tool_list_changed().await {
error!("Failed to send tool list change notification: {}", e);
} else {
info!(
"Sent tool list changed notification after unloading component {}",
id
);
}
if let Err(e) = server_peer.notify_tool_list_changed().await {
error!("Failed to send tool list change notification: {}", e);
} else {
info!(
"Sent tool list changed notification after unloading component {}",
id
);
}

Ok(CallToolResult {
Expand Down
2 changes: 1 addition & 1 deletion crates/mcp-server/src/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub async fn handle_tools_list(lifecycle_manager: &LifecycleManager) -> Result<V
pub async fn handle_tools_call(
req: CallToolRequestParam,
lifecycle_manager: &LifecycleManager,
server_peer: Option<Peer<RoleServer>>,
server_peer: Peer<RoleServer>,
) -> Result<Value> {
info!("Handling tool call");

Expand Down
10 changes: 3 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,11 @@ fn get_component_dir() -> PathBuf {
#[derive(Clone)]
pub struct McpServer {
lifecycle_manager: LifecycleManager,
peer: Option<rmcp::service::Peer<RoleServer>>,
}

impl McpServer {
pub fn new(lifecycle_manager: LifecycleManager) -> Self {
Self {
lifecycle_manager,
peer: None,
}
Self { lifecycle_manager }
}
}

Expand Down Expand Up @@ -114,9 +110,9 @@ Key points:
fn call_tool<'a>(
&'a self,
params: CallToolRequestParam,
_ctx: RequestContext<RoleServer>,
ctx: RequestContext<RoleServer>,
) -> Pin<Box<dyn Future<Output = Result<CallToolResult, ErrorData>> + Send + 'a>> {
let peer_clone = self.peer.clone();
let peer_clone = ctx.peer.clone();

Box::pin(async move {
let result = handle_tools_call(params, &self.lifecycle_manager, peer_clone).await;
Expand Down
234 changes: 234 additions & 0 deletions tests/transport_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,240 @@ async fn test_stdio_transport() -> Result<()> {
Ok(())
}

#[test(tokio::test)]
async fn test_tool_list_notification() -> Result<()> {
// Create a temporary directory for this test to avoid loading existing components
let temp_dir = tempfile::tempdir()?;
let plugin_dir_arg = format!("--plugin-dir={}", temp_dir.path().display());

// Get the path to the built binary
let binary_path = std::env::current_dir()
.context("Failed to get current directory")?
.join("target/debug/wassette");

// Start the server with stdio transport (disable logs to avoid stdout pollution)
let mut child = tokio::process::Command::new(&binary_path)
.args(["serve", &plugin_dir_arg])
.env("RUST_LOG", "off")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("Failed to start wassette with stdio transport")?;

let stdin = child.stdin.take().context("Failed to get stdin handle")?;
let stdout = child.stdout.take().context("Failed to get stdout handle")?;
let stderr = child.stderr.take().context("Failed to get stderr handle")?;

let mut stdin = stdin;
let mut stdout = BufReader::new(stdout);
let mut stderr = BufReader::new(stderr);

// Give the server time to start
tokio::time::sleep(Duration::from_millis(1000)).await;

Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Hard-coded sleep duration makes the test slower and potentially flaky. Consider using a more robust approach like polling the process status or checking for readiness indicators."

Suggested change
// Wait for the server to become ready by reading stdout for a readiness message, with a timeout
let mut ready = false;
let readiness_timeout = Duration::from_secs(5);
let mut stdout_line = String::new();
let readiness_result = timeout(readiness_timeout, async {
loop {
stdout_line.clear();
let n = stdout.read_line(&mut stdout_line).await?;
if n == 0 {
// EOF reached
break;
}
// Adjust the readiness message as appropriate for your server
if stdout_line.contains("Listening") || stdout_line.contains("Server started") {
ready = true;
break;
}
}
Ok::<(), anyhow::Error>(())
}).await;
if !ready {
let mut stderr_output = String::new();
let _ = stderr.read_line(&mut stderr_output).await;
return Err(anyhow::anyhow!(
"Server did not become ready in time. Last stdout: '{}', stderr: '{}'",
stdout_line,
stderr_output
));
}

Copilot uses AI. Check for mistakes.
// Check if the process is still running
if let Ok(Some(status)) = child.try_wait() {
let mut stderr_output = String::new();
let _ = stderr.read_line(&mut stderr_output).await;
return Err(anyhow::anyhow!(
"Server process exited with status: {:?}, stderr: {}",
status,
stderr_output
));
}

// Send MCP initialize request
let initialize_request = r#"{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "test-client", "version": "1.0.0"}}, "id": 1}
"#;

stdin.write_all(initialize_request.as_bytes()).await?;
stdin.flush().await?;

// Read and verify initialize response
let mut response_line = String::new();
match tokio::time::timeout(
Duration::from_secs(10),
stdout.read_line(&mut response_line),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(e)) => {
return Err(anyhow::anyhow!("Failed to read initialize response: {}", e));
}
Err(_) => {
let mut stderr_output = String::new();
let _ =
tokio::time::timeout(Duration::from_secs(1), stderr.read_line(&mut stderr_output))
.await;
return Err(anyhow::anyhow!(
"Timeout waiting for initialize response. Stderr: {}",
stderr_output
));
}
}

let response: serde_json::Value =
serde_json::from_str(&response_line).context("Failed to parse initialize response")?;

assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["id"], 1);
assert!(response["result"].is_object());

// Send initialized notification (required by MCP protocol)
let initialized_notification = r#"{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}
"#;

stdin.write_all(initialized_notification.as_bytes()).await?;
stdin.flush().await?;

// Step 1: Send initial list_tools request to get baseline tool count
let list_tools_request = r#"{"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": 2}
"#;

stdin.write_all(list_tools_request.as_bytes()).await?;
stdin.flush().await?;

// Read initial tools list response
let mut tools_response_line = String::new();
tokio::time::timeout(
Duration::from_secs(10),
stdout.read_line(&mut tools_response_line),
)
.await
.context("Timeout waiting for initial tools/list response")?
.context("Failed to read initial tools/list response")?;

let initial_tools_response: serde_json::Value = serde_json::from_str(&tools_response_line)
.context("Failed to parse initial tools/list response")?;

assert_eq!(initial_tools_response["jsonrpc"], "2.0");
assert_eq!(initial_tools_response["id"], 2);
assert!(initial_tools_response["result"].is_object());
assert!(initial_tools_response["result"]["tools"].is_array());

let initial_tools = &initial_tools_response["result"]["tools"]
.as_array()
.unwrap();
let initial_tool_count = initial_tools.len();
println!("Initial tool count: {}", initial_tool_count);

// Build a component to load
let component_path = build_fetch_component().await?;

// Step 2: Load a component using the load-component tool
let load_component_request = format!(
r#"{{"jsonrpc": "2.0", "method": "tools/call", "params": {{"name": "load-component", "arguments": {{"path": "file://{}"}}}}, "id": 3}}
"#,
component_path.to_str().unwrap()
);

stdin.write_all(load_component_request.as_bytes()).await?;
stdin.flush().await?;

// Read the tool list change notification first (this is what we're testing!)
let mut notification_line = String::new();
tokio::time::timeout(
Duration::from_secs(15),
stdout.read_line(&mut notification_line),
)
.await
.context("Timeout waiting for tool list change notification")?
.context("Failed to read tool list change notification")?;

let notification: serde_json::Value = serde_json::from_str(&notification_line)
.context("Failed to parse tool list change notification")?;

// Verify we received a tools/list_changed notification
assert_eq!(notification["jsonrpc"], "2.0");
assert_eq!(notification["method"], "notifications/tools/list_changed");
println!("✓ Received tools/list_changed notification as expected");

// Read the actual load-component response
let mut load_response_line = String::new();
tokio::time::timeout(
Duration::from_secs(15),
stdout.read_line(&mut load_response_line),
)
.await
.context("Timeout waiting for load-component response")?
.context("Failed to read load-component response")?;

let load_response: serde_json::Value = serde_json::from_str(&load_response_line)
.context("Failed to parse load-component response")?;

assert_eq!(load_response["jsonrpc"], "2.0");
assert_eq!(load_response["id"], 3);

// Check if the load succeeded
if load_response["error"].is_object() {
panic!("Failed to load component: {}", load_response["error"]);
}
assert!(load_response["result"].is_object());
println!("✓ Component loaded successfully");

// Step 3: Send another list_tools request to verify tools were added
let list_tools_request_after = r#"{"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": 4}
"#;

stdin.write_all(list_tools_request_after.as_bytes()).await?;
stdin.flush().await?;

// Read updated tools list response
let mut updated_tools_response_line = String::new();
tokio::time::timeout(
Duration::from_secs(10),
stdout.read_line(&mut updated_tools_response_line),
)
.await
.context("Timeout waiting for updated tools/list response")?
.context("Failed to read updated tools/list response")?;

let updated_tools_response: serde_json::Value =
serde_json::from_str(&updated_tools_response_line)
.context("Failed to parse updated tools/list response")?;

assert_eq!(updated_tools_response["jsonrpc"], "2.0");
assert_eq!(updated_tools_response["id"], 4);
assert!(updated_tools_response["result"].is_object());
assert!(updated_tools_response["result"]["tools"].is_array());

let updated_tools = &updated_tools_response["result"]["tools"]
.as_array()
.unwrap();
let updated_tool_count = updated_tools.len();
println!("Updated tool count: {}", updated_tool_count);

// Verify that the tool count increased after loading the component
assert!(
updated_tool_count > initial_tool_count,
"Tool count should have increased from {} to {}, but it didn't",
initial_tool_count,
updated_tool_count
);
println!("✓ Tool count increased as expected after loading component");

// Verify that the new tools from the component are present
let updated_tool_names: Vec<String> = updated_tools
.iter()
.map(|tool| tool["name"].as_str().unwrap_or("").to_string())
.collect();

// The fetch component should add a "fetch" tool
assert!(
updated_tool_names.contains(&"fetch".to_string()),
"Expected 'fetch' tool from loaded component, but found tools: {:?}",
updated_tool_names
);
println!("✓ New tools from loaded component are present in the list");

// Clean up
child.kill().await.ok();

Ok(())
}

#[test(tokio::test)]
async fn test_http_transport() -> Result<()> {
// Use a random available port to avoid conflicts
Expand Down