diff --git a/sdk/ai/azure-ai-projects/README.md b/sdk/ai/azure-ai-projects/README.md index c668b8ea19d0..5878b014d9da 100644 --- a/sdk/ai/azure-ai-projects/README.md +++ b/sdk/ai/azure-ai-projects/README.md @@ -1484,4 +1484,4 @@ additional questions or comments. [azure_sub]: https://azure.microsoft.com/free/ [evaluators]: https://learn.microsoft.com/azure/ai-studio/how-to/develop/evaluate-sdk [azure_ai_evaluation]: https://learn.microsoft.com/python/api/overview/azure/ai-evaluation-readme -[evaluator_library]: https://learn.microsoft.com/azure/ai-studio/how-to/evaluate-generative-ai-app#view-and-manage-the-evaluators-in-the-evaluator-library \ No newline at end of file +[evaluator_library]: https://learn.microsoft.com/azure/ai-studio/how-to/evaluate-generative-ai-app#view-and-manage-the-evaluators-in-the-evaluator-library diff --git a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_ai_agents_instrumentor.py b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_ai_agents_instrumentor.py index 2d4dcb1d1b54..2cab106a6f8b 100644 --- a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_ai_agents_instrumentor.py +++ b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_ai_agents_instrumentor.py @@ -9,6 +9,7 @@ import json import logging import os +from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast from urllib.parse import urlparse @@ -23,9 +24,11 @@ RequiredFunctionToolCall, RunStep, RunStepDeltaChunk, + RunStepError, RunStepFunctionToolCall, RunStepToolCallDetails, - SubmitToolOutputsAction, + RunStepCodeInterpreterToolCall, + RunStepBingGroundingToolCall, ThreadMessage, ThreadRun, ToolDefinition, @@ -50,6 +53,12 @@ GEN_AI_THREAD_RUN_STATUS, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, + GEN_AI_RUNSTEP_START_TIMESTAMP, + GEN_AI_RUNSTEP_END_TIMESTAMP, + GEN_AI_RUNSTEP_CANCEL_TIMESTAMP, + GEN_AI_RUNSTEP_FAIL_TIMESTAMP, + GEN_AI_RUN_STEP_STATUS, + ERROR_MESSAGE, OperationName, start_span, ) @@ -254,6 +263,12 @@ def _create_event_attributes( thread_run_id: Optional[str] = None, message_id: Optional[str] = None, message_status: Optional[str] = None, + run_step_status: Optional[str] = None, + created_at: Optional[datetime] = None, + completed_at: Optional[datetime] = None, + cancelled_at: Optional[datetime] = None, + failed_at: Optional[datetime] = None, + run_step_last_error: Optional[RunStepError] = None, usage: Optional[_models.RunStepCompletionUsage] = None, ) -> Dict[str, Any]: attrs: Dict[str, Any] = {GEN_AI_SYSTEM: AZ_AI_AGENT_SYSTEM} @@ -272,6 +287,41 @@ def _create_event_attributes( if message_status: attrs[GEN_AI_MESSAGE_STATUS] = self._status_to_string(message_status) + if run_step_status: + attrs[GEN_AI_RUN_STEP_STATUS] = self._status_to_string(run_step_status) + + if created_at: + if isinstance(created_at, datetime): + attrs[GEN_AI_RUNSTEP_START_TIMESTAMP] = created_at.isoformat() + else: + # fallback in case int or string gets passed + attrs[GEN_AI_RUNSTEP_START_TIMESTAMP] = str(created_at) + + if completed_at: + if isinstance(completed_at, datetime): + attrs[GEN_AI_RUNSTEP_END_TIMESTAMP] = completed_at.isoformat() + else: + # fallback in case int or string gets passed + attrs[GEN_AI_RUNSTEP_END_TIMESTAMP] = str(completed_at) + + if cancelled_at: + if isinstance(cancelled_at, datetime): + attrs[GEN_AI_RUNSTEP_CANCEL_TIMESTAMP] = cancelled_at.isoformat() + else: + # fallback in case int or string gets passed + attrs[GEN_AI_RUNSTEP_CANCEL_TIMESTAMP] = str(cancelled_at) + + if failed_at: + if isinstance(failed_at, datetime): + attrs[GEN_AI_RUNSTEP_FAIL_TIMESTAMP] = failed_at.isoformat() + else: + # fallback in case int or string gets passed + attrs[GEN_AI_RUNSTEP_FAIL_TIMESTAMP] = failed_at.isoformat() + + if run_step_last_error: + attrs[ERROR_MESSAGE] = run_step_last_error.message + attrs[ERROR_TYPE] = run_step_last_error.code + if usage: attrs[GEN_AI_USAGE_INPUT_TOKENS] = usage.prompt_tokens attrs[GEN_AI_USAGE_OUTPUT_TOKENS] = usage.completion_tokens @@ -306,6 +356,159 @@ def add_thread_message_event( usage=usage, ) + def _get_tool_details(self, tool: Any) -> Dict[str, Any]: + """ + Extracts tool details from a tool object dynamically and ensures the result is JSON-serializable. + + :param tool: The tool object (e.g., RunStepToolCallDetails). + :return: A dictionary containing the tool details. + """ + tool_details = {} + + # Dynamically extract attributes from the tool object + for attr in dir(tool): + # Skip private or special attributes + if not attr.startswith("_") and not callable(getattr(tool, attr)): + try: + value = getattr(tool, attr) + # Ensure the value is JSON-serializable + if isinstance(value, (str, int, float, bool, list, dict, type(None))): + tool_details[attr] = value + else: + # Convert non-serializable objects to strings + tool_details[attr] = str(value) + except Exception as e: + logging.warning("Error extracting attribute '%s': %s", attr, str(e)) + + return tool_details + + def _process_tool_calls(self, step: RunStep) -> List[Dict[str, Any]]: + """ + Helper method to process tool calls and return a list of tool call dictionaries. + """ + tool_calls = [] + for t in cast(RunStepToolCallDetails, step.step_details).tool_calls: + if not _trace_agents_content: + tool_call = { + "id": t.id, + "type": t.type, + } + elif isinstance(t, RunStepFunctionToolCall): + try: + parsed_arguments = json.loads(t.function.arguments) + except json.JSONDecodeError: + parsed_arguments = {} + + tool_call = { + "id": t.id, + "type": t.type, + "function": { + "name": t.function.name, + "arguments": parsed_arguments, + }, + } + elif isinstance(t, RunStepCodeInterpreterToolCall): + try: + parsed_outputs = json.dumps(t.code_interpreter.outputs) + except Exception as e: + logging.warning("Error parsing code interpreter outputs: '%s'", str(e)) + parsed_outputs = {} + + tool_call = { + "id": t.id, + "type": t.type, + "code_interpreter": { + "input": t.code_interpreter.input, + "output": parsed_outputs, + }, + } + elif isinstance(t, RunStepBingGroundingToolCall): + bing_grounding = {} + try: + bing_grounding = json.dumps(t.bing_grounding) + except Exception as e: + logging.warning("Error parsing Bing grounding: '%s'", str(e)) + parsed_outputs = {} + + tool_call = { + "id": t.id, + "type": t.type, + t.type: { + "bing_grounding": bing_grounding + }, + } + else: + try: + tool_details = json.dumps(self._get_tool_details(t)) + except Exception as e: + logging.warning("Error parsing step details: '%s'", str(e)) + tool_details = "" + + tool_call = { + "id": t.id, + "type": t.type, + t.type: tool_details, + } + tool_calls.append(tool_call) + return tool_calls + + def _add_tool_call_event( + self, + span, + step: RunStep, + event_name: str + ) -> None: + """ + Helper method to add a tool call event to a span. + """ + tool_calls = self._process_tool_calls(step) + attributes = self._create_event_attributes( + thread_id=step.thread_id, + agent_id=step.agent_id, + thread_run_id=step.run_id, + run_step_status=step.status, + created_at=step.created_at, + completed_at=step.completed_at, + cancelled_at=step.cancelled_at, + failed_at=step.failed_at, + run_step_last_error=step.last_error, + usage=step.usage, + ) + + if tool_calls: + attributes[GEN_AI_EVENT_CONTENT] = json.dumps({"tool_calls": tool_calls}, ensure_ascii=False) + + span.span_instance.add_event(name=event_name, attributes=attributes) + + def add_run_step_event(self, span, step: RunStep) -> None: + """ + Adds a run step event to the span. + """ + if step["type"] == "message_creation": + self._add_message_creation_run_step_event(span, step) + elif step["type"] == "tool_calls": + self._add_tool_call_event(span, step, "gen_ai.run_step.tool_calls") + + def _add_message_creation_run_step_event( + self, + span, + step: RunStep + ) -> None: + attributes = self._create_event_attributes( + thread_id=step.thread_id, + agent_id=step.agent_id, + thread_run_id=step.run_id, + message_id=step["step_details"]["message_creation"]["message_id"], + run_step_status=step.status, + created_at=step.created_at, + completed_at=step.completed_at, + cancelled_at=step.cancelled_at, + failed_at=step.failed_at, + run_step_last_error=step.last_error, + usage=step.usage + ) + span.span_instance.add_event(name="gen_ai.run_step.message_creation", attributes=attributes) + def _add_message_event( self, span, @@ -392,33 +595,7 @@ def _status_to_string(self, status: Any) -> str: return status.value if hasattr(status, "value") else status def _add_tool_assistant_message_event(self, span, step: RunStep) -> None: - tool_calls = [ - { - "id": t.id, - "type": t.type, - "function": ( - {"name": t.function.name, "arguments": json.loads(t.function.arguments)} - if isinstance(t, RunStepFunctionToolCall) - else None - ), - } - for t in cast(RunStepToolCallDetails, step.step_details).tool_calls - ] - - attributes = self._create_event_attributes( - thread_id=step.thread_id, - agent_id=step.agent_id, - thread_run_id=step.run_id, - message_status=step.status, - usage=step.usage, - ) - - if _trace_agents_content: - attributes[GEN_AI_EVENT_CONTENT] = json.dumps({"tool_calls": tool_calls}, ensure_ascii=False) - else: - tool_calls_non_recording = self._remove_function_call_names_and_arguments(tool_calls=tool_calls) - attributes[GEN_AI_EVENT_CONTENT] = json.dumps({"tool_calls": tool_calls_non_recording}, ensure_ascii=False) - span.span_instance.add_event(name="gen_ai.assistant.message", attributes=attributes) + self._add_tool_call_event(span, step, "gen_ai.assistant.message") def _add_tool_event_from_thread_run(self, span, run: ThreadRun) -> None: tool_calls = [] @@ -608,6 +785,9 @@ def start_create_thread_span( def start_list_messages_span(self, project_name: str, thread_id: Optional[str] = None) -> "Optional[AbstractSpan]": return start_span(OperationName.LIST_MESSAGES, project_name, thread_id=thread_id) + def start_list_run_steps_span(self, project_name: str, run_id: Optional[str] = None, thread_id: Optional[str] = None) -> "Optional[AbstractSpan]": + return start_span(OperationName.LIST_RUN_STEPS, project_name, run_id=run_id, thread_id=thread_id) + def trace_create_agent(self, function, *args, **kwargs): project_name = args[ # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] 0 @@ -1248,6 +1428,74 @@ def trace_list_messages(self, function, *args, **kwargs): return result + def trace_list_run_steps(self, function, *args, **kwargs): + project_name = args[ # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] + 0 + ]._config.project_name + run_id = kwargs.get("run_id") + thread_id = kwargs.get("thread_id") + + span = self.start_list_run_steps_span(project_name=project_name, run_id=run_id, thread_id=thread_id) + + if span is None: + return function(*args, **kwargs) + + with span: + try: + result = function(*args, **kwargs) + if hasattr(result, "data") and result.data is not None: + for step in result.data: + self.add_run_step_event(span, step) + + except Exception as exc: + # Set the span status to error + if isinstance(span.span_instance, Span): # pyright: ignore [reportPossiblyUnboundVariable] + span.span_instance.set_status( + StatusCode.ERROR, # pyright: ignore [reportPossiblyUnboundVariable] + description=str(exc), + ) + module = getattr(exc, "__module__", "") + module = module if module != "builtins" else "" + error_type = f"{module}.{type(exc).__name__}" if module else type(exc).__name__ + self._set_attributes(span, ("error.type", error_type)) + raise + + return result + + async def trace_list_run_steps_async(self, function, *args, **kwargs): + project_name = args[ # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] + 0 + ]._config.project_name + run_id = kwargs.get("run_id") + thread_id = kwargs.get("thread_id") + + span = self.start_list_run_steps_span(project_name=project_name, run_id=run_id, thread_id=thread_id) + + if span is None: + return function(*args, **kwargs) + + with span: + try: + result = await function(*args, **kwargs) + if hasattr(result, "data") and result.data is not None: + for step in result.data: + self.add_run_step_event(span, step) + + except Exception as exc: + # Set the span status to error + if isinstance(span.span_instance, Span): # pyright: ignore [reportPossiblyUnboundVariable] + span.span_instance.set_status( + StatusCode.ERROR, # pyright: ignore [reportPossiblyUnboundVariable] + description=str(exc), + ) + module = getattr(exc, "__module__", "") + module = module if module != "builtins" else "" + error_type = f"{module}.{type(exc).__name__}" if module else type(exc).__name__ + self._set_attributes(span, ("error.type", error_type)) + raise + + return result + async def trace_list_messages_async(self, function, *args, **kwargs): project_name = args[ # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] 0 @@ -1403,6 +1651,8 @@ def inner(*args, **kwargs): # pylint: disable=R0911 if class_function_name.startswith("AgentsOperations.list_messages"): kwargs.setdefault("merge_span", True) return self.trace_list_messages(function, *args, **kwargs) + if class_function_name.startswith("AgentsOperations.list_run_steps"): + return self.trace_list_run_steps(function, *args, **kwargs) if class_function_name.startswith("AgentRunStream.__exit__"): return self.handle_run_stream_exit(function, *args, **kwargs) # Handle the default case (if the function name does not match) @@ -1471,6 +1721,9 @@ async def inner(*args, **kwargs): # pylint: disable=R0911 if class_function_name.startswith("AgentsOperations.list_messages"): kwargs.setdefault("merge_span", True) return await self.trace_list_messages_async(function, *args, **kwargs) + if class_function_name.startswith("AgentsOperations.list_run_steps"): + kwargs.setdefault("merge_span", True) + return await self.trace_list_run_steps_async(function, *args, **kwargs) if class_function_name.startswith("AsyncAgentRunStream.__aexit__"): return self.handle_run_stream_exit(function, *args, **kwargs) # Handle the default case (if the function name does not match) @@ -1524,6 +1777,7 @@ def _agents_apis(self): ), ("azure.ai.projects.operations", "AgentsOperations", "create_stream", TraceType.AGENTS, "create_stream"), ("azure.ai.projects.operations", "AgentsOperations", "list_messages", TraceType.AGENTS, "list_messages"), + ("azure.ai.projects.operations", "AgentsOperations", "list_run_steps", TraceType.AGENTS, "list_run_steps"), ("azure.ai.projects.models", "AgentRunStream", "__exit__", TraceType.AGENTS, "__exit__"), ) async_apis = ( @@ -1585,6 +1839,13 @@ def _agents_apis(self): TraceType.AGENTS, "list_messages", ), + ( + "azure.ai.projects.aio.operations", + "AgentsOperations", + "list_run_steps", + TraceType.AGENTS, + "list_run_steps", + ), ("azure.ai.projects.models", "AsyncAgentRunStream", "__aexit__", TraceType.AGENTS, "__aexit__"), ) return sync_apis, async_apis @@ -1742,11 +2003,6 @@ def on_thread_message(self, message: "ThreadMessage") -> None: # type: ignore[f def on_thread_run(self, run: "ThreadRun") -> None: # type: ignore[func-returns-value] retval = None - if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction): - self.instrumentor._add_tool_event_from_thread_run( # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] - self.span, run - ) - if self.inner_handler: retval = self.inner_handler.on_thread_run(run) # type: ignore self.last_run = run @@ -1758,8 +2014,11 @@ def on_run_step(self, step: "RunStep") -> None: # type: ignore[func-returns-val if self.inner_handler: retval = self.inner_handler.on_run_step(step) # type: ignore - # todo - report errors for failure statuses here and in run ? - if step.type == "message_creation" and step.status == RunStepStatus.COMPLETED: + if step.type == "tool_calls" and isinstance(step.step_details, RunStepToolCallDetails) and step.status == RunStepStatus.COMPLETED: + self.instrumentor._add_tool_assistant_message_event( # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] + self.span, step + ) + elif step.type == "message_creation" and step.status == RunStepStatus.COMPLETED: self.instrumentor.add_thread_message_event(self.span, cast(ThreadMessage, self.last_message), step.usage) self.last_message = None @@ -1849,11 +2108,6 @@ async def on_thread_message(self, message: "ThreadMessage") -> None: # type: ig async def on_thread_run(self, run: "ThreadRun") -> None: # type: ignore[func-returns-value] retval = None - if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction): - self.instrumentor._add_tool_event_from_thread_run( # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] - self.span, run - ) - if self.inner_handler: retval = await self.inner_handler.on_thread_run(run) # type: ignore self.last_run = run @@ -1865,8 +2119,11 @@ async def on_run_step(self, step: "RunStep") -> None: # type: ignore[func-retur if self.inner_handler: retval = await self.inner_handler.on_run_step(step) # type: ignore - # todo - report errors for failure statuses here and in run ? - if step.type == "message_creation" and step.status == RunStepStatus.COMPLETED: + if step.type == "tool_calls" and isinstance(step.step_details, RunStepToolCallDetails) and step.status == RunStepStatus.COMPLETED: + self.instrumentor._add_tool_assistant_message_event( # pylint: disable=protected-access # pyright: ignore [reportFunctionMemberAccess] + self.span, step + ) + elif step.type == "message_creation" and step.status == RunStepStatus.COMPLETED: self.instrumentor.add_thread_message_event(self.span, cast(ThreadMessage, self.last_message), step.usage) self.last_message = None diff --git a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_utils.py b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_utils.py index bdc18e1381e8..48bdd89a20e8 100644 --- a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_utils.py +++ b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/agents/_utils.py @@ -42,7 +42,13 @@ GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens" GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message" GEN_AI_EVENT_CONTENT = "gen_ai.event.content" +GEN_AI_RUNSTEP_START_TIMESTAMP = "gen_ai.run_step.start.timestamp" +GEN_AI_RUNSTEP_END_TIMESTAMP = "gen_ai.run_step.end.timestamp" +GEN_AI_RUNSTEP_CANCEL_TIMESTAMP = "gen_ai.run_step.cancel.timestamp" +GEN_AI_RUNSTEP_FAIL_TIMESTAMP = "gen_ai.run_step.fail.timestamp" +GEN_AI_RUN_STEP_STATUS = "gen_ai.run_step.status" ERROR_TYPE = "error.type" +ERROR_MESSAGE = "error.message" class OperationName(Enum): @@ -52,6 +58,7 @@ class OperationName(Enum): START_THREAD_RUN = "start_thread_run" EXECUTE_TOOL = "execute_tool" LIST_MESSAGES = "list_messages" + LIST_RUN_STEPS = "list_run_steps" SUBMIT_TOOL_OUTPUTS = "submit_tool_outputs" PROCESS_THREAD_RUN = "process_thread_run"