Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion docs/swarms/structs/sequential_workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The SequentialWorkflow now includes a powerful **sequential awareness** feature

## Methods

### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)`
### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, streaming_callback: Optional[Callable[[str], None]] = None, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)`

The constructor initializes the `SequentialWorkflow` object with enhanced sequential awareness capabilities.

Expand All @@ -66,6 +66,7 @@ The constructor initializes the `SequentialWorkflow` object with enhanced sequen
- `team_awareness` (`bool`, optional): **NEW**: Enables sequential awareness features. Defaults to `False`.
- `time_enabled` (`bool`, optional): **NEW**: Enables timestamps in conversation. Defaults to `False`.
- `message_id_on` (`bool`, optional): **NEW**: Enables message IDs in conversation. Defaults to `False`.
- `streaming_callback` (`Callable[[str], None]`, optional): **NEW**: Enables streaming callback in conversation. Defaults to `None`.
- `*args`: Variable length argument list.
- `**kwargs`: Arbitrary keyword arguments.

Expand Down Expand Up @@ -281,3 +282,60 @@ The `run` method now includes enhanced logging to track the sequential awareness
5. **Professional Workflows**: Mimics real-world team collaboration patterns

The SequentialWorkflow with sequential awareness represents a significant advancement in multi-agent coordination, enabling more sophisticated and professional workflows that closely mirror human team collaboration patterns.


## **Usage Example with Streaming Callback in Sequential Workflows:**

