Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions swarms/structs/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,18 @@
self.save()
self._autosave_config_step(loop_count=loop_count)

# Generate autonomous loop summary with streaming support when max_loops="auto"
if (
self.max_loops == "auto"
and (self.streaming_on or self.stream or streaming_callback is not None)
):
self._generate_autonomous_loop_summary(
task=task,
streaming_callback=streaming_callback,
*args,
**kwargs,
)

# Output formatting based on output_type
return history_output_formatter(
self.short_memory, type=self.output_type
Expand Down Expand Up @@ -1821,6 +1833,75 @@
role=self.agent_name, content=message
)

def _generate_autonomous_loop_summary(
self,

Check failure

Code scanning / Pyre

Undefined attribute Error

Undefined attribute [16]: Optional has no attribute lower.
task: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> None:
"""
Generate a comprehensive summary of the autonomous loop execution.
This method is called after the autonomous loop completes (when max_loops="auto").
It synthesizes all the execution phases (planning, execution) into a coherent,
comprehensive summary while supporting real-time streaming.
Args:
task (Optional[str]): The original task that was executed
streaming_callback (Optional[Callable[[str], None]]): Callback function to stream summary tokens in real-time
*args: Additional positional arguments for extensibility

Check failure

Code scanning / Pyre

Incompatible parameter type Error

Incompatible parameter type [6]: In call time.sleep, for 1st positional argument, expected float but got Optional[int].
**kwargs: Additional keyword arguments for extensibility
Returns:
None: The summary is added to short-term memory and streamed to the callback
"""
try:
# Get the conversation history up to this point
history = self.short_memory.get_str()

# Create a comprehensive summary prompt
summary_prompt = f"""Based on the complete execution history below, generate a comprehensive final summary of the task completion:
## EXECUTION HISTORY:
{history}
## SUMMARY GENERATION INSTRUCTIONS:
1. **Task Completion Status**: Clearly state whether the task was successfully completed
2. **Key Accomplishments**: List the major achievements and milestones reached
3. **Process Summary**: Briefly describe the phases executed (planning, execution, etc.)
4. **Results**: Provide concrete results and outputs from the execution
5. **Insights**: Share key insights, learnings, or important findings
6. **Final Recommendations**: If applicable, provide recommendations for next steps or improvements
Please generate a clear, well-structured summary that captures the essence of the entire autonomous loop execution."""

# Call LLM with streaming support for the summary
summary = self.call_llm(
task=summary_prompt,
current_loop=0,
streaming_callback=streaming_callback,
*args,
**kwargs,
)

# Add the summary to short-term memory
self.short_memory.add(
role=self.agent_name,
content=f"## FINAL AUTONOMOUS LOOP SUMMARY ##\n{summary}",
)

if self.verbose:
logger.info(
f"Autonomous loop summary generated for agent '{self.agent_name}'"
)

except Exception as error:
logger.error(
f"Failed to generate autonomous loop summary for agent '{self.agent_name}': {error}"
)
# Don't raise - summary generation failure shouldn't break the execution

def plan(self, task: str, *args, **kwargs) -> None:
"""
Create a strategic plan for executing the given task.
Expand Down
125 changes: 125 additions & 0 deletions test_autonomous_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""
Test script to verify autonomous loop streaming support.

This script tests that streaming works correctly in all three phases:
1. Planning Phase - should stream
2. Execution Phase - should stream
3. Summary Phase - should NOW stream (this was the fix)
"""

import sys
from swarms import Agent


def test_autonomous_loop_streaming():
"""Test autonomous loop with streaming enabled."""

print("\n" + "="*80)
print("TESTING AUTONOMOUS LOOP WITH STREAMING SUPPORT")
print("="*80 + "\n")

# Create a streaming callback to collect tokens
collected_tokens = {
"planning_phase": [],
"execution_phase": [],
"summary_phase": [],
}

current_phase = "planning_phase"

def streaming_callback(token):
"""Callback to receive streaming tokens."""
# Print token for real-time feedback
if isinstance(token, dict):
# Handle token info dict from call_llm
if "token" in token:
print(token["token"], end="", flush=True)
collected_tokens[current_phase].append(token["token"])
else:
# Handle plain string token
print(str(token), end="", flush=True)
collected_tokens[current_phase].append(str(token))

try:
# Initialize agent with streaming enabled and max_loops="auto"
agent = Agent(
agent_name="StreamingTestAgent",
system_prompt="""You are a helpful assistant that provides clear, comprehensive responses.
When asked to plan, provide structured step-by-step plans.
When asked to execute, provide detailed execution results.
When asked to summarize, provide comprehensive summaries.""",
model_name="gpt-4o-mini",
max_loops="auto",
streaming_on=True,
verbose=False, # Reduce noise
print_on=False,
)

# Test task that will trigger autonomous loop
task = """Please help me with the following:
1. Create a plan for organizing a small tech conference
2. Outline the key steps needed
3. List important considerations

Then provide a comprehensive summary of everything discussed."""

print(f"πŸ“‹ Task: {task}\n")
print("-" * 80)
print("🎬 EXECUTING WITH AUTONOMOUS LOOP STREAMING...\n")

# Run agent with streaming callback
result = agent.run(
task=task,
streaming_callback=streaming_callback,
)

print("\n" + "-" * 80)
print("\nβœ… EXECUTION COMPLETED!\n")

# Verify results
print("πŸ“Š STREAMING VERIFICATION REPORT:")
print("-" * 80)

phases_with_tokens = []
phases_without_tokens = []

for phase, tokens in collected_tokens.items():
token_count = len(tokens)
if token_count > 0:
phases_with_tokens.append((phase, token_count))
status = "βœ…"
else:
phases_without_tokens.append(phase)
status = "❌"

print(f"{status} {phase}: {token_count} tokens streamed")

print("\n" + "="*80)
if "summary_phase" in [p for p, _ in phases_with_tokens]:
print("βœ… SUCCESS: Summary phase is NOW streaming!")
print(" The autonomous loop streaming fix is working correctly.")
else:
print("❌ ISSUE: Summary phase is still not streaming.")
print(" The fix may need additional investigation.")

print("="*80 + "\n")

# Show final result
print("πŸ“„ FINAL AGENT OUTPUT:")
print("-" * 80)
print(result)
print("-" * 80 + "\n")

return len(phases_without_tokens) == 0

except Exception as e:
print(f"\n❌ ERROR during test: {str(e)}")
import traceback
traceback.print_exc()
return False


if __name__ == "__main__":
success = test_autonomous_loop_streaming()
sys.exit(0 if success else 1)
Loading