Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion crates/openfang-api/src/openai_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,15 @@ async fn stream_response(

let (mut rx, _handle) = state
.kernel
.send_message_streaming(agent_id, message, Some(kernel_handle), None, None, None)
.send_message_streaming(
agent_id,
message,
Some(kernel_handle),
None,
None,
None,
None,
)
.map_err(|e| format!("Streaming setup failed: {e}"))?;

let (tx, stream_rx) = tokio::sync::mpsc::channel::<Result<SseEvent, Infallible>>(64);
Expand Down
534 changes: 403 additions & 131 deletions crates/openfang-api/src/routes.rs

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion crates/openfang-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,18 @@ pub async fn build_router(
"/api/agents/{id}/message",
axum::routing::post(routes::send_message),
)
.route(
"/api/agents/{id}/sessions/{session_id}/message",
axum::routing::post(routes::send_message_to_session),
)
.route(
"/api/agents/{id}/message/stream",
axum::routing::post(routes::send_message_stream),
)
.route(
"/api/agents/{id}/sessions/{session_id}/message/stream",
axum::routing::post(routes::send_message_stream_to_session),
)
.route(
"/api/agents/{id}/session",
axum::routing::get(routes::get_agent_session),
Expand Down Expand Up @@ -515,7 +523,7 @@ pub async fn build_router(
.route("/api/sessions", axum::routing::get(routes::list_sessions))
.route(
"/api/sessions/{id}",
axum::routing::delete(routes::delete_session),
axum::routing::get(routes::get_session).delete(routes::delete_session),
)
.route(
"/api/sessions/{id}/label",
Expand Down
35 changes: 34 additions & 1 deletion crates/openfang-api/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,27 @@ async fn handle_text_message(
}
}

// Resolve specific session target if provided, otherwise fallback to agent's active session
let session_id = parsed["session_id"]
.as_str()
.and_then(|s| s.parse::<uuid::Uuid>().ok())
.map(openfang_types::agent::SessionId)
.unwrap_or_else(|| {
state
.kernel
.registry
.get(agent_id)
.map(|e| e.session_id)
.unwrap_or_else(|| openfang_types::agent::SessionId(uuid::Uuid::nil()))
});

// Send typing lifecycle: start
let _ = send_json(
sender,
&serde_json::json!({
"type": "typing",
"state": "start",
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -506,6 +521,7 @@ async fn handle_text_message(
None,
None,
ws_content_blocks,
Some(session_id),
) {
Ok((mut rx, handle)) => {
// Forward stream events to WebSocket with debouncing.
Expand Down Expand Up @@ -541,6 +557,7 @@ async fn handle_text_message(
let _ = flush_text_buffer(
&sender_stream,
&mut text_buffer,
session_id,
)
.await;
break;
Expand All @@ -560,6 +577,7 @@ async fn handle_text_message(
let _ = flush_text_buffer(
&sender_stream,
&mut text_buffer,
session_id,
)
.await;
flush_deadline = far_future;
Expand All @@ -573,6 +591,7 @@ async fn handle_text_message(
let _ = flush_text_buffer(
&sender_stream,
&mut text_buffer,
session_id,
)
.await;
flush_deadline = far_future;
Expand All @@ -588,15 +607,19 @@ async fn handle_text_message(
"type": "typing",
"state": "tool",
"tool": name,
"session_id": session_id.0.to_string(),
}),
)
.await;
}

// Map event to JSON with verbose filtering
if let Some(json) =
if let Some(mut json) =
map_stream_event(&ev, vlevel)
{
if let Some(obj) = json.as_object_mut() {
obj.insert("session_id".to_string(), serde_json::Value::String(session_id.0.to_string()));
}
if send_json(&sender_stream, &json)
.await
.is_err()
Expand All @@ -613,6 +636,7 @@ async fn handle_text_message(
let _ = flush_text_buffer(
&sender_stream,
&mut text_buffer,
session_id,
)
.await;
flush_deadline = far_future;
Expand Down Expand Up @@ -681,6 +705,7 @@ async fn handle_text_message(
&serde_json::json!({
"type": "typing",
"state": "stop",
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -694,6 +719,7 @@ async fn handle_text_message(
"type": "silent_complete",
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand Down Expand Up @@ -735,6 +761,7 @@ async fn handle_text_message(
"iterations": 0, // Not available from stream; handle updates later if needed
"cost_usd": null,
"context_pressure": pressure,
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -745,6 +772,7 @@ async fn handle_text_message(
sender,
&serde_json::json!({
"type": "typing", "state": "stop",
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -753,6 +781,7 @@ async fn handle_text_message(
&serde_json::json!({
"type": "error",
"content": "Internal error occurred",
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -765,6 +794,7 @@ async fn handle_text_message(
sender,
&serde_json::json!({
"type": "typing", "state": "stop",
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand All @@ -774,6 +804,7 @@ async fn handle_text_message(
&serde_json::json!({
"type": "error",
"content": user_msg,
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand Down Expand Up @@ -1097,6 +1128,7 @@ fn map_stream_event(event: &StreamEvent, verbose: VerboseLevel) -> Option<serde_
async fn flush_text_buffer(
sender: &Arc<Mutex<SplitSink<WebSocket, Message>>>,
buffer: &mut String,
session_id: openfang_types::agent::SessionId,
) -> Result<(), axum::Error> {
if buffer.is_empty() {
return Ok(());
Expand All @@ -1106,6 +1138,7 @@ async fn flush_text_buffer(
&serde_json::json!({
"type": "text_delta",
"content": buffer.as_str(),
"session_id": session_id.0.to_string(),
}),
)
.await;
Expand Down
84 changes: 84 additions & 0 deletions crates/openfang-api/tests/api_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,18 @@ async fn start_test_server_with_provider(
"/api/agents/{id}/message",
axum::routing::post(routes::send_message),
)
.route(
"/api/agents/{id}/sessions/{session_id}/message",
axum::routing::post(routes::send_message_to_session),
)
.route(
"/api/agents/{id}/session",
axum::routing::get(routes::get_agent_session),
)
.route(
"/api/sessions/{id}",
axum::routing::get(routes::get_session),
)
.route("/api/agents/{id}/ws", axum::routing::get(ws::agent_ws))
.route(
"/api/agents/{id}",
Expand Down Expand Up @@ -314,6 +322,82 @@ async fn test_agent_session_empty() {
assert_eq!(body["messages"].as_array().unwrap().len(), 0);
}

#[tokio::test]
async fn test_specific_session_messaging() {
if std::env::var("GROQ_API_KEY").is_err() {
eprintln!("GROQ_API_KEY not set, skipping LLM specific session test");
return;
}

let server = start_test_server_with_llm().await;
let client = reqwest::Client::new();

// Spawn agent
let resp = client
.post(format!("{}/api/agents", server.base_url))
.json(&serde_json::json!({"manifest_toml": LLM_MANIFEST}))
.send()
.await
.unwrap();
let body: serde_json::Value = resp.json().await.unwrap();
let agent_id = body["agent_id"].as_str().unwrap().to_string();

let custom_session_id = uuid::Uuid::new_v4().to_string();

// Send message to the SPECIFIC session
let resp = client
.post(format!(
"{}/api/agents/{}/sessions/{}/message",
server.base_url, agent_id, custom_session_id
))
.json(&serde_json::json!({"message": "Hello from custom session!"}))
.send()
.await
.unwrap();

assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
let response_text = body["response"].as_str().unwrap();
assert!(!response_text.is_empty());

// Fetch the specific session directly
let resp = client
.get(format!(
"{}/api/sessions/{}",
server.base_url, custom_session_id
))
.send()
.await
.unwrap();

assert_eq!(resp.status(), 200);
let session_body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(
session_body["session_id"].as_str().unwrap(),
custom_session_id
);
assert_eq!(session_body["agent_id"].as_str().unwrap(), agent_id);
assert!(session_body["message_count"].as_u64().unwrap() > 0);
assert!(!session_body["messages"].as_array().unwrap().is_empty());

// Verify the agent's DEFAULT active session is still empty/untouched (if not the same)
let resp = client
.get(format!(
"{}/api/agents/{}/session",
server.base_url, agent_id
))
.send()
.await
.unwrap();

assert_eq!(resp.status(), 200);
let default_session: serde_json::Value = resp.json().await.unwrap();
// Default session should either be 0 or point to a different session ID
if default_session["session_id"].as_str().unwrap() != custom_session_id {
assert_eq!(default_session["message_count"], 0);
}
}

#[tokio::test]
async fn test_send_message_with_llm() {
if std::env::var("GROQ_API_KEY").is_err() {
Expand Down
2 changes: 1 addition & 1 deletion crates/openfang-cli/src/tui/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn spawn_inprocess_stream(
// send_message_streaming() finds the reactor.
let _guard = rt.enter();

match kernel.send_message_streaming(agent_id, &message, None, None, None, None) {
match kernel.send_message_streaming(agent_id, &message, None, None, None, None, None) {
Ok((mut rx, handle)) => {
rt.block_on(async {
while let Some(ev) = rx.recv().await {
Expand Down
Loading