Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deepeval/integrations/ag2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .handler import instrument_ag2, reset_ag2_instrumentation

__all__ = [
"instrument_ag2",
"reset_ag2_instrumentation",
]
100 changes: 100 additions & 0 deletions deepeval/integrations/ag2/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
from typing import Optional

import deepeval
from deepeval.config.settings import get_settings
from deepeval.telemetry import capture_tracing_integration

logger = logging.getLogger(__name__)

# Optional dependency probe: AG2 (the `autogen` package). The integration is
# inert unless both imports succeed; `ag2_installed` gates every public entry
# point in this module via is_ag2_installed().
try:
    from autogen import ConversableAgent  # noqa: F401
    from autogen.oai.client import OpenAIWrapper  # noqa: F401

    ag2_installed = True
except ImportError as e:
    # A missing optional extra is expected for most installs, so only log
    # when the user opted into verbose mode.
    if get_settings().DEEPEVAL_VERBOSE_MODE:
        if isinstance(e, ModuleNotFoundError):
            # The package itself is absent — report just the module name.
            logger.warning(
                "Optional ag2 dependency not installed: %s",
                e.name,
                stacklevel=2,
            )
        else:
            # The package was found but failed while loading (e.g. a broken
            # transitive import) — report the full error.
            logger.warning(
                "Optional ag2 import failed: %s",
                e,
                stacklevel=2,
            )
    ag2_installed = False

# Module-level flag making wrap_all() / reset_ag2_instrumentation() idempotent.
IS_WRAPPED_ALL = False


def is_ag2_installed():
    """Assert that the optional AG2 dependency imported successfully.

    Despite the boolean-sounding name, this is a guard: it returns nothing
    on success and raises ImportError when AG2 is unavailable.
    """
    if ag2_installed:
        return
    raise ImportError(
        "AG2 is not installed. Please install it with `pip install ag2[openai]`."
    )


def instrument_ag2(api_key: Optional[str] = None):
    """Instrument AG2 agents to capture traces for DeepEval evaluation.

    Must be called before any AG2 conversations run. Patches
    ConversableAgent methods so LLM calls, tool executions, and agent
    interactions are recorded as DeepEval trace spans.

    Args:
        api_key: Optional Confident AI API key for cloud tracing.
            Falls back to the DEEPEVAL_API_KEY env var when omitted.

    Example:
        from deepeval.integrations.ag2 import instrument_ag2
        instrument_ag2()

        # Run your AG2 agents as usual - traces are captured automatically
        executor.run(assistant, message="...").process()
    """
    # Fail fast with a clear message when the optional dependency is missing.
    is_ag2_installed()

    with capture_tracing_integration("ag2"):
        # Only a truthy key triggers a login; otherwise rely on env config.
        if api_key:
            deepeval.login(api_key)
        wrap_all()


def reset_ag2_instrumentation():
    """Undo instrument_ag2(): restore the original AG2 methods."""
    global IS_WRAPPED_ALL

    # No-op when instrumentation was never applied (or already removed).
    if IS_WRAPPED_ALL:
        # Imported lazily so this module loads even without AG2 installed.
        from deepeval.integrations.ag2.wrapper import unwrap_all

        unwrap_all()
        IS_WRAPPED_ALL = False


def wrap_all():
    """Apply every AG2 patch exactly once (idempotent via IS_WRAPPED_ALL)."""
    global IS_WRAPPED_ALL

    if IS_WRAPPED_ALL:
        return

    # Imported lazily so this module loads even without AG2 installed.
    from deepeval.integrations.ag2.wrapper import (
        wrap_generate_reply,
        wrap_a_generate_reply,
        wrap_execute_function,
        wrap_a_execute_function,
        wrap_openai_wrapper_create,
    )

    for apply_patch in (
        wrap_generate_reply,
        wrap_a_generate_reply,
        wrap_execute_function,
        wrap_a_execute_function,
        wrap_openai_wrapper_create,
    ):
        apply_patch()

    IS_WRAPPED_ALL = True
232 changes: 232 additions & 0 deletions deepeval/integrations/ag2/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import json
import logging
from functools import wraps

from autogen import ConversableAgent
from autogen.oai.client import OpenAIWrapper

from deepeval.tracing.tracing import Observer, trace_manager
from deepeval.tracing.types import LlmSpan, ToolSpan, TraceSpanStatus

logger = logging.getLogger(__name__)

_ORIGINAL_METHODS = {}


def wrap_generate_reply():
    """Patch ConversableAgent.generate_reply to emit an agent trace span.

    The original method is saved in _ORIGINAL_METHODS so unwrap_all() can
    restore it later.
    """
    original = ConversableAgent.generate_reply
    _ORIGINAL_METHODS["generate_reply"] = original

    @wraps(original)
    def wrapper(self, *args, **kwargs):
        agent_name = getattr(self, "name", "unknown_agent")

        with Observer(
            span_type="agent",
            func_name=agent_name,
            observe_kwargs={"name": agent_name},
        ) as observer:
            result = original(self, *args, **kwargs)
            # Record falsy-but-valid replies (e.g. "" or {}) too; only a
            # genuine None result is stored as None. The previous truthiness
            # check (`if result`) silently dropped empty replies.
            observer.result = str(result) if result is not None else None

        return result

    ConversableAgent.generate_reply = wrapper


