Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions otel-worker-cli/src/commands/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio::sync::{mpsc, watch, RwLock};
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn};
use url::Url;
Expand Down Expand Up @@ -136,17 +136,17 @@
}
}

async fn register_session(&mut self) -> (String, mpsc::Receiver<ServerMessage>) {
async fn register_session(&mut self) -> (McpSession, mpsc::Receiver<ServerMessage>) {
loop {
let id = Uuid::new_v4().to_string();

let mut sessions = self.sessions.write().await;
match sessions.entry(id.clone()) {
Entry::Occupied(_) => continue,
Entry::Vacant(entry) => {
let (mcp_session, messages) = McpSession::new();
entry.insert(mcp_session);
break (id, messages);
let (session, messages) = McpSession::new(id);
entry.insert(session.clone());
break (session, messages);
}
}
}
Expand Down Expand Up @@ -186,7 +186,8 @@
// Go through all the sessions and drop them. This makes sure that the
// receiver will also be closed since all the transmitters will be gone.
let mut sessions = self.sessions.write().await;
for (session_id, _mcp_session) in sessions.drain() {
for (session_id, session) in sessions.drain() {
session.shutdown().await;
debug!(?session_id, "Closing session");
}
}
Expand All @@ -195,27 +196,48 @@
fn is_shutting_down(&self) -> bool {
self.shutdown.load(Ordering::Relaxed)
}

async fn cleanup_session(&self, session_id: String) {
debug!(?session_id, "Cleaning up session");

let mut sessions = self.sessions.write().await;
match sessions.remove(&session_id) {
Some(session) => {
session.shutdown().await;
debug!(?session_id, "Cleanup successful");
}
None => warn!(?session_id, "Cleanup attempted but session was not found"),
};
}
}

#[derive(Debug, Clone)]
struct McpSession {
/// An ID unique to this session
id: String,

/// This channel is used to send message to the receiver which should
/// send these message to the MCP client using the transport that is being
/// used.
///
/// The receiver channel is returned during the creation of the [`McpSession`].
messages: mpsc::Sender<ServerMessage>,

shutdown: watch::Sender<bool>,
}

impl McpSession {
/// Create a new [`McpSession`]. The [`mpsc::Receiver`] returned will
/// receive any message intended for the Mcp client.
pub fn new() -> (Self, mpsc::Receiver<ServerMessage>) {
pub fn new(id: String) -> (Self, mpsc::Receiver<ServerMessage>) {
let (shutdown, _on_shutdown) = watch::channel(false);
let (messages_tx, messages_rx) = mpsc::channel(100);

(
Self {
id,
messages: messages_tx,
shutdown,
},
messages_rx,
)
Expand Down Expand Up @@ -258,6 +280,21 @@
let message = ServerMessage::Notification(notification);
self.send_message(message).await
}

/// Completes when the underlying session is closed.
async fn on_shutdown(&self) {
self.shutdown.subscribe().changed().await;

Check warning on line 286 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused `std::result::Result` that must be used

Check warning on line 286 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused `std::result::Result` that must be used

Check warning on line 286 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

[clippy] reported by reviewdog 🐶 warning: unused `std::result::Result` that must be used --> otel-worker-cli/src/commands/mcp.rs:286:9 | 286 | self.shutdown.subscribe().changed().await; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: this `Result` may be an `Err` variant, which should be handled = note: `#[warn(unused_must_use)]` on by default help: use `let _ = ...` to ignore the resulting value | 286 | let _ = self.shutdown.subscribe().changed().await; | +++++++ Raw Output: otel-worker-cli/src/commands/mcp.rs:286:9:w:warning: unused `std::result::Result` that must be used --> otel-worker-cli/src/commands/mcp.rs:286:9 | 286 | self.shutdown.subscribe().changed().await; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: this `Result` may be an `Err` variant, which should be handled = note: `#[warn(unused_must_use)]` on by default help: use `let _ = ...` to ignore the resulting value | 286 | let _ = self.shutdown.subscribe().changed().await; | +++++++ __END__
}

async fn shutdown(&self) {
self.shutdown.send(true);

Check warning on line 290 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused `std::result::Result` that must be used

Check warning on line 290 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

unused `std::result::Result` that must be used

Check warning on line 290 in otel-worker-cli/src/commands/mcp.rs

View workflow job for this annotation

GitHub Actions / Create binary for x86_64-unknown-linux-gnu

[clippy] reported by reviewdog 🐶 warning: unused `std::result::Result` that must be used --> otel-worker-cli/src/commands/mcp.rs:290:9 | 290 | self.shutdown.send(true); | ^^^^^^^^^^^^^^^^^^^^^^^^ | = note: this `Result` may be an `Err` variant, which should be handled help: use `let _ = ...` to ignore the resulting value | 290 | let _ = self.shutdown.send(true); | +++++++ Raw Output: otel-worker-cli/src/commands/mcp.rs:290:9:w:warning: unused `std::result::Result` that must be used --> otel-worker-cli/src/commands/mcp.rs:290:9 | 290 | self.shutdown.send(true); | ^^^^^^^^^^^^^^^^^^^^^^^^ | = note: this `Result` may be an `Err` variant, which should be handled help: use `let _ = ...` to ignore the resulting value | 290 | let _ = self.shutdown.send(true); | +++++++ __END__
}

/// Completes when the messages channel's receiver side closes. Currently
/// used by the sse implementation, when the client closes the sse page.
async fn on_messages_close(&self) {
self.messages.closed().await
}
}

/// Give the otel_work_url generate the websocket uri. This will convert the
Expand Down Expand Up @@ -449,14 +486,14 @@
}

async fn handle_tool_call(
state: &McpState,
McpState { api_client, .. }: &McpState,
session: &McpSession,
request_id: RequestId,
params: CallToolRequestParams,
) -> Result<()> {
match params.name.as_str() {
"get_trace" => match params.arguments.try_into() {
Ok(params) => handle_tool_call_get_trace(state, session, request_id, params).await,
Ok(params) => handle_tool_call_get_trace(api_client, session, request_id, params).await,
Err(_) => {
session
.send_error(request_id, JsonrpcErrorError::invalid_params())
Expand Down Expand Up @@ -526,12 +563,12 @@
}

async fn handle_tool_call_get_trace(
state: &McpState,
api_client: &ApiClient,
session: &McpSession,
request_id: RequestId,
params: GetTraceParams,
) -> std::result::Result<(), anyhow::Error> {
let trace = state.api_client.trace_get(params.trace_id).await?;
let trace = api_client.trace_get(params.trace_id).await?;
let response = CallToolResult::text_content(
serde_json::to_string(&trace).expect("unable to serialize to trace"),
None,
Expand Down
36 changes: 30 additions & 6 deletions otel-worker-cli/src/commands/mcp/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::process::exit;
use std::time::Instant;
use tokio::net::TcpListener;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, info_span, warn, Instrument};
use tracing::{debug, info, info_span, trace, warn, Instrument};

pub(crate) async fn serve(listen_address: &str, state: McpState) -> Result<()> {
let listener = TcpListener::bind(listen_address)
Expand Down Expand Up @@ -94,14 +94,36 @@ async fn sse_handler(State(mut state): State<McpState>) -> Response {
return (StatusCode::SERVICE_UNAVAILABLE, "server is shutting down").into_response();
}

let (session_id, messages) = state.register_session().await;
let (session, messages) = state.register_session().await;
let session_id = session.id.clone();

// Spawn a task which will cleanup the session once the client disconnects
let cleanup_state = state.clone();
tokio::spawn(async move {
let state = cleanup_state;

tokio::select! {
_ = session.on_messages_close() => {
debug!(session_id=?session.id, "Session has disconnected");

// If we are shutting down we don't want to do any cleanup since that is
// already happening and we would battle for the rwlock.
if state.is_shutting_down() {
return;
}

state.cleanup_session(session.id).await;
},
_ = session.on_shutdown() => {
trace!(session_id=?session.id, "Watchdog stopped");
}
};
});

// This message needs to be send as soon as the client accesses the page.
let initial_event = futures::stream::once(async move {
let querystring = serde_urlencoded::to_string(JsonRpcQuery::new(Some(session_id)))
.expect("querystring encoding is expected to work");
// We need to explicitly specify the error type, since we are not
// constructing this anywhere

Event::default()
.event("endpoint")
Expand All @@ -117,6 +139,8 @@ async fn sse_handler(State(mut state): State<McpState>) -> Response {
.expect("unable to serialize data")
});

// We need to explicitly specify the error type in map, since we are not
// constructing this type anywhere
Sse::new(initial_event.chain(events).map(Ok::<Event, Infallible>))
.keep_alive(KeepAlive::new())
.into_response()
Expand All @@ -128,8 +152,8 @@ async fn log_and_metrics(req: Request, next: Next) -> impl IntoResponse {
let method = req.method().to_string();
let matched_path = req
.extensions()
.get::<MatchedPath>()
.map(|p| p.as_str().to_string());
.get()
.map(|p: &MatchedPath| p.as_str().to_string());

let span = info_span!("http_request",
%method,
Expand Down
6 changes: 1 addition & 5 deletions otel-worker-cli/src/commands/mcp/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ use tracing::{debug, error, info};
pub(crate) async fn serve(mut state: McpState) -> Result<()> {
// Stdio only has support for a single session, so just start that at the
// beginning and use it throughout the life cycle.
let (session_id, mut messages) = state.register_session().await;
let session = state
.get_session(session_id)
.await
.expect("should have session");
let (session, mut messages) = state.register_session().await;

// spawn two tasks, one to read lines on stdin, parse payloads, and dispatch
// to super::*. The other has to read from messages and serialize them
Expand Down