EvaluationAgent (name TBD) #1352

Draft: wants to merge 7 commits into main

2 changes: 2 additions & 0 deletions autogen/agents/experimental/__init__.py
@@ -5,6 +5,7 @@
from .deep_research import DeepResearchAgent
from .discord import DiscordAgent
from .document_agent import DocAgent, DoclingDocIngestAgent, InMemoryQueryEngine, VectorChromaQueryEngine
from .evaluation import EvaluationAgent
from .reasoning import ReasoningAgent, ThinkNode
from .slack import SlackAgent
from .telegram import TelegramAgent
@@ -15,6 +16,7 @@
"DiscordAgent",
"DocAgent",
"DoclingDocIngestAgent",
"EvaluationAgent",
"InMemoryQueryEngine",
"ReasoningAgent",
"SlackAgent",
7 changes: 7 additions & 0 deletions autogen/agents/experimental/evaluation/__init__.py
@@ -0,0 +1,7 @@
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0

from .evaluation_agent import EvaluationAgent

__all__ = ["EvaluationAgent"]
363 changes: 363 additions & 0 deletions autogen/agents/experimental/evaluation/evaluation_agent.py
@@ -0,0 +1,363 @@
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from copy import deepcopy
from typing import Any, Optional, Union

from pydantic import BaseModel, Field

from .... import (
Agent,
ConversableAgent,
UpdateSystemMessage,
)
from ....doc_utils import export_module
from ....oai.client import OpenAIWrapper

__all__ = ["EvaluationAgent"]


@export_module("autogen.agents.experimental")
class EvaluationAgent(ConversableAgent):
"""Utilises multiple agents, evaluating their performance then selecting and returning the best one.

The agent follows the internal process:
1. Synthesize the task from the input using an LLM
2. Ask each agent to respond to the task (asynchronously)
3. Evaluator evaluates and selects the response
4. Return the selected response

You must pass in at least two agents.
"""

# Evaluator agent system message, cannot be overridden
DEFAULT_EVALUATOR_MESSAGE = (
"You are responsible for evaluating and selecting the best response from a set of agents. "
"Each agent, identified by a name, will be given a chance to respond. "
"Remember, you are only evaluating and it's not your opinion so don't add your judgement, change responses, or decline to respond. "
"Evaluation Criteria:\n[evaluation_guidance]\n"
"[agent_outputs]"
)

# Default evaluation guidance, can be overridden
DEFAULT_EVALUATION_GUIDANCE = (
"1. Carefully review each approach and result\n"
"2. Evaluate each solution based on criteria appropriate to the task\n"
"3. Select the absolute best response\n"
"4. You must select a response as the best response"
)

# Default reply template for the EvaluationAgent, can be overridden
DEFAULT_REPLY_TEMPLATE = "AGENT '[agent_name]' RESPONSE SELECTED.\n\nREASON:\n[reason]\n\nRESPONSE:\n[response]"

def __init__(
self,
*,
llm_config: dict[str, Any],
agents: list[ConversableAgent],
response_instructions: Optional[str] = None,
evaluation_guidance: Optional[str] = None,
reply_template: Optional[str] = None,
async_responses: bool = True,
silent: bool = False,
**kwargs: Any,
) -> None:
"""Initialize the EvaluationAgent.

Args:
llm_config (dict[str, Any]): LLM Configuration for the internal synthesizer and evaluator agents.
agents (list[ConversableAgent]): List of agents that will provide their responses for evaluation.
response_instructions (str): Instructions for the agents on how to respond to the task. This will be appended to the end of the synthesized task message.
evaluation_guidance (str): Guidance on how to evaluate the agents, used by the internal evaluator agent.
Default is:
"1. Carefully review each approach and result\n2. Evaluate each solution based on criteria appropriate to the task\n3. Select the absolute best response\n4. You must select a response as the best response"
reply_template (str): Template for the reply to be generated by the EvaluationAgent.
Three placeholders are available for substitution: [agent_name], [reason], and [response].
Default is:
"AGENT '[agent_name]' RESPONSE SELECTED.\n\nREASON:\n[reason]\n\nRESPONSE:\n[response]"
async_responses (bool): Whether to gather responses asynchronously. Default is True.
silent (bool): Whether to silence the agent's internal conversations. Default is False, meaning all internal conversations will be visible.
**kwargs (Any): Additional keyword arguments to pass to the base class.
"""

assert len(agents) > 1, "EvaluationAgent requires at least two agents for evaluation."
assert llm_config, "EvaluationAgent requires llm_config for the internal synthesizer and evaluator agents."

# Initialise the base class, ignoring llm_config as we'll put that on internal agents
super().__init__(**kwargs)

# Store custom parameters
self._evaluation_agents = agents
self._evaluation_response_instructions = response_instructions
self._evaluation_llm_config = llm_config
self._evaluation_guidance = evaluation_guidance if evaluation_guidance else self.DEFAULT_EVALUATION_GUIDANCE
self._evaluation_reply_template = reply_template if reply_template else self.DEFAULT_REPLY_TEMPLATE
self._evaluation_silent = silent
self._evaluation_async = async_responses

# Register our reply function for evaluation with the agent
# This will be the agent's only reply function
self.register_reply(
trigger=[Agent, None], reply_func=self._generate_evaluate_reply, remove_other_reply_funcs=True
)

# Class used internally to get the string result from a function or, if it fails, what we should return
class FunctionStringResult(BaseModel):
result: str = ""
success: bool = True

# SYNTHESIZING TASK

# Structured Output for the synthesizer agent
class EvaluationTask(BaseModel):
task: str = Field(description="The task to be solved by the agents.")
clarification_needed: Optional[str] = Field(
description="If the task is not clear, describe clarity needed. Only ask if absolutely critical."
)

# Consolidate messages from the outside chat for the synthesizer to determine the task from
def _consolidate_messages(self, messages: Optional[Union[list[dict[str, Any]], str]]) -> str: # type: ignore[type-arg]
"""Consolidates the external chat's messages for the Synthesizer to analyse"""
if isinstance(messages, str):
return messages
elif isinstance(messages, list) and len(messages) > 0:
# Loop through messages and consolidate, taking into account that some may be tool calls and some may be tool responses, which we'll ignore.
# If the message has content and name then it should combine as "name:\ncontent\n\n"
consolidated_message = ""
for message in messages:
if "content" in message and "name" in message:
consolidated_message += f"{message['name']}:\n{message['content']}\n\n"
return consolidated_message.strip()
else:
raise NotImplementedError("Invalid messages format. Must be a list of messages or a string.")

def _create_synthesizer(self) -> None:
"""Create the internal synthesizer agent."""

# Add the response_format to the agent
synthesizer_llm_config = deepcopy(self._evaluation_llm_config)
synthesizer_llm_config["response_format"] = EvaluationAgent.EvaluationTask

self._synthesizer_agent = ConversableAgent(
name="evaluationagent_synthesizer",
llm_config=synthesizer_llm_config,
system_message=(
"Analyze the messages and determine the task being asked to be solved and reply with it, keeping it as close to word-to-word as possible. "
"If clarification is needed, provide details on the clarity needed. No other information is being be provided to the respondents."
),
)

def _synthesize_task(self, user_agent: ConversableAgent, messages: list[dict[str, Any]]) -> FunctionStringResult:
"""Synthesize the task from the outside messages."""
self._create_synthesizer()

consolidated_incoming_messages = self._consolidate_messages(messages)

synthesized_result = user_agent.initiate_chat(
recipient=self._synthesizer_agent,
message=consolidated_incoming_messages,
max_turns=1,
silent=self._evaluation_silent,
)

# Evaluate the result of the task synthesis
try:
evaluation_task = EvaluationAgent.EvaluationTask.model_validate_json(synthesized_result.summary)
except Exception as e:
return EvaluationAgent.FunctionStringResult(
result=f"EvaluationAgent was unable to determine the task: {e}", success=False
)

if not evaluation_task.task:
return EvaluationAgent.FunctionStringResult(
result="EvaluationAgent was unable to determine the task.", success=False
)

if evaluation_task.clarification_needed:
return EvaluationAgent.FunctionStringResult(
result=f"I need clarity on the task: {evaluation_task.clarification_needed}", success=False
)

return EvaluationAgent.FunctionStringResult(result=evaluation_task.task)

# GATHER RESPONSES

def _compile_responses_nested_chat(self, task: str) -> list[dict[str, Any]]:
"""Compile the nested chat for the responses part of the evaluation process."""

nested_chats = []

for i, agent in enumerate(self._evaluation_agents):
agent_dict = {
"recipient": agent,
"chat_id": agent.name,
"message": f"Please provide your response to the task:\n\n{task}",
"summary_method": "last_msg",
"max_turns": 1,
# Exclude all chat results before this one from being carried over (so the agent only sees the task)
"finished_chat_indexes_to_exclude_from_carryover": [] if i == 0 else list(range(i)),
}

nested_chats.append(agent_dict)

return nested_chats

def _compile_nested_responses(
self, sender: ConversableAgent, recipient: ConversableAgent, summary_args: dict[str, Any]
) -> str:
response: str = ""
self._evaluation_agent_responses: dict[str, str] = {}

for agent in self._evaluation_agents:
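# Two messages are expected per agent: the task message sent to it and its single reply (max_turns=1)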
if recipient.chat_messages[agent] and len(recipient.chat_messages[agent]) == 2:
agent_response = recipient.chat_messages[agent][-1]
response += f"AGENT '{agent.name}' RESPONSE:\n{agent_response['content']}\n\n" + "-" * 50 + "\n\n"
self._evaluation_agent_responses[agent.name] = agent_response["content"]
else:
return "" # At least one of the agents didn't respond, abort.

return response

def _gather_responses(self, user_agent: ConversableAgent, task: str) -> FunctionStringResult:
"""Gather responses from all agents for the task."""
gathering_agent = ConversableAgent(
name="evaluation_gather",
)

# Create the nested chats for all the agents to respond
responses_nested_chat = self._compile_responses_nested_chat(task=task)

# Associate that with the gathering_agent
gathering_agent.register_nested_chats(
chat_queue=responses_nested_chat,
position=0,
use_async=self._evaluation_async,
trigger=Agent, # Any agent sender will trigger this
)

if self._evaluation_async:
# Asynchronously get the responses
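# Note: asyncio.run starts a fresh event loop, so this branch cannot be used from within an already running loop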
responses_result = asyncio.run(
user_agent.a_initiate_chat(
recipient=gathering_agent,
max_turns=1,
message="", # Prevent it trying to get user input
silent=self._evaluation_silent,
summary_method=self._compile_nested_responses,
)
)
else:
# Synchronously get the responses
responses_result = user_agent.initiate_chat(
recipient=gathering_agent,
max_turns=1,
message="", # Prevent it trying to get user input
silent=self._evaluation_silent,
summary_method=self._compile_nested_responses,
)

if responses_result.summary == "":
return EvaluationAgent.FunctionStringResult(
result="EvaluationAgent was unable to gather responses from all agents.", success=False
)

# Compiled responses
return EvaluationAgent.FunctionStringResult(result=responses_result.summary)

# EVALUATOR AGENT

# Structured Output for the evaluator agent
class NominatedResponse(BaseModel):
agent_name: str = Field(description="Name of agent that provided the response.")
response: str = Field(description="Exact, word-for-word, response selected.")
reason: str = Field(description="Brief reason why it was the best response.")

def _generate_evaluator_system_message(self, agent: ConversableAgent, messages: list[dict[str, Any]]) -> str:
"""Generate the system message for the internal evaluator agent."""
# Substitute the evaluation guidance into the system message
return EvaluationAgent.DEFAULT_EVALUATOR_MESSAGE.replace("[evaluation_guidance]", self._evaluation_guidance)

def _create_evaluator(self) -> None:
"""Create the internal evaluator agent."""

# Add the response_format to the agent
evaluator_llm_config = deepcopy(self._evaluation_llm_config)
evaluator_llm_config["response_format"] = EvaluationAgent.NominatedResponse

self._evaluator_agent = ConversableAgent(
name="evaluationagent_evaluator",
llm_config=evaluator_llm_config,
update_agent_state_before_reply=[UpdateSystemMessage(self._generate_evaluator_system_message)],
)

# Inner evaluation process
def _generate_evaluate_reply(
self,
agent: ConversableAgent,
messages: Optional[list[dict[str, Any]]] = None,
sender: Optional[Agent] = None,
config: Optional[OpenAIWrapper] = None,
) -> tuple[bool, Union[str, dict[str, Any]]]:
if not messages:
return True, {"content": "EvaluationAgent requires messages to evaluate, please reply with a task."}

# Supplemental agent used for chatting with internal agents
user_agent = ConversableAgent(
name="evaluation_user",
human_input_mode="NEVER",
)

# 1. Synthesize the task from the input
synthesized_task = self._synthesize_task(user_agent, messages)

if not synthesized_task.success:
return True, {"content": synthesized_task.result}

task = synthesized_task.result

if self._evaluation_response_instructions:
task += f"\n\n{self._evaluation_response_instructions}"

# 2. Each agent gives their response using an asynchronous nested chat
gather_compiled_responses = self._gather_responses(user_agent, task)

if not gather_compiled_responses.success:
return True, {"content": gather_compiled_responses.result}

compiled_responses = gather_compiled_responses.result

# 3. Evaluator evaluates and selects the response
self._create_evaluator()

evaluation = user_agent.initiate_chat(
recipient=self._evaluator_agent, message=compiled_responses, max_turns=1, silent=self._evaluation_silent
)

# Extract the nominated response
try:
nominated_response = EvaluationAgent.NominatedResponse.model_validate_json(evaluation.summary)
except Exception as e:
return True, {"content": f"EvaluationAgent was unable to select the best response: {e}"}

if not nominated_response.response:
return True, {"content": "EvaluationAgent was unable to select a response."}

# Ensure the nominated agent name exists
if nominated_response.agent_name not in [a.name for a in self._evaluation_agents]:
return True, {"content": "EvaluationAgent provided an invalid agent name when selecting a response."}

# We'll take the response from the agent's original message, rather than the structured output,
# so that we can be sure it is returned exactly as the agent originally gave it
agent_response = self._evaluation_agent_responses[nominated_response.agent_name]

# Compile the response and return it using the self._evaluation_reply_template
compiled_reply = (
self._evaluation_reply_template.replace("[agent_name]", nominated_response.agent_name)
.replace("[reason]", nominated_response.reason)
.replace("[response]", agent_response)
)

# 4. Return the selected response in the specified compiled format
return True, compiled_reply
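For reference, a minimal usage sketch of the new agent (not part of this diff): it assumes the standard AG2 llm_config dictionary format, and the agent names, system messages, and task text are illustrative placeholders only.

from autogen import ConversableAgent
from autogen.agents.experimental import EvaluationAgent

# Hypothetical LLM configuration; any AG2-compatible llm_config should work here
llm_config = {"config_list": [{"model": "gpt-4o", "api_key": "YOUR_API_KEY"}]}

# Two responder agents whose answers will be compared (names and prompts are illustrative)
concise_agent = ConversableAgent(
    name="concise_writer",
    system_message="Answer as briefly as possible.",
    llm_config=llm_config,
)
detailed_agent = ConversableAgent(
    name="detailed_writer",
    system_message="Answer with thorough explanations.",
    llm_config=llm_config,
)

evaluation_agent = EvaluationAgent(
    name="evaluation_agent",
    llm_config=llm_config,
    agents=[concise_agent, detailed_agent],
    async_responses=False,  # gather responses one at a time for simplicity
)

# A driver agent starts the outer chat; synthesis, gathering, and evaluation happen inside the reply function
user = ConversableAgent(name="user", human_input_mode="NEVER")
result = user.initiate_chat(
    recipient=evaluation_agent,
    message="Explain the difference between a list and a tuple in Python.",
    max_turns=1,
)
print(result.summary)

The outer chat needs only a single turn: the EvaluationAgent's registered reply function handles task synthesis, the per-agent nested chats, and the final selection internally.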