def wrap_a_generate_reply():
    """Patch the async ConversableAgent.a_generate_reply to emit an agent span.

    Async counterpart of wrap_generate_reply(); the original coroutine
    function is saved in _ORIGINAL_METHODS for later restoration.
    """
    original = ConversableAgent.a_generate_reply
    _ORIGINAL_METHODS["a_generate_reply"] = original

    @wraps(original)
    async def wrapper(self, *args, **kwargs):
        agent_name = getattr(self, "name", "unknown_agent")

        with Observer(
            span_type="agent",
            func_name=agent_name,
            observe_kwargs={"name": agent_name},
        ) as observer:
            result = await original(self, *args, **kwargs)
            # Record falsy-but-valid replies (e.g. "" or {}) too; only a
            # genuine None result is stored as None. The previous truthiness
            # check (`if result`) silently dropped empty replies.
            observer.result = str(result) if result is not None else None

        return result

    ConversableAgent.a_generate_reply = wrapper


def wrap_execute_function():
    """Patch ConversableAgent.execute_function to emit a tool trace span.

    Captures the parsed tool arguments as span input, the (truncated) result
    content as span output, and marks the span ERRORED when AG2 reports the
    execution as unsuccessful.
    """
    original = ConversableAgent.execute_function
    _ORIGINAL_METHODS["execute_function"] = original

    @wraps(original)
    def wrapper(self, func_call, call_id=None, verbose=False):
        # func_call is dict-like with "name" and "arguments" keys
        # (arguments typically a JSON string) — mirrors AG2's call shape.
        func_name = func_call.get("name", "unknown_tool")
        raw_args = func_call.get("arguments", "{}")
        try:
            tool_input = (
                json.loads(raw_args) if isinstance(raw_args, str) else raw_args
            )
        except (json.JSONDecodeError, TypeError):
            # Unparseable arguments are preserved verbatim instead of dropped.
            tool_input = {"raw": str(raw_args)}

        exec_failed = False

        with Observer(
            span_type="tool",
            func_name=func_name,
            observe_kwargs={"name": func_name},
            function_kwargs=tool_input,
        ) as observer:
            # AG2 returns (success_flag, result_dict) rather than raising on
            # tool failure, so failures must be propagated to the span manually.
            is_exec_success, result_dict = original(
                self, func_call, call_id=call_id, verbose=verbose
            )

            content = result_dict.get("content", "")
            observer.result = content
            exec_failed = not is_exec_success

            # Enrich the live span directly; truncate output to bound payload
            # size (2000 chars).
            span = trace_manager.get_span_by_uuid(observer.uuid)
            if span and isinstance(span, ToolSpan):
                span.input = tool_input
                span.output = str(content)[:2000] if content else ""

            # Installed before the Observer exits so the failure status is
            # applied when the span is finalized.
            def _update_on_failure(s):
                if exec_failed:
                    s.status = TraceSpanStatus.ERRORED

            observer.update_span_properties = _update_on_failure

        return is_exec_success, result_dict

    ConversableAgent.execute_function = wrapper


def wrap_a_execute_function():
    """Patch the async ConversableAgent.a_execute_function to emit a tool span.

    Async counterpart of wrap_execute_function(): same argument parsing,
    output truncation, and failure-status handling.
    """
    original = ConversableAgent.a_execute_function
    _ORIGINAL_METHODS["a_execute_function"] = original

    @wraps(original)
    async def wrapper(self, func_call, call_id=None, verbose=False):
        # func_call is dict-like with "name" and "arguments" keys
        # (arguments typically a JSON string) — mirrors AG2's call shape.
        func_name = func_call.get("name", "unknown_tool")
        raw_args = func_call.get("arguments", "{}")
        try:
            tool_input = (
                json.loads(raw_args) if isinstance(raw_args, str) else raw_args
            )
        except (json.JSONDecodeError, TypeError):
            # Unparseable arguments are preserved verbatim instead of dropped.
            tool_input = {"raw": str(raw_args)}

        exec_failed = False

        with Observer(
            span_type="tool",
            func_name=func_name,
            observe_kwargs={"name": func_name},
            function_kwargs=tool_input,
        ) as observer:
            # AG2 returns (success_flag, result_dict) rather than raising on
            # tool failure, so failures must be propagated to the span manually.
            is_exec_success, result_dict = await original(
                self, func_call, call_id=call_id, verbose=verbose
            )

            content = result_dict.get("content", "")
            observer.result = content
            exec_failed = not is_exec_success

            # Enrich the live span directly; truncate output to bound payload
            # size (2000 chars).
            span = trace_manager.get_span_by_uuid(observer.uuid)
            if span and isinstance(span, ToolSpan):
                span.input = tool_input
                span.output = str(content)[:2000] if content else ""

            # Installed before the Observer exits so the failure status is
            # applied when the span is finalized.
            def _update_on_failure(s):
                if exec_failed:
                    s.status = TraceSpanStatus.ERRORED

            observer.update_span_properties = _update_on_failure

        return is_exec_success, result_dict

    ConversableAgent.a_execute_function = wrapper


