Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion anus/core/agent/hybrid_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,52 @@ def __init__(
"executor": ToolAgent(name="executor", tools=tools),
"critic": ToolAgent(name="critic", tools=tools)
}

def add_specialized_agent(self, role: str, config: Dict[str, Any]) -> bool:
"""
Add a specialized agent for a specific role.

Args:
role: The role of the specialized agent.
config: Configuration for the specialized agent.

Returns:
True if the agent was successfully added, False otherwise.
"""
try:
# Create a new ToolAgent with the given configuration
agent_name = config.get("name", f"{role}-agent")
agent = ToolAgent(
name=agent_name,
max_iterations=config.get("max_iterations", self.max_iterations),
tools=config.get("tools", []),
**config.get("kwargs", {})
)

# Register the specialized agent
self.specialized_agents[role] = agent

self.log_action("add_specialized_agent", {"role": role, "agent_name": agent_name})

# Add easter egg log message based on role
if role == "researcher":
logging.debug(f"Added a researcher agent to probe deep into any subject matter")
elif role == "coder":
logging.debug(f"Added a coder agent to handle the backend implementation")
elif role == "planner":
logging.debug(f"Added a planner agent to ensure smooth passage through complex tasks")
elif role == "critic":
logging.debug(f"Added a critic agent to ensure everything comes out right in the end")
else:
logging.debug(f"Added a {role} agent to the ANUS collective")
Comment on lines +73 to +83
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose to add dict of roles and descriptions/logs as function arguments and we can remove hardcoded roles :-)


return True

except Exception as e:
error_message = f"Error adding specialized agent for role {role}: {str(e)}"
logging.error(error_message)
self.log_action("add_specialized_agent", {"role": role, "status": "error", "error": error_message})
return False

def _assess_complexity(self, task: str) -> float:
"""
Expand All @@ -69,6 +115,8 @@ def _assess_complexity(self, task: str) -> float:
(r'all|every|each', 1.0), # Comprehensive operations
(r'most|best|optimal', 1.5) # Decision making
]



# Add complexity for each operation found
for pattern, score in operations:
Expand Down Expand Up @@ -192,4 +240,4 @@ def _execute_multi_agent(self, task: str, **kwargs) -> Dict[str, Any]:
"answer": final_result.get("answer", str(final_result)),
"agent_results": results,
"mode": "multi"
}
}