diff --git a/crates/openfang-api/src/openai_compat.rs b/crates/openfang-api/src/openai_compat.rs index d5d1cebd85..317a8a7fb1 100644 --- a/crates/openfang-api/src/openai_compat.rs +++ b/crates/openfang-api/src/openai_compat.rs @@ -379,7 +379,15 @@ async fn stream_response( let (mut rx, _handle) = state .kernel - .send_message_streaming(agent_id, message, Some(kernel_handle), None, None, None) + .send_message_streaming( + agent_id, + message, + Some(kernel_handle), + None, + None, + None, + None, + ) .map_err(|e| format!("Streaming setup failed: {e}"))?; let (tx, stream_rx) = tokio::sync::mpsc::channel::>(64); diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index b3ce41dc7b..ae8cb73c8c 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -386,6 +386,7 @@ pub async fn send_message( content_blocks, req.sender_id, req.sender_name, + None, ) .await { @@ -435,6 +436,119 @@ pub async fn send_message( } } +/// POST /api/agents/:id/sessions/:session_id/message — Send a message to a specific session of an agent. +pub async fn send_message_to_session( + State(state): State>, + Path((id, session_id_str)): Path<(String, String)>, + Json(req): Json, +) -> impl IntoResponse { + let agent_id: AgentId = match id.parse() { + Ok(id) => id, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid agent ID"})), + ); + } + }; + + let session_id: openfang_types::agent::SessionId = match session_id_str.parse::() { + Ok(u) => openfang_types::agent::SessionId(u), + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid session ID"})), + ); + } + }; + + // SECURITY: Reject oversized messages to prevent OOM / LLM token abuse. + const MAX_MESSAGE_SIZE: usize = 64 * 1024; // 64KB + if req.message.len() > MAX_MESSAGE_SIZE { + return ( + StatusCode::PAYLOAD_TOO_LARGE, + Json(serde_json::json!({"error": "Message too large (max 64KB)"})), + ); + } + + // Check agent exists before processing + if state.kernel.registry.get(agent_id).is_none() { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "Agent not found"})), + ); + } + + // Resolve file attachments into image content blocks. + let content_blocks = if !req.attachments.is_empty() { + let image_blocks = resolve_attachments(&req.attachments); + if image_blocks.is_empty() { + None + } else { + Some(image_blocks) + } + } else { + None + }; + + let kernel_handle: Arc = state.kernel.clone() as Arc; + match state + .kernel + .send_message_with_handle_and_blocks( + agent_id, + &req.message, + Some(kernel_handle), + content_blocks, + req.sender_id, + req.sender_name, + Some(session_id), + ) + .await + { + Ok(result) => { + // Strip ... blocks from model output + let cleaned = crate::ws::strip_think_tags(&result.response); + + let response = if result.silent { + String::new() + } else if cleaned.trim().is_empty() { + format!( + "[The agent completed processing but returned no text response. ({} in / {} out | {} iter)]", + result.total_usage.input_tokens, + result.total_usage.output_tokens, + result.iterations, + ) + } else { + cleaned + }; + ( + StatusCode::OK, + Json(serde_json::json!(MessageResponse { + response, + input_tokens: result.total_usage.input_tokens, + output_tokens: result.total_usage.output_tokens, + iterations: result.iterations, + cost_usd: result.cost_usd, + })), + ) + } + Err(e) => { + tracing::warn!("send_message_to_session failed for agent {id}: {e}"); + let status = if format!("{e}").contains("Agent not found") { + StatusCode::NOT_FOUND + } else if format!("{e}").contains("quota") || format!("{e}").contains("Quota") { + StatusCode::TOO_MANY_REQUESTS + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + ( + status, + Json(serde_json::json!({"error": format!("Message delivery failed: {e}")})), + ) + } + } +} + /// GET /api/agents/:id/session — Get agent session (conversation history). pub async fn get_agent_session( State(state): State>, @@ -462,137 +576,7 @@ pub async fn get_agent_session( match state.kernel.memory.get_session(entry.session_id) { Ok(Some(session)) => { - // Two-pass approach: ToolUse blocks live in Assistant messages while - // ToolResult blocks arrive in subsequent User messages. Pass 1 - // collects all tool_use entries keyed by id; pass 2 attaches results. - - // Pass 1: build messages and a lookup from tool_use_id → (msg_idx, tool_idx) - use base64::Engine as _; - let mut built_messages: Vec = Vec::new(); - let mut tool_use_index: std::collections::HashMap = - std::collections::HashMap::new(); - - for m in &session.messages { - let mut tools: Vec = Vec::new(); - let mut msg_images: Vec = Vec::new(); - let content = match &m.content { - openfang_types::message::MessageContent::Text(t) => t.clone(), - openfang_types::message::MessageContent::Blocks(blocks) => { - let mut texts = Vec::new(); - for b in blocks { - match b { - openfang_types::message::ContentBlock::Text { text, .. } => { - texts.push(text.clone()); - } - openfang_types::message::ContentBlock::Image { - media_type, - data, - } => { - texts.push("[Image]".to_string()); - // Persist image to upload dir so it can be - // served back when loading session history. - let file_id = uuid::Uuid::new_v4().to_string(); - let upload_dir = std::env::temp_dir().join("openfang_uploads"); - let _ = std::fs::create_dir_all(&upload_dir); - if let Ok(bytes) = - base64::engine::general_purpose::STANDARD.decode(data) - { - let _ = std::fs::write(upload_dir.join(&file_id), &bytes); - UPLOAD_REGISTRY.insert( - file_id.clone(), - UploadMeta { - filename: format!( - "image.{}", - media_type.rsplit('/').next().unwrap_or("png") - ), - content_type: media_type.clone(), - }, - ); - msg_images.push(serde_json::json!({ - "file_id": file_id, - "filename": format!("image.{}", media_type.rsplit('/').next().unwrap_or("png")), - })); - } - } - openfang_types::message::ContentBlock::ToolUse { - id, - name, - input, - .. - } => { - let tool_idx = tools.len(); - tools.push(serde_json::json!({ - "name": name, - "input": input, - "running": false, - "expanded": false, - })); - // Will be filled after this loop when we know msg_idx - tool_use_index.insert(id.clone(), (usize::MAX, tool_idx)); - } - // ToolResult blocks are handled in pass 2 - openfang_types::message::ContentBlock::ToolResult { .. } => {} - _ => {} - } - } - texts.join("\n") - } - }; - // Skip messages that are purely tool results (User role with only ToolResult blocks) - if content.is_empty() && tools.is_empty() { - continue; - } - let msg_idx = built_messages.len(); - // Fix up the msg_idx for tool_use entries registered with sentinel - for (_, (mi, _)) in tool_use_index.iter_mut() { - if *mi == usize::MAX { - *mi = msg_idx; - } - } - let mut msg = serde_json::json!({ - "role": format!("{:?}", m.role), - "content": content, - }); - if !tools.is_empty() { - msg["tools"] = serde_json::Value::Array(tools); - } - if !msg_images.is_empty() { - msg["images"] = serde_json::Value::Array(msg_images); - } - built_messages.push(msg); - } - - // Pass 2: walk messages again and attach ToolResult to the correct tool - for m in &session.messages { - if let openfang_types::message::MessageContent::Blocks(blocks) = &m.content { - for b in blocks { - if let openfang_types::message::ContentBlock::ToolResult { - tool_use_id, - content: result, - is_error, - .. - } = b - { - if let Some(&(msg_idx, tool_idx)) = tool_use_index.get(tool_use_id) { - if let Some(msg) = built_messages.get_mut(msg_idx) { - if let Some(tools_arr) = - msg.get_mut("tools").and_then(|v| v.as_array_mut()) - { - if let Some(tool_obj) = tools_arr.get_mut(tool_idx) { - tool_obj["result"] = - serde_json::Value::String(result.clone()); - tool_obj["is_error"] = - serde_json::Value::Bool(*is_error); - } - } - } - } - } - } - } - } - - let messages = built_messages; + let messages = format_session_messages(&session); ( StatusCode::OK, Json(serde_json::json!({ @@ -625,6 +609,132 @@ pub async fn get_agent_session( } } +fn format_session_messages(session: &openfang_memory::session::Session) -> Vec { + // Two-pass approach: ToolUse blocks live in Assistant messages while + // ToolResult blocks arrive in subsequent User messages. Pass 1 + // collects all tool_use entries keyed by id; pass 2 attaches results. + + // Pass 1: build messages and a lookup from tool_use_id → (msg_idx, tool_idx) + use base64::Engine as _; + let mut built_messages: Vec = Vec::new(); + let mut tool_use_index: std::collections::HashMap = + std::collections::HashMap::new(); + + for m in &session.messages { + let mut tools: Vec = Vec::new(); + let mut msg_images: Vec = Vec::new(); + let content = match &m.content { + openfang_types::message::MessageContent::Text(t) => t.clone(), + openfang_types::message::MessageContent::Blocks(blocks) => { + let mut texts = Vec::new(); + for b in blocks { + match b { + openfang_types::message::ContentBlock::Text { text, .. } => { + texts.push(text.clone()); + } + openfang_types::message::ContentBlock::Image { media_type, data } => { + texts.push("[Image]".to_string()); + // Persist image to upload dir so it can be + // served back when loading session history. + let file_id = uuid::Uuid::new_v4().to_string(); + let upload_dir = std::env::temp_dir().join("openfang_uploads"); + let _ = std::fs::create_dir_all(&upload_dir); + if let Ok(bytes) = + base64::engine::general_purpose::STANDARD.decode(data) + { + let _ = std::fs::write(upload_dir.join(&file_id), &bytes); + UPLOAD_REGISTRY.insert( + file_id.clone(), + UploadMeta { + filename: format!( + "image.{}", + media_type.rsplit('/').next().unwrap_or("png") + ), + content_type: media_type.clone(), + }, + ); + msg_images.push(serde_json::json!({ + "file_id": file_id, + "filename": format!("image.{}", media_type.rsplit('/').next().unwrap_or("png")), + })); + } + } + openfang_types::message::ContentBlock::ToolUse { + id, name, input, .. + } => { + let tool_idx = tools.len(); + tools.push(serde_json::json!({ + "name": name, + "input": input, + "running": false, + "expanded": false, + })); + // Will be filled after this loop when we know msg_idx + tool_use_index.insert(id.clone(), (usize::MAX, tool_idx)); + } + // ToolResult blocks are handled in pass 2 + openfang_types::message::ContentBlock::ToolResult { .. } => {} + _ => {} + } + } + texts.join("\n") + } + }; + // Skip messages that are purely tool results (User role with only ToolResult blocks) + if content.is_empty() && tools.is_empty() { + continue; + } + let msg_idx = built_messages.len(); + // Fix up the msg_idx for tool_use entries registered with sentinel + for (_, (mi, _)) in tool_use_index.iter_mut() { + if *mi == usize::MAX { + *mi = msg_idx; + } + } + let mut msg = serde_json::json!({ + "role": format!("{:?}", m.role), + "content": content, + }); + if !tools.is_empty() { + msg["tools"] = serde_json::Value::Array(tools); + } + if !msg_images.is_empty() { + msg["images"] = serde_json::Value::Array(msg_images); + } + built_messages.push(msg); + } + + // Pass 2: walk messages again and attach ToolResult to the correct tool + for m in &session.messages { + if let openfang_types::message::MessageContent::Blocks(blocks) = &m.content { + for b in blocks { + if let openfang_types::message::ContentBlock::ToolResult { + tool_use_id, + content: result, + is_error, + .. + } = b + { + if let Some(&(msg_idx, tool_idx)) = tool_use_index.get(tool_use_id.as_str()) { + if let Some(msg) = built_messages.get_mut(msg_idx) { + if let Some(tools_arr) = + msg.get_mut("tools").and_then(|v| v.as_array_mut()) + { + if let Some(tool_obj) = tools_arr.get_mut(tool_idx) { + tool_obj["result"] = serde_json::Value::String(result.clone()); + tool_obj["is_error"] = serde_json::Value::Bool(*is_error); + } + } + } + } + } + } + } + } + + built_messages +} + /// DELETE /api/agents/:id — Kill an agent. pub async fn kill_agent( State(state): State>, @@ -1442,6 +1552,7 @@ pub async fn send_message_stream( req.sender_id, req.sender_name, None, // SSE streaming doesn't support image attachments yet + None, ) { Ok(pair) => pair, Err(e) => { @@ -1500,6 +1611,123 @@ pub async fn send_message_stream( .into_response() } +/// POST /api/agents/:id/sessions/:session_id/message/stream — SSE streaming response to specific session. +pub async fn send_message_stream_to_session( + State(state): State>, + Path((id, session_id_str)): Path<(String, String)>, + Json(req): Json, +) -> axum::response::Response { + use axum::response::sse::{Event, Sse}; + use futures::stream; + use openfang_runtime::llm_driver::StreamEvent; + + // SECURITY: Reject oversized messages to prevent OOM / LLM token abuse. + const MAX_MESSAGE_SIZE: usize = 64 * 1024; // 64KB + if req.message.len() > MAX_MESSAGE_SIZE { + return ( + StatusCode::PAYLOAD_TOO_LARGE, + Json(serde_json::json!({"error": "Message too large (max 64KB)"})), + ) + .into_response(); + } + + let agent_id: AgentId = match id.parse() { + Ok(id) => id, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid agent ID"})), + ) + .into_response(); + } + }; + + let session_id: openfang_types::agent::SessionId = match session_id_str.parse::() { + Ok(u) => openfang_types::agent::SessionId(u), + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid session ID"})), + ) + .into_response(); + } + }; + + if state.kernel.registry.get(agent_id).is_none() { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "Agent not found"})), + ) + .into_response(); + } + + let kernel_handle: Arc = state.kernel.clone() as Arc; + let (rx, _handle) = match state.kernel.send_message_streaming( + agent_id, + &req.message, + Some(kernel_handle), + req.sender_id, + req.sender_name, + None, // SSE streaming doesn't support image attachments yet + Some(session_id), + ) { + Ok(pair) => pair, + Err(e) => { + tracing::warn!("Streaming message to session failed for agent {id}: {e}"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "Streaming message failed"})), + ) + .into_response(); + } + }; + + let sse_stream = stream::unfold(rx, |mut rx| async move { + match rx.recv().await { + Some(event) => { + let sse_event: Result = Ok(match event { + StreamEvent::TextDelta { text } => Event::default() + .event("chunk") + .json_data(serde_json::json!({"content": text, "done": false})) + .unwrap_or_else(|_| Event::default().data("error")), + StreamEvent::ToolUseStart { name, .. } => Event::default() + .event("tool_use") + .json_data(serde_json::json!({"tool": name})) + .unwrap_or_else(|_| Event::default().data("error")), + StreamEvent::ToolUseEnd { name, input, .. } => Event::default() + .event("tool_result") + .json_data(serde_json::json!({"tool": name, "input": input})) + .unwrap_or_else(|_| Event::default().data("error")), + StreamEvent::ContentComplete { usage, .. } => Event::default() + .event("done") + .json_data(serde_json::json!({ + "done": true, + "usage": { + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + } + })) + .unwrap_or_else(|_| Event::default().data("error")), + StreamEvent::PhaseChange { phase, detail } => Event::default() + .event("phase") + .json_data(serde_json::json!({ + "phase": phase, + "detail": detail, + })) + .unwrap_or_else(|_| Event::default().data("error")), + _ => Event::default().comment("skip"), + }); + Some((sse_event, rx)) + } + None => None, + } + }); + + Sse::new(sse_stream) + .keep_alive(axum::response::sse::KeepAlive::default()) + .into_response() +} + // --------------------------------------------------------------------------- // Channel status endpoints — data-driven registry for all 40 adapters // --------------------------------------------------------------------------- @@ -5523,6 +5751,50 @@ pub async fn list_sessions(State(state): State>) -> impl IntoRespo } } +/// GET /api/sessions/:id — Get session by ID (including conversation history). +pub async fn get_session( + State(state): State>, + Path(id): Path, +) -> impl IntoResponse { + let session_id = match id.parse::() { + Ok(u) => openfang_types::agent::SessionId(u), + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid session ID"})), + ); + } + }; + + match state.kernel.memory.get_session(session_id) { + Ok(Some(session)) => { + let messages = format_session_messages(&session); + ( + StatusCode::OK, + Json(serde_json::json!({ + "session_id": session.id.0.to_string(), + "agent_id": session.agent_id.0.to_string(), + "message_count": session.messages.len(), + "context_window_tokens": session.context_window_tokens, + "label": session.label, + "messages": messages, + })), + ) + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "Session not found"})), + ), + Err(e) => { + tracing::warn!("Session load failed for session {id}: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "Session load failed"})), + ) + } + } +} + /// DELETE /api/sessions/:id — Delete a session. pub async fn delete_session( State(state): State>, diff --git a/crates/openfang-api/src/server.rs b/crates/openfang-api/src/server.rs index cd78c8b560..c0e61d1326 100644 --- a/crates/openfang-api/src/server.rs +++ b/crates/openfang-api/src/server.rs @@ -172,10 +172,18 @@ pub async fn build_router( "/api/agents/{id}/message", axum::routing::post(routes::send_message), ) + .route( + "/api/agents/{id}/sessions/{session_id}/message", + axum::routing::post(routes::send_message_to_session), + ) .route( "/api/agents/{id}/message/stream", axum::routing::post(routes::send_message_stream), ) + .route( + "/api/agents/{id}/sessions/{session_id}/message/stream", + axum::routing::post(routes::send_message_stream_to_session), + ) .route( "/api/agents/{id}/session", axum::routing::get(routes::get_agent_session), @@ -515,7 +523,7 @@ pub async fn build_router( .route("/api/sessions", axum::routing::get(routes::list_sessions)) .route( "/api/sessions/{id}", - axum::routing::delete(routes::delete_session), + axum::routing::get(routes::get_session).delete(routes::delete_session), ) .route( "/api/sessions/{id}/label", diff --git a/crates/openfang-api/src/ws.rs b/crates/openfang-api/src/ws.rs index 353ffd770e..e68d39cdeb 100644 --- a/crates/openfang-api/src/ws.rs +++ b/crates/openfang-api/src/ws.rs @@ -486,12 +486,27 @@ async fn handle_text_message( } } + // Resolve specific session target if provided, otherwise fallback to agent's active session + let session_id = parsed["session_id"] + .as_str() + .and_then(|s| s.parse::().ok()) + .map(openfang_types::agent::SessionId) + .unwrap_or_else(|| { + state + .kernel + .registry + .get(agent_id) + .map(|e| e.session_id) + .unwrap_or_else(|| openfang_types::agent::SessionId(uuid::Uuid::nil())) + }); + // Send typing lifecycle: start let _ = send_json( sender, &serde_json::json!({ "type": "typing", "state": "start", + "session_id": session_id.0.to_string(), }), ) .await; @@ -506,6 +521,7 @@ async fn handle_text_message( None, None, ws_content_blocks, + Some(session_id), ) { Ok((mut rx, handle)) => { // Forward stream events to WebSocket with debouncing. @@ -541,6 +557,7 @@ async fn handle_text_message( let _ = flush_text_buffer( &sender_stream, &mut text_buffer, + session_id, ) .await; break; @@ -560,6 +577,7 @@ async fn handle_text_message( let _ = flush_text_buffer( &sender_stream, &mut text_buffer, + session_id, ) .await; flush_deadline = far_future; @@ -573,6 +591,7 @@ async fn handle_text_message( let _ = flush_text_buffer( &sender_stream, &mut text_buffer, + session_id, ) .await; flush_deadline = far_future; @@ -588,15 +607,19 @@ async fn handle_text_message( "type": "typing", "state": "tool", "tool": name, + "session_id": session_id.0.to_string(), }), ) .await; } // Map event to JSON with verbose filtering - if let Some(json) = + if let Some(mut json) = map_stream_event(&ev, vlevel) { + if let Some(obj) = json.as_object_mut() { + obj.insert("session_id".to_string(), serde_json::Value::String(session_id.0.to_string())); + } if send_json(&sender_stream, &json) .await .is_err() @@ -613,6 +636,7 @@ async fn handle_text_message( let _ = flush_text_buffer( &sender_stream, &mut text_buffer, + session_id, ) .await; flush_deadline = far_future; @@ -681,6 +705,7 @@ async fn handle_text_message( &serde_json::json!({ "type": "typing", "state": "stop", + "session_id": session_id.0.to_string(), }), ) .await; @@ -694,6 +719,7 @@ async fn handle_text_message( "type": "silent_complete", "input_tokens": usage.input_tokens, "output_tokens": usage.output_tokens, + "session_id": session_id.0.to_string(), }), ) .await; @@ -735,6 +761,7 @@ async fn handle_text_message( "iterations": 0, // Not available from stream; handle updates later if needed "cost_usd": null, "context_pressure": pressure, + "session_id": session_id.0.to_string(), }), ) .await; @@ -745,6 +772,7 @@ async fn handle_text_message( sender, &serde_json::json!({ "type": "typing", "state": "stop", + "session_id": session_id.0.to_string(), }), ) .await; @@ -753,6 +781,7 @@ async fn handle_text_message( &serde_json::json!({ "type": "error", "content": "Internal error occurred", + "session_id": session_id.0.to_string(), }), ) .await; @@ -765,6 +794,7 @@ async fn handle_text_message( sender, &serde_json::json!({ "type": "typing", "state": "stop", + "session_id": session_id.0.to_string(), }), ) .await; @@ -774,6 +804,7 @@ async fn handle_text_message( &serde_json::json!({ "type": "error", "content": user_msg, + "session_id": session_id.0.to_string(), }), ) .await; @@ -1097,6 +1128,7 @@ fn map_stream_event(event: &StreamEvent, verbose: VerboseLevel) -> Option>>, buffer: &mut String, + session_id: openfang_types::agent::SessionId, ) -> Result<(), axum::Error> { if buffer.is_empty() { return Ok(()); @@ -1106,6 +1138,7 @@ async fn flush_text_buffer( &serde_json::json!({ "type": "text_delta", "content": buffer.as_str(), + "session_id": session_id.0.to_string(), }), ) .await; diff --git a/crates/openfang-api/tests/api_integration_test.rs b/crates/openfang-api/tests/api_integration_test.rs index e3592abebc..96636d9c99 100644 --- a/crates/openfang-api/tests/api_integration_test.rs +++ b/crates/openfang-api/tests/api_integration_test.rs @@ -92,10 +92,18 @@ async fn start_test_server_with_provider( "/api/agents/{id}/message", axum::routing::post(routes::send_message), ) + .route( + "/api/agents/{id}/sessions/{session_id}/message", + axum::routing::post(routes::send_message_to_session), + ) .route( "/api/agents/{id}/session", axum::routing::get(routes::get_agent_session), ) + .route( + "/api/sessions/{id}", + axum::routing::get(routes::get_session), + ) .route("/api/agents/{id}/ws", axum::routing::get(ws::agent_ws)) .route( "/api/agents/{id}", @@ -314,6 +322,82 @@ async fn test_agent_session_empty() { assert_eq!(body["messages"].as_array().unwrap().len(), 0); } +#[tokio::test] +async fn test_specific_session_messaging() { + if std::env::var("GROQ_API_KEY").is_err() { + eprintln!("GROQ_API_KEY not set, skipping LLM specific session test"); + return; + } + + let server = start_test_server_with_llm().await; + let client = reqwest::Client::new(); + + // Spawn agent + let resp = client + .post(format!("{}/api/agents", server.base_url)) + .json(&serde_json::json!({"manifest_toml": LLM_MANIFEST})) + .send() + .await + .unwrap(); + let body: serde_json::Value = resp.json().await.unwrap(); + let agent_id = body["agent_id"].as_str().unwrap().to_string(); + + let custom_session_id = uuid::Uuid::new_v4().to_string(); + + // Send message to the SPECIFIC session + let resp = client + .post(format!( + "{}/api/agents/{}/sessions/{}/message", + server.base_url, agent_id, custom_session_id + )) + .json(&serde_json::json!({"message": "Hello from custom session!"})) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + let response_text = body["response"].as_str().unwrap(); + assert!(!response_text.is_empty()); + + // Fetch the specific session directly + let resp = client + .get(format!( + "{}/api/sessions/{}", + server.base_url, custom_session_id + )) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let session_body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!( + session_body["session_id"].as_str().unwrap(), + custom_session_id + ); + assert_eq!(session_body["agent_id"].as_str().unwrap(), agent_id); + assert!(session_body["message_count"].as_u64().unwrap() > 0); + assert!(!session_body["messages"].as_array().unwrap().is_empty()); + + // Verify the agent's DEFAULT active session is still empty/untouched (if not the same) + let resp = client + .get(format!( + "{}/api/agents/{}/session", + server.base_url, agent_id + )) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let default_session: serde_json::Value = resp.json().await.unwrap(); + // Default session should either be 0 or point to a different session ID + if default_session["session_id"].as_str().unwrap() != custom_session_id { + assert_eq!(default_session["message_count"], 0); + } +} + #[tokio::test] async fn test_send_message_with_llm() { if std::env::var("GROQ_API_KEY").is_err() { diff --git a/crates/openfang-cli/src/tui/event.rs b/crates/openfang-cli/src/tui/event.rs index dbd57eef77..850bba9d0b 100644 --- a/crates/openfang-cli/src/tui/event.rs +++ b/crates/openfang-cli/src/tui/event.rs @@ -303,7 +303,7 @@ pub fn spawn_inprocess_stream( // send_message_streaming() finds the reactor. let _guard = rt.enter(); - match kernel.send_message_streaming(agent_id, &message, None, None, None, None) { + match kernel.send_message_streaming(agent_id, &message, None, None, None, None, None) { Ok((mut rx, handle)) => { rt.block_on(async { while let Some(ev) = rx.recv().await { diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index d9fe60f971..7b0d65911e 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -881,12 +881,12 @@ impl OpenFangKernel { // Auto-detect embedding provider by checking API key env vars in // priority order. First match wins. const API_KEY_PROVIDERS: &[(&str, &str)] = &[ - ("OPENAI_API_KEY", "openai"), - ("GROQ_API_KEY", "groq"), - ("MISTRAL_API_KEY", "mistral"), - ("TOGETHER_API_KEY", "together"), + ("OPENAI_API_KEY", "openai"), + ("GROQ_API_KEY", "groq"), + ("MISTRAL_API_KEY", "mistral"), + ("TOGETHER_API_KEY", "together"), ("FIREWORKS_API_KEY", "fireworks"), - ("COHERE_API_KEY", "cohere"), + ("COHERE_API_KEY", "cohere"), ]; let detected_from_key = API_KEY_PROVIDERS @@ -1127,8 +1127,7 @@ impl OpenFangKernel { != entry.manifest.tool_allowlist || disk_manifest.tool_blocklist != entry.manifest.tool_blocklist - || disk_manifest.skills - != entry.manifest.skills + || disk_manifest.skills != entry.manifest.skills || disk_manifest.mcp_servers != entry.manifest.mcp_servers; if changed { @@ -1539,6 +1538,7 @@ impl OpenFangKernel { Some(blocks), None, None, + None, ) .await } @@ -1559,6 +1559,7 @@ impl OpenFangKernel { None, sender_id, sender_name, + None, ) .await } @@ -1572,6 +1573,7 @@ impl OpenFangKernel { /// Per-agent locking ensures that concurrent messages for the same agent /// are serialized (preventing session corruption), while messages for /// different agents run in parallel. + #[allow(clippy::too_many_arguments)] pub async fn send_message_with_handle_and_blocks( &self, agent_id: AgentId, @@ -1580,6 +1582,7 @@ impl OpenFangKernel { content_blocks: Option>, sender_id: Option, sender_name: Option, + target_session_id: Option, ) -> KernelResult { // Acquire per-agent lock to serialize concurrent messages for the same agent. // This prevents session corruption when multiple messages arrive in quick @@ -1617,6 +1620,7 @@ impl OpenFangKernel { content_blocks, sender_id, sender_name, + target_session_id, ) .await }; @@ -1667,6 +1671,7 @@ impl OpenFangKernel { /// /// WASM and Python agents don't support true streaming — they execute /// synchronously and emit a single `TextDelta` + `ContentComplete` pair. + #[allow(clippy::too_many_arguments)] pub fn send_message_streaming( self: &Arc, agent_id: AgentId, @@ -1675,6 +1680,7 @@ impl OpenFangKernel { sender_id: Option, sender_name: Option, content_blocks: Option>, + target_session_id: Option, ) -> KernelResult<( tokio::sync::mpsc::Receiver, tokio::task::JoinHandle>, @@ -1743,12 +1749,13 @@ impl OpenFangKernel { } // LLM agent: true streaming via agent loop + let active_session_id = target_session_id.unwrap_or(entry.session_id); let mut session = self .memory - .get_session(entry.session_id) + .get_session(active_session_id) .map_err(KernelError::OpenFang)? .unwrap_or_else(|| openfang_memory::session::Session { - id: entry.session_id, + id: active_session_id, agent_id, messages: Vec::new(), context_window_tokens: 0, @@ -2300,18 +2307,20 @@ impl OpenFangKernel { content_blocks: Option>, sender_id: Option, sender_name: Option, + target_session_id: Option, ) -> KernelResult { // Check metering quota before starting self.metering .check_quota(agent_id, &entry.manifest.resources) .map_err(KernelError::OpenFang)?; + let active_session_id = target_session_id.unwrap_or(entry.session_id); let mut session = self .memory - .get_session(entry.session_id) + .get_session(active_session_id) .map_err(KernelError::OpenFang)? .unwrap_or_else(|| openfang_memory::session::Session { - id: entry.session_id, + id: active_session_id, agent_id, messages: Vec::new(), context_window_tokens: 0, diff --git a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs index 2fd1cd1931..76aeb3aafe 100644 --- a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs +++ b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs @@ -303,7 +303,7 @@ async fn test_wasm_agent_streaming_fallback() { let agent_id = kernel.spawn_agent(manifest).unwrap(); let (mut rx, handle) = kernel - .send_message_streaming(agent_id, "Hi!", None, None, None, None) + .send_message_streaming(agent_id, "Hi!", None, None, None, None, None) .expect("Streaming should start"); // Collect all stream events diff --git a/crates/openfang-runtime/src/mcp.rs b/crates/openfang-runtime/src/mcp.rs index b9f5f3819f..ca3837a3d4 100644 --- a/crates/openfang-runtime/src/mcp.rs +++ b/crates/openfang-runtime/src/mcp.rs @@ -307,11 +307,9 @@ impl McpConnection { } } - let config = StreamableHttpClientTransportConfig { - uri: Arc::from(url), - custom_headers, - ..Default::default() - }; + let mut config = StreamableHttpClientTransportConfig::default(); + config.uri = Arc::from(url); + config.custom_headers = custom_headers; let transport = StreamableHttpClientTransport::from_config(config); diff --git a/crates/openfang-runtime/src/web_fetch.rs b/crates/openfang-runtime/src/web_fetch.rs index 81021aefca..7d318a7d8a 100644 --- a/crates/openfang-runtime/src/web_fetch.rs +++ b/crates/openfang-runtime/src/web_fetch.rs @@ -506,7 +506,11 @@ mod tests { assert!(check_ssrf("http://169.254.169.254/latest/meta-data/", &allow).is_err()); // Also verify hostname-based metadata blocks let allow2 = vec!["metadata.google.internal".to_string()]; - assert!(check_ssrf("http://metadata.google.internal/computeMetadata/v1/", &allow2).is_err()); + assert!(check_ssrf( + "http://metadata.google.internal/computeMetadata/v1/", + &allow2 + ) + .is_err()); } #[test] @@ -514,7 +518,7 @@ mod tests { let allow = vec!["*.example.com".to_string()]; assert!(check_ssrf("http://api.example.com", &allow).is_ok()); // Non-matching domain should still go through normal checks - assert!(is_host_allowed("other.net", &allow) == false); + assert!(!is_host_allowed("other.net", &allow)); } #[test] diff --git a/crates/openfang-runtime/src/web_search.rs b/crates/openfang-runtime/src/web_search.rs index 28e92259e3..11b2f5823f 100644 --- a/crates/openfang-runtime/src/web_search.rs +++ b/crates/openfang-runtime/src/web_search.rs @@ -358,7 +358,10 @@ impl WebSearchEngine { let resp = self .client - .get(format!("{}/search", self.config.searxng.url.trim_end_matches('/'))) + .get(format!( + "{}/search", + self.config.searxng.url.trim_end_matches('/') + )) .query(&[ ("q", query), ("format", "json"), @@ -451,7 +454,10 @@ impl WebSearchEngine { let resp = self .client - .get(format!("{}/config", self.config.searxng.url.trim_end_matches('/'))) + .get(format!( + "{}/config", + self.config.searxng.url.trim_end_matches('/') + )) .header("User-Agent", "Mozilla/5.0 (compatible; OpenFangAgent/0.1)") .send() .await