def wrap_openai_wrapper_create():
    """Patch OpenAIWrapper.create to emit an LLM trace span per completion call.

    Captures the request messages as span input, the extracted completion text
    as span output, the model name, and prompt/completion token counts.
    """
    original = OpenAIWrapper.create
    _ORIGINAL_METHODS["openai_wrapper_create"] = original

    @wraps(original)
    def wrapper(self, **config):
        messages = config.get("messages", None)
        model = None
        # NOTE(review): reads the private `_config_list` attribute of
        # OpenAIWrapper to guess the configured model before the call —
        # fragile across AG2 versions; verify against the installed release.
        config_list = getattr(self, "_config_list", None)
        if config_list:
            first = config_list[0]
            if isinstance(first, dict):
                model = first.get("model", None)
            else:
                model = getattr(first, "model", None)

        with Observer(
            span_type="llm",
            func_name="llm_call",
            observe_kwargs={"model": model},
        ) as observer:
            response = original(self, **config)
            # The raw response is not stored on the observer; span fields are
            # populated directly below instead.
            observer.result = None

            span = trace_manager.get_span_by_uuid(observer.uuid)
            if span and isinstance(span, LlmSpan):
                if messages:
                    span.input = messages

                # Prefer the model name reported by the response over the
                # configured one.
                response_model = getattr(response, "model", None)
                if response_model:
                    span.model = response_model

                # Extract output text via AG2's own helper; best-effort only —
                # a failure here must not break the user's completion call.
                try:
                    extracted = self.extract_text_or_completion_object(response)
                    if extracted:
                        output = extracted[0]
                        if hasattr(output, "model_dump"):
                            span.output = output.model_dump()
                        else:
                            span.output = str(output)
                except Exception:
                    pass

                # Extract token usage when the response exposes it.
                usage = getattr(response, "usage", None)
                if usage:
                    span.input_token_count = getattr(
                        usage, "prompt_tokens", None
                    )
                    span.output_token_count = getattr(
                        usage, "completion_tokens", None
                    )

        return response

    OpenAIWrapper.create = wrapper


def unwrap_all():
    """Restore every patched AG2 method saved in _ORIGINAL_METHODS, then clear it."""
    # ConversableAgent methods share one restore pattern.
    for attr in (
        "generate_reply",
        "a_generate_reply",
        "execute_function",
        "a_execute_function",
    ):
        if attr in _ORIGINAL_METHODS:
            setattr(ConversableAgent, attr, _ORIGINAL_METHODS[attr])

    # OpenAIWrapper.create is stored under a distinct key.
    if "openai_wrapper_create" in _ORIGINAL_METHODS:
        OpenAIWrapper.create = _ORIGINAL_METHODS["openai_wrapper_create"]

    _ORIGINAL_METHODS.clear()
12 changes: 4 additions & 8 deletions deepeval/metrics/arena_g_eval/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ def generate_arena_winner(
"Be specific and grounded in the evaluation steps."
)

return textwrap.dedent(
f"""
return textwrap.dedent(f"""
You are a judge. Given the following evaluation steps, select the single contestant that best aligns with the evaluation steps.

{ArenaGEvalTemplate.multimodal_rules if multimodal else ""}
Expand Down Expand Up @@ -88,16 +87,14 @@ def generate_arena_winner(
}}

JSON:
"""
)
""")

@staticmethod
def rewrite_reason(
reason: str,
dummy_to_real_names: Dict[str, str],
):
return textwrap.dedent(
f"""
return textwrap.dedent(f"""
Given the following reason that explains which contestant is the winner, rewrite the reason to REPLACE all contestant names with their real names.

The contestant names are wrapped in $name$ format (e.g., $Alice$, $Bob$, $Charlie$).
Expand Down Expand Up @@ -129,5 +126,4 @@ def rewrite_reason(
}}

JSON:
"""
)
""")
6 changes: 2 additions & 4 deletions deepeval/metrics/argument_correctness/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ def generate_verdicts(

stringified_tools_called = repr(tools_called)

return textwrap.dedent(
f"""
return textwrap.dedent(f"""
For the provided list of tool calls, determine whether each tool call input parameter is relevantly and correctly addresses the input.

Please generate a list of JSON with two keys: `verdict` and `reason`.
Expand Down Expand Up @@ -99,8 +98,7 @@ def generate_verdicts(
{stringified_tools_called}

JSON:
"""
)
""")

@staticmethod
def generate_reason(
Expand Down
Loading
Loading