Skip to content

Commit 0cc4d69

Browse files
committed
feat: add Streamable HTTP server
Signed-off-by: mrizzi <mrizzi@redhat.com>
1 parent 5efcfdf commit 0cc4d69

File tree

4 files changed

+125
-25
lines changed

4 files changed

+125
-25
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@ path = "src/bin/stdio.rs"
1111
name = "sse"
1212
path = "src/bin/sse.rs"
1313

14+
[[bin]]
15+
name = "streamable"
16+
path = "src/bin/streamhttp.rs"
17+
1418
[dependencies]
1519
anyhow = "1.0"
20+
axum = { version = "0.8", features = ["macros"] }
1621
chrono = "0.4"
1722
openid = "0.17"
1823
reqwest = { version = "0.12", features = ["json", "blocking"] }
19-
rmcp = { version = "0.2.1", features = ["server", "transport-sse-server", "transport-io"] }
24+
rmcp = { version = "0.2.1", features = ["server", "transport-sse-server", "transport-io", "transport-streamable-http-server"] }
2025
serde = { version = "1.0", features = ["derive"] }
2126
serde_json = "1.0"
2227
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "io-std", "signal"] }

src/bin/streamhttp.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use rmcp::transport::streamable_http_server::{
2+
StreamableHttpService, session::local::LocalSessionManager,
3+
};
4+
use tracing_subscriber::{
5+
layer::SubscriberExt,
6+
util::SubscriberInitExt,
7+
{self},
8+
};
9+
mod common;
10+
use common::trustify::Trustify;
11+
12+
const BIND_ADDRESS: &str = "[::]:8000";
13+
14+
#[tokio::main]
15+
async fn main() -> anyhow::Result<()> {
16+
tracing_subscriber::registry()
17+
.with(
18+
tracing_subscriber::EnvFilter::try_from_default_env()
19+
.unwrap_or_else(|_| "debug".to_string().into()),
20+
)
21+
.with(tracing_subscriber::fmt::layer())
22+
.init();
23+
24+
let service = StreamableHttpService::new(
25+
|| Ok(Trustify::new()),
26+
LocalSessionManager::default().into(),
27+
Default::default(),
28+
);
29+
30+
let router = axum::Router::new().nest_service("/mcp", service);
31+
let tcp_listener = tokio::net::TcpListener::bind(BIND_ADDRESS).await?;
32+
let _ = axum::serve(tcp_listener, router)
33+
.with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() })
34+
.await;
35+
Ok(())
36+
}

tests/integration_test.rs

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,10 @@
11
use anyhow::Error;
22
use rmcp::{ServiceExt, transport::TokioChildProcess};
3-
use serde_json::json;
3+
use serde_json::Value;
44
use std::{env, process::Command};
55
use trustify_test_context::subset::ContainsSubset;
66

7-
#[test]
8-
fn tools_list_mcp_inspector() {
9-
let inspector_commmand = format!(
10-
"npx @modelcontextprotocol/inspector --cli {} --method tools/list",
11-
env!("CARGO_BIN_EXE_stdio")
12-
);
13-
log::debug!("inspector command: {}", inspector_commmand);
14-
let output = Command::new("sh")
15-
.arg("-c")
16-
.arg(inspector_commmand)
17-
.env("API_URL", "")
18-
.env("OPENID_ISSUER_URL", "")
19-
.env("OPENID_CLIENT_ID", "")
20-
.env("OPENID_CLIENT_SECRET", "")
21-
.output()
22-
.expect("failed to execute process");
23-
24-
let result = serde_json::from_str(str::from_utf8(&output.stdout).unwrap_or_default())
25-
.unwrap_or_default();
26-
log::debug!("{:#?}", result);
27-
log::debug!("{:#?}", str::from_utf8(&output.stderr).unwrap_or_default());
28-
let expected_result = json!({
7+
const EXPECTED_TOOLS_LIST_RESPONSE: &str = r#"{
298
"tools": [
309
{
3110
"name": "trustify_vulnerabilities_list",
@@ -261,10 +240,47 @@ fn tools_list_mcp_inspector() {
261240
}
262241
}
263242
]
264-
});
243+
}"#;
244+
245+
#[test]
246+
fn tools_list_mcp_inspector_stdio() {
247+
let inspector_commmand = format!(
248+
"npx @modelcontextprotocol/inspector --cli {} --method tools/list",
249+
env!("CARGO_BIN_EXE_stdio")
250+
);
251+
log::debug!("inspector command: {}", inspector_commmand);
252+
let output = Command::new("sh")
253+
.arg("-c")
254+
.arg(inspector_commmand)
255+
.env("API_URL", "")
256+
.env("OPENID_ISSUER_URL", "")
257+
.env("OPENID_CLIENT_ID", "")
258+
.env("OPENID_CLIENT_SECRET", "")
259+
.output()
260+
.expect("failed to execute process");
261+
262+
let result = serde_json::from_str(str::from_utf8(&output.stdout).unwrap_or_default())
263+
.unwrap_or_default();
264+
log::debug!("{:#?}", result);
265+
log::debug!("{:#?}", str::from_utf8(&output.stderr).unwrap_or_default());
266+
let expected_result: Value =
267+
serde_json::from_str(EXPECTED_TOOLS_LIST_RESPONSE).unwrap_or_default();
265268
assert!(expected_result.contains_subset(result));
266269
}
267270

271+
#[test]
272+
fn tools_list_mcp_inspector_sse() -> Result<(), Error> {
273+
run_server_test(env!("CARGO_BIN_EXE_sse"), "http://localhost:8081/sse")
274+
}
275+
276+
#[test]
277+
fn tools_list_mcp_inspector_streamable_http() -> Result<(), Error> {
278+
run_server_test(
279+
env!("CARGO_BIN_EXE_streamable"),
280+
"http://localhost:8000/mcp --transport http",
281+
)
282+
}
283+
268284
#[tokio::test]
269285
async fn tools_list_mcp_client() -> Result<(), Error> {
270286
let mut command = tokio::process::Command::new(env!("CARGO_BIN_EXE_stdio"));
@@ -288,3 +304,33 @@ async fn tools_list_mcp_client() -> Result<(), Error> {
288304

289305
Ok(())
290306
}
307+
308+
fn run_server_test(server_command: &str, inspector_cli_parameter: &str) -> Result<(), Error> {
309+
let mut server = Command::new("sh")
310+
.arg("-c")
311+
.arg(server_command)
312+
.env("API_URL", "")
313+
.env("OPENID_ISSUER_URL", "")
314+
.env("OPENID_CLIENT_ID", "")
315+
.env("OPENID_CLIENT_SECRET", "")
316+
.spawn()?;
317+
318+
let inspector_commmand = format!(
319+
"npx @modelcontextprotocol/inspector --cli {} --method tools/list",
320+
inspector_cli_parameter
321+
);
322+
log::debug!("inspector command: {}", inspector_commmand);
323+
let output = Command::new("sh")
324+
.arg("-c")
325+
.arg(inspector_commmand)
326+
.output()?;
327+
328+
let result: Value = serde_json::from_str(str::from_utf8(&output.stdout)?)?;
329+
log::debug!("{:#?}", result);
330+
331+
let expected_result: Value = serde_json::from_str(EXPECTED_TOOLS_LIST_RESPONSE)?;
332+
assert!(expected_result.contains_subset(result));
333+
334+
server.kill()?;
335+
Ok(())
336+
}

0 commit comments

Comments
 (0)