Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion swarms-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ path = "examples/multiple_agent/swarm_router.rs"

[[example]]
name = "pretty_print_example"
path = "examples/single_agent/pretty_print_example.rs"
path = "examples/single_agent/pretty_print_example.rs"

[[example]]
name = "streaming_agent"
path = "examples/single_agent/streaming_agent.rs"
100 changes: 100 additions & 0 deletions swarms-rs/examples/single_agent/streaming_agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::channel::mpsc;
use std::time::Duration;
use swarms_rs::{Agent};
use swarms_rs::structs::agent::AgentError;

struct StreamingAgent;

impl Agent for StreamingAgent {
fn run(&self, task: String) -> futures::future::BoxFuture<'static, Result<String, AgentError>> {
Box::pin(async move {
let mut stream = self.run_stream(task);
let mut last: Option<String> = None;
while let Some(item) = stream.next().await {
match item {
Ok(v) => last = Some(v),
Err(e) => return Err(e),
}
}
match last {
Some(v) => Ok(v),
None => Err(AgentError::NoChoiceFound),
}
})
}

fn run_stream(&self, task: String) -> BoxStream<'static, Result<String, AgentError>> {
let (mut tx, rx) = mpsc::unbounded::<Result<String, AgentError>>();

tokio::spawn(async move {
let _ = tx.send(Ok(format!("start: {}", task))).await;
for i in 0..3u8 {
tokio::time::sleep(Duration::from_millis(200)).await;
let _ = tx.send(Ok(format!("progress {}", i))).await;
}
let _ = tx.send(Ok("done".to_string())).await;
});

rx.boxed()
}

fn run_multiple_tasks(&mut self, _tasks: Vec<String>) -> futures::future::BoxFuture<'static, Result<Vec<String>, AgentError>> {
Box::pin(async move { Ok(vec![]) })
}

fn plan(&self, _task: String) -> futures::future::BoxFuture<'static, Result<(), AgentError>> {
Box::pin(async move { Ok(()) })
}

fn query_long_term_memory(&self, _task: String) -> futures::future::BoxFuture<'static, Result<(), AgentError>> {
Box::pin(async move { Ok(()) })
}

fn save_task_state(&self, _task: String) -> futures::future::BoxFuture<'static, Result<(), AgentError>> {
Box::pin(async move { Ok(()) })
}

fn is_response_complete(&self, _response: String) -> bool {
true
}

fn id(&self) -> String {
"streaming-agent".to_string()
}

fn name(&self) -> String {
"StreamingAgent".to_string()
}

fn description(&self) -> String {
"Example streaming agent that yields multiple values".to_string()
}

fn clone_box(&self) -> Box<dyn Agent> {
Box::new(StreamingAgent)
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let agent = StreamingAgent;

println!("Starting streaming agent demo...");
let mut stream = agent.run_stream("example task".to_string());
while let Some(item) = stream.next().await {
match item {
Ok(v) => println!("STREAM: {}", v),
Err(e) => println!("STREAM ERROR: {}", e),
}
}

// Optionally, use run() to get final value
match agent.run("final task".to_string()).await {
Ok(r) => println!("RUN result: {}", r),
Err(e) => println!("RUN error: {}", e),
}

Ok(())
}
11 changes: 11 additions & 0 deletions swarms-rs/src/structs/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::structs::persistence;
use crate::structs::tool::ToolError;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -254,6 +256,15 @@ pub trait Agent: Send + Sync {
/// Runs the autonomous agent loop to complete the given task.
fn run(&self, task: String) -> BoxFuture<Result<String, AgentError>>;

/// Stream partial or incremental results for the given task.
///
/// Default implementation wraps `run` into a single-item stream so
/// existing implementors remain compatible.
fn run_stream(&self, task: String) -> BoxStream<'static, Result<String, AgentError>> {
let fut = self.run(task);
futures::stream::once(fut).boxed()
}

/// Run multiple tasks concurrently
fn run_multiple_tasks(
&mut self,
Expand Down
Loading