244 changes: 206 additions & 38 deletions swarms/agents/i_agent.py
@@ -18,6 +18,7 @@

"""

import re
from typing import List, Tuple
from loguru import logger
from swarms.structs.agent import Agent
@@ -36,6 +37,12 @@

"""

# Configuration constants
MAX_PATHS_PER_ITERATION = 5
SCORE_THRESHOLD = 0.7
EARLY_TERMINATION_SCORE = 0.85
DEFAULT_SCORE = 0.5


class IterativeReflectiveExpansion:
"""
@@ -77,6 +84,66 @@ def __init__(
dynamic_temperature_enabled=True,
)

def _extract_score_robust(self, response: str) -> float:
"""
Robustly extract a score from LLM response using multiple strategies.

:param response: The LLM response text.
:return: Extracted score between 0.0 and 1.0, or DEFAULT_SCORE if extraction fails.
"""
# Strategy 1: Look for "Score: X.X" format (with or without markdown formatting)
for line in response.splitlines():
line_clean = line.strip().replace('*', '') # Remove markdown formatting
if 'score:' in line_clean.lower():
try:
# Extract everything after "score:"
score_str = line_clean.lower().split('score:')[-1].strip()
# Remove any non-numeric characters except decimal point
score_str = re.sub(r'[^\d.]', '', score_str)
if score_str: # Make sure we have something to parse
score = float(score_str)
# Clamp to valid range
return max(0.0, min(1.0, score))
except (ValueError, IndexError):
pass

# Strategy 2: Look for any number between 0 and 1 with context
score_patterns = [
r'score[:\s]+(\d+\.?\d*)',
r'rating[:\s]+(\d+\.?\d*)',
r'effectiveness[:\s]+(\d+\.?\d*)',
r'(\d+\.?\d*)\s*(?:/|out of)\s*(?:10|1\.0|1)',
]

for pattern in score_patterns:
matches = re.findall(pattern, response.lower())
if matches:
try:
score = float(matches[0])
# Normalize if score is out of 10
if score > 1.0:
score = score / 10.0
return max(0.0, min(1.0, score))
except ValueError:
continue

# Strategy 3: Sentiment analysis fallback
positive_keywords = ['excellent', 'good', 'promising', 'effective', 'successful', 'optimal']
negative_keywords = ['poor', 'bad', 'ineffective', 'failed', 'error', 'wrong', 'incorrect']

response_lower = response.lower()
positive_count = sum(1 for kw in positive_keywords if kw in response_lower)
negative_count = sum(1 for kw in negative_keywords if kw in response_lower)

if positive_count > negative_count and positive_count > 0:
return 0.75 # Likely good
elif negative_count > positive_count and negative_count > 0:
return 0.4 # Likely poor

# Default fallback
logger.warning(f"Could not extract score from response, using default: {DEFAULT_SCORE}")
return DEFAULT_SCORE
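# Illustrative inputs and the scores the strategies above would yield
# (editorial sketch -- sample strings only, not taken from real model output):
#   "**Score:** 0.82"                              -> 0.82 (strategy 1: explicit "Score:" line)
#   "I would rate this 8/10 overall."              -> 0.8  (strategy 2: regex match, normalized from /10)
#   "The approach looks promising and effective."  -> 0.75 (strategy 3: positive-keyword fallback)
#   "No usable signal here."                       -> 0.5  (DEFAULT_SCORE when nothing matches)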

def generate_initial_hypotheses(self, task: str) -> List[str]:
"""
Generate an initial set of reasoning hypotheses based on the problem input.
@@ -110,35 +177,43 @@ def simulate_path(self, path: str) -> Tuple[str, float, str]:
:param path: A candidate reasoning path.
:return: A tuple containing the simulated outcome, a numerical score (0.0 to 1.0), and error information.
"""
logger.info(f"Simulating path: {path}")
logger.info(f"Simulating path: {path[:100]}...")
prompt = (
f"Simulate the following reasoning path step by step and provide:\n"
f"1. Outcome: A brief summary of the resulting solution.\n"
f"2. Score: A numerical effectiveness score between 0.0 and 1.0.\n"
f"2. Score: A numerical effectiveness score between 0.0 and 1.0 (REQUIRED - provide a decimal number).\n"
f"3. Errors: Any potential errors or shortcomings identified during the reasoning.\n\n"
f"IMPORTANT: You MUST provide a score as a decimal number (e.g., 0.8, 0.65, 0.9).\n\n"
f"Reasoning Path: {path}"
)
response = self.agent.run(prompt)
self.conversation.add(
role=self.agent.agent_name, content=response
)

outcome = ""
score = 0.0
error_info = ""
try:
# Expecting a response with lines starting with "Outcome:", "Score:", and "Errors:"
for line in response.splitlines():
if line.startswith("Outcome:"):
outcome = line[len("Outcome:") :].strip()
elif line.startswith("Score:"):
score = float(line[len("Score:") :].strip())
elif line.startswith("Errors:"):
error_info = line[len("Errors:") :].strip()
except Exception as e:
logger.error(f"Error parsing simulation response: {e}")
logger.debug(
f"Simulated outcome: {outcome}, Score: {score}, Errors: {error_info}"
)

# Extract outcome and errors (handle markdown formatting)
for line in response.splitlines():
line_stripped = line.strip().replace('*', '') # Remove markdown
line_lower = line_stripped.lower()

if 'outcome:' in line_lower:
outcome = line_stripped.split(':', 1)[-1].strip()
elif 'errors:' in line_lower or 'error:' in line_lower:
error_info = line_stripped.split(':', 1)[-1].strip()

# Use robust score extraction
score = self._extract_score_robust(response)

# If no explicit errors found, check for error indicators in outcome
if not error_info and outcome:
error_keywords = ['error', 'fail', 'incorrect', 'wrong', 'issue', 'problem']
if any(kw in outcome.lower() for kw in error_keywords):
error_info = "Potential issues identified in outcome"

logger.info(f"Path score: {score:.2f} | Outcome length: {len(outcome)} chars")
return outcome, score, error_info
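# Illustrative parse of a well-formed agent reply (the sample text below is a
# made-up stand-in for self.agent.run(...), not real model output):
#
#   **Outcome:** The equation reduces to x = 4.
#   **Score:** 0.9
#   **Errors:** None identified.
#
# yields outcome="The equation reduces to x = 4.", score=0.9 (via
# _extract_score_robust), and error_info="None identified.".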

def meta_reflect(self, error_info: str) -> str:
@@ -195,24 +270,48 @@ def select_promising_paths(self, paths: List[str]) -> List[str]:
Select the most promising reasoning paths from a list of candidates.

:param paths: A list of candidate reasoning paths.
:return: A pruned list containing the most promising paths.
:return: A pruned list containing the most promising paths (max MAX_PATHS_PER_ITERATION).
"""
logger.info("Selecting promising reasoning paths.")
if not paths:
logger.warning("No paths provided for selection")
return []

# If already within limit, return as is
if len(paths) <= MAX_PATHS_PER_ITERATION:
logger.info(f"Path count ({len(paths)}) within limit, keeping all")
return paths

logger.info(f"Selecting top {MAX_PATHS_PER_ITERATION} from {len(paths)} paths")

# Truncate paths for display to avoid overwhelming the LLM
paths_display = [p[:200] + "..." if len(p) > 200 else p for p in paths]

prompt = (
"Evaluate the following reasoning paths and select the ones that appear most promising for further exploration. "
"List each selected path on a new line:\n"
+ "\n".join(paths)
f"Evaluate the following {len(paths)} reasoning paths and select ONLY the {MAX_PATHS_PER_ITERATION} most promising ones. "
f"Return EXACTLY {MAX_PATHS_PER_ITERATION} paths, each on a new line. Do not add commentary.\n\n"
"Paths:\n"
+ "\n".join(f"{i+1}. {p}" for i, p in enumerate(paths_display))
)
response = self.agent.run(prompt)
self.conversation.add(
role=self.agent.agent_name, content=response
)

selected_paths = [
line.strip()
for line in response.split("\n")
if line.strip()
if line.strip() and not line.strip().startswith('#')
]
logger.debug(f"Selected paths: {selected_paths}")

# Hard limit enforcement - take first MAX_PATHS_PER_ITERATION
selected_paths = selected_paths[:MAX_PATHS_PER_ITERATION]

# If the LLM returned fewer than MAX_PATHS_PER_ITERATION usable paths, fall back to the first N original paths
if len(selected_paths) < MAX_PATHS_PER_ITERATION:
logger.warning(f"LLM returned only {len(selected_paths)} paths, using first {MAX_PATHS_PER_ITERATION} original paths")
selected_paths = paths[:MAX_PATHS_PER_ITERATION]

logger.info(f"Selected {len(selected_paths)} paths for next iteration")
return selected_paths
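# Illustrative pruning behavior with MAX_PATHS_PER_ITERATION = 5 (sample
# counts, not real runs): 3 input paths are returned unchanged; 8 input paths
# are narrowed by the agent to at most 5, and if the agent returns fewer than
# 5 usable lines the first 5 original paths are used instead.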

def synthesize_solution(
@@ -250,37 +349,106 @@ def run(self, task: str) -> str:
:return: The final solution generated after iterative reasoning.
"""
logger.info(
f"Starting iterative reflective expansion for problem: {task}"
f"Starting IRE reasoning | Max iterations: {self.max_iterations} | Task: {task[:100]}..."
)

candidate_paths = self.generate_initial_hypotheses(task)
logger.info(f"Generated {len(candidate_paths)} initial hypotheses")

# Limit initial paths
if len(candidate_paths) > MAX_PATHS_PER_ITERATION:
logger.warning(f"Limiting initial paths from {len(candidate_paths)} to {MAX_PATHS_PER_ITERATION}")
candidate_paths = candidate_paths[:MAX_PATHS_PER_ITERATION]

memory_pool: List[str] = []
best_score_overall = 0.0
early_termination = False

for iteration in range(self.max_iterations):
logger.info(
f"Iteration {iteration + 1}/{self.max_iterations}"
f"\n{'='*60}\nIteration {iteration + 1}/{self.max_iterations} | Processing {len(candidate_paths)} paths\n{'='*60}"
)

expanded_paths: List[str] = []
iteration_best_score = 0.0
high_quality_paths = 0

for path in candidate_paths:
for idx, path in enumerate(candidate_paths):
logger.info(f"[Path {idx + 1}/{len(candidate_paths)}] Simulating...")
outcome, score, error_info = self.simulate_path(path)
# Use a threshold score of 0.7 (this can be adjusted)
if score < 0.7:
feedback = self.meta_reflect(error_info)
revised_paths = self.revise_path(path, feedback)
expanded_paths.extend(revised_paths)

# Track best score
iteration_best_score = max(iteration_best_score, score)
best_score_overall = max(best_score_overall, score)

# Check for early termination
if score >= EARLY_TERMINATION_SCORE:
high_quality_paths += 1
logger.info(f"High-quality path found (score: {score:.2f})")
expanded_paths.append(path)

# Early termination if we have an excellent solution
if score >= 0.9:
logger.info(f"Excellent solution found (score: {score:.2f})! Triggering early termination.")
expanded_paths = [path] # Use only this path
early_termination = True
break

elif score < SCORE_THRESHOLD:
# Only revise if score is below threshold
logger.info(f"Path scored {score:.2f} (below {SCORE_THRESHOLD}), revising...")
if error_info:
feedback = self.meta_reflect(error_info)
revised_paths = self.revise_path(path, feedback)
# Limit number of revisions per path
revised_paths = revised_paths[:3]
expanded_paths.extend(revised_paths)
logger.info(f"Generated {len(revised_paths)} revised paths")
else:
# No explicit errors, keep original path
expanded_paths.append(path)
else:
# Good enough, keep it
logger.info(f"Path scored {score:.2f}, keeping as-is")
expanded_paths.append(path)
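# Illustrative branch behavior for the thresholds above (sample scores, not
# real runs): a path scoring 0.92 is kept alone and ends the loop early; 0.88
# is kept and counted as high-quality; 0.75 is kept as-is; 0.55 with error
# info is revised (at most 3 revisions), and 0.55 without error info is kept.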

memory_pool.extend(candidate_paths)
candidate_paths = self.select_promising_paths(
expanded_paths
)
logger.info(
f"Candidate paths for next iteration: {candidate_paths}"
f"\nIteration {iteration + 1} Summary:\n"
f" - Paths processed: {len(candidate_paths)}\n"
f" - Expanded to: {len(expanded_paths)} paths\n"
f" - Best score this iteration: {iteration_best_score:.2f}\n"
f" - Best score overall: {best_score_overall:.2f}\n"
f" - High-quality paths: {high_quality_paths}"
)

# Check for early termination
if early_termination:
logger.info("Early termination triggered - excellent solution found")
memory_pool.extend(candidate_paths)
candidate_paths = expanded_paths
break

# If we have multiple high-quality paths, we can stop iterating
if high_quality_paths >= 2 and iteration >= 1:
logger.info(f"Found {high_quality_paths} high-quality paths, stopping iteration")
memory_pool.extend(candidate_paths)
candidate_paths = expanded_paths
break

memory_pool.extend(candidate_paths)

# Select promising paths for next iteration
candidate_paths = self.select_promising_paths(expanded_paths)

# Safety check: if no paths remain, break
if not candidate_paths:
logger.warning("No candidate paths remain, terminating early")
candidate_paths = expanded_paths[:MAX_PATHS_PER_ITERATION] if expanded_paths else []
break

logger.info(f"\n{'='*60}\nSynthesizing final solution from {len(candidate_paths)} paths\n{'='*60}")
self.synthesize_solution(candidate_paths, memory_pool)
logger.info("Final solution generated.")
logger.info("IRE reasoning complete.")

return history_output_formatter(
self.conversation, self.output_type
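# A minimal end-to-end usage sketch. The import path follows this file's
# location; the max_iterations keyword is assumed from self.max_iterations in
# run() and may not match the actual __init__ signature (collapsed above).
from swarms.agents.i_agent import IterativeReflectiveExpansion

reasoner = IterativeReflectiveExpansion(max_iterations=3)  # constructor args assumed
answer = reasoner.run("Find the sum of the first 100 positive integers.")
print(answer)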