```python
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow

"""Callback Function to Be Used"""
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
if chunk:
print(chunk, end="", flush=True)
if is_final:
print()

def create_agents():
"""Create specialized agents for the workflow."""
return [
Agent(
agent_name="Research_Agent",
agent_description="Specialized in gathering and analyzing information",
system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
Agent(
agent_name="Analysis_Agent",
agent_description="Expert at analyzing data and drawing insights",
system_prompt="You are an analysis expert. Break down complex information and provide clear insights.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
]

if __name__ == "__main__":
agents = create_agents()
workflow = SequentialWorkflow(
id="research_analysis_workflow",
name="Research Analysis Workflow",
description="A sequential workflow that researches and analyzes topics",
agents=agents,
max_loops=1,
output_type="str",
multi_agent_collab_prompt=True,
)

task = "What are the latest advancements in AI?"

workflow.run(
task=task,
streaming_callback=streaming_callback,
)
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow

def streaming_callback(agent_name: str, chunk: str, is_final: bool):
if chunk:
print(chunk, end="", flush=True)
if is_final:
print()

def create_agents():
"""Create specialized agents for the workflow."""
return [
Agent(
agent_name="Research_Agent",
agent_description="Specialized in gathering and analyzing information",
system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
Agent(
agent_name="Analysis_Agent",
agent_description="Expert at analyzing data and drawing insights",
system_prompt="You are an analysis expert. Break down complex information and provide clear insights.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
]

if __name__ == "__main__":
agents = create_agents()
workflow = SequentialWorkflow(
id="research_analysis_workflow",
name="Research Analysis Workflow",
description="A sequential workflow that researches and analyzes topics",
agents=agents,
max_loops=1,
output_type="str",
multi_agent_collab_prompt=True,
)

task = "What are the latest advancements in AI?"

workflow.run(
task=task,
streaming_callback=streaming_callback,
)
125 changes: 92 additions & 33 deletions swarms/structs/agent_rearrange.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
Expand Down Expand Up @@ -38,6 +37,7 @@ def __init__(
team_awareness: bool = False,
time_enabled: bool = False,
message_id_on: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
):
Expand All @@ -55,6 +55,7 @@ def __init__(
self.autosave = autosave
self.time_enabled = time_enabled
self.message_id_on = message_id_on
self.streaming_callback = streaming_callback

self.conversation = Conversation(
name=f"{self.name}-Conversation",
Expand Down Expand Up @@ -311,6 +312,9 @@ def _run(
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
*args,
**kwargs,
):
Expand Down Expand Up @@ -345,8 +349,6 @@ def _run(
return "Invalid flow configuration."

tasks = self.flow.split("->")
current_task = task
response_dict = {}

logger.info(
f"Starting task execution with {len(tasks)} steps"
Expand Down Expand Up @@ -384,24 +386,51 @@ def _run(

for agent_name in agent_names:
agent = self.agents[agent_name]
result = agent.run(
task=self.conversation.get_str(),
img=img,
*args,
**kwargs,
)
result = any_to_str(result)

self.conversation.add(
agent.agent_name, result
)

response_dict[agent_name] = result
logger.debug(
f"Agent {agent_name} output: {result}"
)

",".join(agent_names)
# Handle streaming callback if provided
if streaming_callback is not None:

def agent_streaming_callback(chunk: str):
"""Wrapper for agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(
agent_name, chunk, False
)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
)

output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
streaming_callback=agent_streaming_callback,
*args,
**kwargs,
)

# Call completion callback
try:
streaming_callback(agent_name, "", True)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
)
else:
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
*args,
**kwargs,
)
self.conversation.add(role=agent_name, content=output)

if self.verbose:
logger.success(
f"[SUCCESS] Agent {agent_name} completed task successfully"
)

self.conversation.add(role=agent_name, content=output)

Check failure

Code scanning / Pyre

Uninitialized local Error

Uninitialized local [61]: Local variable agent_name is undefined, or not always defined.

Check failure

Code scanning / Pyre

Uninitialized local Error

Uninitialized local [61]: Local variable output is undefined, or not always defined.

else:
# Sequential processing
Expand All @@ -412,7 +441,6 @@ def _run(

agent = self.agents[agent_name]

# Add sequential awareness information for the agent
awareness_info = (
self._get_sequential_awareness(
agent_name, tasks
Expand All @@ -426,19 +454,50 @@ def _run(
f"Added sequential awareness for {agent_name}: {awareness_info}"
)

current_task = agent.run(
task=self.conversation.get_str(),
img=img,
*args,
**kwargs,
)
current_task = any_to_str(current_task)
# Handle streaming callback if provided
if streaming_callback is not None:

def agent_streaming_callback(chunk: str):
"""Wrapper for agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(
agent_name, chunk, False
)
Comment on lines +464 to +466

Check failure

Code scanning / Pyre

Call error Error

Call error [29]: Optional[typing.Callable[[str, str, bool], None]] is not a function.
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
)

output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
streaming_callback=agent_streaming_callback,
*args,
**kwargs,
)

self.conversation.add(
agent.agent_name, current_task
)
# Call completion callback
try:
streaming_callback(agent_name, "", True)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
)
else:
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
*args,
**kwargs,
)
self.conversation.add(role=agent_name, content=output)

if self.verbose:
logger.success(
f"[SUCCESS] Agent {agent_name} completed task successfully"
)

response_dict[agent_name] = current_task

loop_count += 1

Expand Down
13 changes: 11 additions & 2 deletions swarms/structs/sequential_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(
shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True,
team_awareness: bool = False,
streaming_callback: Optional[Callable[[str, str, bool], None]] = None,
*args,
**kwargs,
):
Expand All @@ -64,6 +65,8 @@ def __init__(
output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
team_awareness (bool, optional): Whether to enable team awareness. Defaults to False.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.

Expand All @@ -79,6 +82,7 @@ def __init__(
self.shared_memory_system = shared_memory_system
self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.team_awareness = team_awareness
self.streaming_callback = streaming_callback

self.reliability_check()
self.flow = self.sequential_flow()
Expand All @@ -91,6 +95,7 @@ def __init__(
max_loops=self.max_loops,
output_type=self.output_type,
team_awareness=self.team_awareness,
streaming_callback=self.streaming_callback,

Check failure

Code scanning / Pyre

Incompatible parameter type Error

Incompatible parameter type [6]: In call AgentRearrange.__init__, for argument streaming_callback, expected Optional[typing.Callable[[str], None]] but got Optional[typing.Callable[[str, str, bool], None]].
*args,
**kwargs,
)
Expand Down Expand Up @@ -128,7 +133,6 @@ def sequential_flow(self):
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = (
getattr(agent, "agent_name", None)
or agent.name
Expand Down Expand Up @@ -158,6 +162,9 @@ def run(
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
*args,
**kwargs,
):
Expand All @@ -179,10 +186,12 @@ def run(
Exception: If any error occurs during task execution.
"""
try:
# prompt = f"{MULTI_AGENT_COLLAB_PROMPT}\n\n{task}"
return self.agent_rearrange.run(
task=task,
img=img,
streaming_callback=streaming_callback,
*args,
**kwargs,
)

except Exception as e:
Expand Down