# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
from copy import deepcopy
from typing import Any, Optional, Union
from pydantic import BaseModel, Field
from .... import (
Agent,
ConversableAgent,
UpdateSystemMessage,
)
from ....doc_utils import export_module
from ....oai.client import OpenAIWrapper
__all__ = ["EvaluationAgent"]
@export_module("autogen.agents.contrib")
class EvaluationAgent(ConversableAgent):
"""Utilises multiple agents, evaluating their performance then selecting and returning the best one.
The agent follows the internal process:
1. Synthesize the task from the input using an LLM
2. Ask each agent to respond to the task (asynchronously)
3. Evaluator evaluates and selects the response
4. Return the selected response
You must pass in at least two agents.
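    Example (an illustrative sketch only; the model name, API-key handling, agent names, and the
    EvaluationAgent import path are assumptions inferred from the @export_module decorator, not
    taken from the library documentation):
        import os
        from autogen import ConversableAgent
        from autogen.agents.contrib import EvaluationAgent
        llm_config = {"config_list": [{"model": "gpt-4o", "api_key": os.environ["OPENAI_API_KEY"]}]}
        concise = ConversableAgent(name="concise_writer", llm_config=llm_config, system_message="Answer as briefly as possible.")
        detailed = ConversableAgent(name="detailed_writer", llm_config=llm_config, system_message="Answer with thorough detail.")
        evaluator = EvaluationAgent(name="evaluation_agent", llm_config=llm_config, agents=[concise, detailed])
        user = ConversableAgent(name="user", human_input_mode="NEVER")
        result = user.initiate_chat(recipient=evaluator, message="Explain what a mutex is.", max_turns=1)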
"""
# Evaluator agent system message, cannot be overridden
DEFAULT_EVALUATOR_MESSAGE = (
"You are responsible for evaluating and selecting the best response from a set of agents. "
"Each agent, identified by a name, will be given a chance to respond. "
"Remember, you are only evaluating and it's not your opinion so don't add your judgement, change responses, or decline to respond. "
"Evaluation Criteria:\n[evaluation_guidance]\n"
"[agent_outputs]"
)
# Default evaluation guidance, can be overridden
    DEFAULT_EVALUATION_GUIDANCE = (
"1. Carefully review each approach and result\n"
"2. Evaluate each solution based on criteria appropriate to the task\n"
"3. Select the absolute best response\n"
"4. You must select a response as the best response"
)
# Default reply template for the EvaluationAgent, can be overridden
DEFAULT_REPLY_TEMPLATE = "AGENT '[agent_name]' RESPONSE SELECTED.\n\nREASON:\n[reason]\n\nRESPONSE:\n[response]"
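    # With illustrative values substituted (hypothetical agent and text, not from the source),
    # the default template produces a reply like:
    #   AGENT 'concise_writer' RESPONSE SELECTED.
    #
    #   REASON:
    #   Most direct and accurate answer to the task.
    #
    #   RESPONSE:
    #   A mutex is a lock that ensures only one thread enters a critical section at a time.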
def __init__(
self,
*,
llm_config: dict[str, Any],
agents: list[ConversableAgent],
response_instructions: Optional[str] = None,
evaluation_guidance: Optional[str] = None,
reply_template: Optional[str] = None,
async_responses: bool = True,
silent: bool = False,
**kwargs: Any,
) -> None:
"""Initialize the EvaluationAgent.
Args:
llm_config (dict[str, Any]): LLM Configuration for the internal synthesizer and evaluator agents.
agents (list[ConversableAgent]): List of agents that will provide their responses for evaluation.
            response_instructions (Optional[str]): Instructions for the agents on how to respond to the task. This will be appended to the end of the synthesized task message.
            evaluation_guidance (Optional[str]): Guidance on how to evaluate the agents, used by the internal evaluator agent.
Default is:
"1. Carefully review each approach and result\n2. Evaluate each solution based on criteria appropriate to the task\n3. Select the absolute best response\n4. You must select a response as the best response"
            reply_template (Optional[str]): Template for the reply to be generated by the EvaluationAgent.
Three placeholders are available for substitution: [agent_name], [reason], and [response].
Default is:
"AGENT '[agent_name]' RESPONSE SELECTED.\n\nREASON:\n[reason]\n\nRESPONSE:\n[response]"
async_responses (bool): Whether to gather responses asynchronously. Default is True.
            silent (bool): Whether to silence the agent's internal conversations. Default is False, meaning all internal conversations will be visible.
**kwargs (Any): Additional keyword arguments to pass to the base class.
"""
assert len(agents) > 1, "EvaluationAgent requires at least two agents for evaluation."
assert llm_config, "EvaluationAgent requires llm_config for the internal synthesizer and evaluator agents."
# Initialise the base class, ignoring llm_config as we'll put that on internal agents
super().__init__(**kwargs)
# Store custom parameters
self._evaluation_agents = agents
self._evaluation_response_instructions = response_instructions
self._evaluation_llm_config = llm_config
        self._evaluation_guidance = evaluation_guidance if evaluation_guidance else self.DEFAULT_EVALUATION_GUIDANCE
self._evaluation_reply_template = reply_template if reply_template else self.DEFAULT_REPLY_TEMPLATE
self._evaluation_silent = silent
self._evaluation_async = async_responses
# Register our reply function for evaluation with the agent
# This will be the agent's only reply function
self.register_reply(
trigger=[Agent, None], reply_func=self._generate_evaluate_reply, remove_other_reply_funcs=True
)
# Class used internally to get the string result from a function or, if it fails, what we should return
class FunctionStringResult(BaseModel):
result: str = ""
success: bool = True
# SYNTHESIZING TASK
# Structured Output for the synthesizer agent
class EvaluationTask(BaseModel):
task: str = Field(description="The task to be solved by the agents.")
clarification_needed: Optional[str] = Field(
description="If the task is not clear, describe clarity needed. Only ask if absolutely critical."
)
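    # The synthesizer replies with JSON matching EvaluationTask, e.g. (illustrative values only):
    #   {"task": "Write a haiku about autumn", "clarification_needed": null}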
# Consolidate messages from the outside chat for the synthesizer to determine the task from
def _consolidate_messages(self, messages: Optional[Union[list[dict[str, Any]], str]]) -> str: # type: ignore[type-arg]
"""Consolidates the external chat's messages for the Synthesizer to analyse"""
if isinstance(messages, str):
return messages
elif isinstance(messages, list) and len(messages) > 0:
            # Loop through the messages and consolidate them, noting that some may be tool calls and some may be tool responses, which we'll ignore.
            # If a message has both content and name, it is combined as "name:\ncontent\n\n"
consolidated_message = ""
for message in messages:
if "content" in message and "name" in message:
consolidated_message += f"{message['name']}:\n{message['content']}\n\n"
return consolidated_message.strip()
else:
raise NotImplementedError("Invalid messages format. Must be a list of messages or a string.")
def _create_synthesizer(self) -> None:
"""Create the internal synthesizer agent."""
# Add the response_format to the agent
synthesizer_llm_config = deepcopy(self._evaluation_llm_config)
synthesizer_llm_config["response_format"] = EvaluationAgent.EvaluationTask
self._synthesizer_agent = ConversableAgent(
name="evaluationagent_synthesizer",
llm_config=synthesizer_llm_config,
system_message=(
"Analyze the messages and determine the task being asked to be solved and reply with it, keeping it as close to word-to-word as possible. "
"If clarification is needed, provide details on the clarity needed. No other information is being be provided to the respondents."
),
)
def _synthesize_task(self, user_agent: ConversableAgent, messages: list[dict[str, Any]]) -> FunctionStringResult:
"""Synthesize the task from the outside messages."""
self._create_synthesizer()
        consolidated_incoming_messages = self._consolidate_messages(messages)
        synthesized_result = user_agent.initiate_chat(
            recipient=self._synthesizer_agent,
            message=consolidated_incoming_messages,
max_turns=1,
silent=self._evaluation_silent,
)
# Evaluate the result of the task synthesis
try:
            evaluation_task = EvaluationAgent.EvaluationTask.model_validate_json(synthesized_result.summary)
except Exception as e:
return EvaluationAgent.FunctionStringResult(
result=f"EvaluationAgent was unable to determine the task: {e}", success=False
)
if not evaluation_task.task:
return EvaluationAgent.FunctionStringResult(
result="EvaluationAgent was unable to determine the task.", success=False
)
if evaluation_task.clarification_needed:
return EvaluationAgent.FunctionStringResult(
result=f"I need clarity on the task: {evaluation_task.clarification_needed}", success=False
)
return EvaluationAgent.FunctionStringResult(result=evaluation_task.task)
# GATHER RESPONSES
def _compile_responses_nested_chat(self, task: str) -> list[dict[str, Any]]:
"""Compile the nested chat for the responses part of the evaluation process."""
nested_chats = []
for i, agent in enumerate(self._evaluation_agents):
agent_dict = {
"recipient": agent,
"chat_id": agent.name,
"message": f"Please provide your response to the task:\n\n{task}",
"summary_method": "last_msg",
"max_turns": 1,
# Exclude all chat results before this one from being carried over (so the agent only sees the task)
"finished_chat_indexes_to_exclude_from_carryover": [] if i == 0 else list(range(i)),
}
nested_chats.append(agent_dict)
return nested_chats
def _compile_nested_responses(
self, sender: ConversableAgent, recipient: ConversableAgent, summary_args: dict[str, Any]
) -> str:
response: str = ""
self._evaluation_agent_responses: dict[str, str] = {}
for agent in self._evaluation_agents:
if recipient.chat_messages[agent] and len(recipient.chat_messages[agent]) == 2:
agent_response = recipient.chat_messages[agent][-1]
response += f"AGENT '{agent.name}' RESPONSE:\n{agent_response['content']}\n\n" + "-" * 50 + "\n\n"
self._evaluation_agent_responses[agent.name] = agent_response["content"]
else:
return "" # At least one of the agents didn't respond, abort.
return response
def _gather_responses(self, user_agent: ConversableAgent, task: str) -> FunctionStringResult:
"""Gather responses from all agents for the task."""
gathering_agent = ConversableAgent(
name="evaluation_gather",
)
# Create the nested chats for all the agents to respond
responses_nested_chat = self._compile_responses_nested_chat(task=task)
# Associate that with the gathering_agent
gathering_agent.register_nested_chats(
chat_queue=responses_nested_chat,
position=0,
use_async=self._evaluation_async,
trigger=Agent, # Any agent sender will trigger this
)
if self._evaluation_async:
# Asynchronously get the responses
responses_result = asyncio.run(
user_agent.a_initiate_chat(
recipient=gathering_agent,
max_turns=1,
message="", # Prevent it trying to get user input
silent=self._evaluation_silent,
summary_method=self._compile_nested_responses,
)
)
else:
# Synchronously get the responses
responses_result = user_agent.initiate_chat(
recipient=gathering_agent,
max_turns=1,
message="", # Prevent it trying to get user input
silent=self._evaluation_silent,
summary_method=self._compile_nested_responses,
)
if responses_result.summary == "":
return EvaluationAgent.FunctionStringResult(
result="EvaluationAgent was unable to gather responses from all agents.", success=False
)
# Compiled responses
return EvaluationAgent.FunctionStringResult(result=responses_result.summary)
# EVALUATOR AGENT
# Structured Output for the evaluator agent
class NominatedResponse(BaseModel):
agent_name: str = Field(description="Name of agent that provided the response.")
response: str = Field(description="Exact, word-for-word, response selected.")
reason: str = Field(description="Brief reason why it was the best response.")
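    # The evaluator replies with JSON matching NominatedResponse, e.g. (illustrative values only):
    #   {"agent_name": "concise_writer", "response": "A mutex is ...", "reason": "Most direct and accurate."}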
def _generate_evaluator_system_message(self, agent: ConversableAgent, messages: list[dict[str, Any]]) -> str:
"""Generate the system message for the internal evaluator agent."""
# Substitute the evaluation guidance into the system message
return EvaluationAgent.DEFAULT_EVALUATOR_MESSAGE.replace("[evaluation_guidance]", self._evaluation_guidance)
def _create_evaluator(self) -> None:
"""Create the internal evaluator agent."""
# Add the response_format to the agent
evaluator_llm_config = deepcopy(self._evaluation_llm_config)
evaluator_llm_config["response_format"] = EvaluationAgent.NominatedResponse
self._evaluator_agent = ConversableAgent(
name="evaluationagent_evaluator",
llm_config=evaluator_llm_config,
update_agent_state_before_reply=[UpdateSystemMessage(self._generate_evaluator_system_message)],
)
# Inner evaluation process
def _generate_evaluate_reply(
self,
agent: ConversableAgent,
messages: Optional[list[dict[str, Any]]] = None,
sender: Optional[Agent] = None,
config: Optional[OpenAIWrapper] = None,
) -> tuple[bool, Union[str, dict[str, Any]]]:
if not messages:
return True, {"content": "EvaluationAgent requires messages to evaluate, please reply with a task."}
# Supplemental agent used for chatting with internal agents
user_agent = ConversableAgent(
name="evaluation_user",
human_input_mode="NEVER",
)
# 1. Synthesize the task from the input
synthesized_task = self._synthesize_task(user_agent, messages)
if not synthesized_task.success:
return True, {"content": synthesized_task.result}
task = synthesized_task.result
if self._evaluation_response_instructions:
task += f"\n\n{self._evaluation_response_instructions}"
# 2. Each agent gives their response using an asynchronous nested chat
gather_compiled_responses = self._gather_responses(user_agent, task)
if not gather_compiled_responses.success:
return True, {"content": gather_compiled_responses.result}
compiled_responses = gather_compiled_responses.result
# 3. Evaluator evaluates and selects the response
self._create_evaluator()
evaluation = user_agent.initiate_chat(
recipient=self._evaluator_agent, message=compiled_responses, max_turns=1, silent=self._evaluation_silent
)
# Extract the nominated response
try:
nominated_response = EvaluationAgent.NominatedResponse.model_validate_json(evaluation.summary)
except Exception as e:
return True, {"content": f"EvaluationAgent was unable to select the best response: {e}"}
if not nominated_response.response:
return True, {"content": "EvaluationAgent was unable to select a response."}
# Ensure the nominated agent name exists
if nominated_response.agent_name not in [a.name for a in self._evaluation_agents]:
return True, {"content": "EvaluationAgent provided an invalid agent name when selecting a response."}
        # Use the agent's original response rather than the copy in the structured output,
        # so the text is returned exactly as it was originally produced
agent_response = self._evaluation_agent_responses[nominated_response.agent_name]
# Compile the response and return it using the self._evaluation_reply_template
compiled_reply = (
self._evaluation_reply_template.replace("[agent_name]", nominated_response.agent_name)
.replace("[reason]", nominated_response.reason)
.replace("[response]", agent_response)
)
# 4. Return the selected response in the specified compiled format
return True, compiled_reply