Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions packages/nvidia_nat_core/src/nat/utils/atif_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ def _safe_str(value: Any) -> str:
return str(value)


def _extract_tool_error(output: Any) -> dict[str, str] | None:
"""Extract error metadata from a tool output for ``step.extra["tool_errors"]``."""
# TODO: return a model instead of a plain dict once ATIF spec adds error support
status: str | None = getattr(output, "status", None) or (output.get("status") if isinstance(output, dict) else None)
if status != "error":
return None
content: str = (getattr(output, "content", None) or (output.get("content") if isinstance(output, dict) else None)
or _safe_str(output))
error_type: str = "Unknown"
error_message: str = content
if ":" in content:
candidate: str = content.split(":", 1)[0].strip()
if candidate.isidentifier():
error_type = candidate
error_message = content.split(":", 1)[1].strip()
return {
"error": content,
"error_type": error_type,
"error_message": error_message,
"status": "error",
}


def _extract_user_input(value: Any) -> str:
"""Extract the user-facing input text from a workflow start payload.

Expand Down Expand Up @@ -334,18 +357,31 @@ def _flush_pending() -> None:
tool_name = ist.name or "unknown_tool"
tool_input: dict[str, Any] = {}
tool_output = ""
raw_output: Any = None

if ist.data:
tool_input = _parse_tool_arguments(ist.data.input)
tool_output = _safe_str(ist.data.output)
raw_output = ist.data.output
tool_output = _safe_str(raw_output)
call_id = f"call_{ist.UUID}"
tc = ATIFToolCall(tool_call_id=call_id, function_name=tool_name, arguments=tool_input)
obs = ATIFObservationResult(source_call_id=call_id, content=tool_output)
tool_error: dict[str, str] | None = _extract_tool_error(raw_output)

if tool_error is not None:
tool_error["tool"] = tool_name
extra: dict[str, Any] | None = ({"tool_errors": [tool_error]} if tool_error else None)

if pending is not None:
pending.tool_calls.append(tc)
pending.observations.append(obs)
if tool_error:
pending.extra.setdefault("tool_errors", []).append(tool_error)
pending.tool_ancestry.append(_atif_ancestry_from_ist(ist))
else:
extra = _atif_step_extra_model_from_ist(ist).model_dump(exclude_none=True)
if tool_error:
extra.setdefault("tool_errors", []).append(tool_error)
atif_steps.append(
ATIFStep(
step_id=step_id,
Expand Down Expand Up @@ -508,19 +544,28 @@ def push(self, ist: IntermediateStep) -> ATIFStep | None:
tool_name = ist.name or "unknown_tool"
tool_input: dict[str, Any] = {}
tool_output = ""
raw_output: Any = None
if ist.data:
tool_input = _parse_tool_arguments(ist.data.input)
tool_output = _safe_str(ist.data.output)
raw_output = ist.data.output
tool_output = _safe_str(raw_output)
call_id = f"call_{ist.UUID}"
tc = ATIFToolCall(tool_call_id=call_id, function_name=tool_name, arguments=tool_input)
obs = ATIFObservationResult(source_call_id=call_id, content=tool_output)
tool_error: dict[str, str] | None = _extract_tool_error(raw_output)
if tool_error is not None:
tool_error["tool"] = tool_name
if self._pending is not None:
self._pending.tool_calls.append(tc)
self._pending.observations.append(obs)
if tool_error:
self._pending.extra.setdefault("tool_errors", []).append(tool_error)
self._pending.tool_ancestry.append(_atif_ancestry_from_ist(ist))
return None

extra = _atif_step_extra_model_from_ist(ist).model_dump(exclude_none=True)
if tool_error:
extra.setdefault("tool_errors", []).append(tool_error)
orphan_step = ATIFStep(
step_id=self._step_id,
source="agent",
Expand Down
127 changes: 127 additions & 0 deletions packages/nvidia_nat_core/tests/nat/utils/test_atif_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datetime

import pytest
from langchain_core.messages import ToolMessage

from nat.builder.framework_enum import LLMFrameworkEnum
from nat.data_models.atif import ATIFTrajectory
Expand Down Expand Up @@ -705,3 +706,129 @@ def test_stream_matches_batch(
assert s_step.message == b_step.message
if b_step.tool_calls:
assert len(s_step.tool_calls) == len(b_step.tool_calls)


# ---------------------------------------------------------------------------
# Tool error → ATIF conversion tests
# ---------------------------------------------------------------------------


@pytest.fixture(name="error_trajectory")
def fixture_error_trajectory() -> list[IntermediateStep]:
    """Build a five-step trajectory whose second tool call reports an error.

    The steps are, in order: workflow start, one LLM end, a tool end with a plain string
    output, a tool end whose output is a ``ToolMessage`` with ``status="error"``, and
    workflow end.
    """
    failed_output = ToolMessage(
        content="ValueError: bad input",
        name="failing_tool",
        tool_call_id="failing_tool",
        status="error",
    )

    # Steps are created in trajectory order, matching their timestamp offsets.
    workflow_start = _make_step(IntermediateStepType.WORKFLOW_START, input_data="Do something", timestamp_offset=0.0)
    llm_end = _make_step(
        IntermediateStepType.LLM_END,
        name="gpt-4",
        output_data="calling tools",
        timestamp_offset=1.0,
        usage=_make_usage(100, 20),
    )
    good_tool_end = _make_step(
        IntermediateStepType.TOOL_END,
        name="good_tool",
        input_data={"q": "hello"},
        output_data="success",
        timestamp_offset=2.0,
        step_uuid="tool-good",
    )
    failing_tool_end = _make_step(
        IntermediateStepType.TOOL_END,
        name="failing_tool",
        input_data={"q": "fail"},
        output_data=failed_output,
        timestamp_offset=3.0,
        step_uuid="tool-fail",
    )
    workflow_end = _make_step(IntermediateStepType.WORKFLOW_END, output_data="partial", timestamp_offset=4.0)

    return [workflow_start, llm_end, good_tool_end, failing_tool_end, workflow_end]


class TestToolErrorATIFConversion:
    """Verify tool errors in IntermediateStepPayload are converted to ATIF step.extra['tool_errors'].

    The tests read the error entries from ``result.steps[1]``, i.e. the second step of the
    converted trajectory, which is where the converters attach the tool calls in these
    fixtures.
    """

    def test_error_dict_has_all_required_keys(
        self,
        batch_converter: IntermediateStepToATIFConverter,
        error_trajectory: list[IntermediateStep],
    ):
        """Each tool_errors entry contains exactly the expected keys."""
        result: ATIFTrajectory = batch_converter.convert(error_trajectory)
        agent_step = result.steps[1]
        errors: list = agent_step.extra["tool_errors"]
        # Only the failing tool contributes an entry; the successful one must not.
        assert len(errors) == 1
        assert set(errors[0].keys()) == {"tool", "error", "error_type", "error_message", "status"}

    def test_error_dict_values_are_parsed_from_content(
        self,
        batch_converter: IntermediateStepToATIFConverter,
        error_trajectory: list[IntermediateStep],
    ):
        """The error dict splits the exception type from the message and preserves the full error string."""
        result: ATIFTrajectory = batch_converter.convert(error_trajectory)
        entry: dict = result.steps[1].extra["tool_errors"][0]
        assert entry["tool"] == "failing_tool"
        assert entry["status"] == "error"
        assert entry["error"] == "ValueError: bad input"
        assert entry["error_type"] == "ValueError"
        assert entry["error_message"] == "bad input"

    def test_error_dict_falls_back_to_unknown_type(self):
        """Error content without a parseable exception type defaults to 'Unknown'."""
        error_output: ToolMessage = ToolMessage(
            content="something went wrong",
            name="broken_tool",
            tool_call_id="broken_tool",
            status="error",
        )
        trajectory: list[IntermediateStep] = [
            _make_step(IntermediateStepType.WORKFLOW_START, input_data="q", timestamp_offset=0.0),
            _make_step(IntermediateStepType.LLM_END,
                       name="gpt-4",
                       output_data="calling",
                       timestamp_offset=1.0,
                       usage=_make_usage(10, 5)),
            _make_step(IntermediateStepType.TOOL_END,
                       name="broken_tool",
                       input_data={},
                       output_data=error_output,
                       timestamp_offset=2.0,
                       step_uuid="tool-broken"),
            _make_step(IntermediateStepType.WORKFLOW_END, output_data="done", timestamp_offset=3.0),
        ]
        result: ATIFTrajectory = IntermediateStepToATIFConverter().convert(trajectory)
        entry: dict = result.steps[1].extra["tool_errors"][0]
        assert entry["error_type"] == "Unknown"
        # With no type prefix to strip, the message is the full content.
        assert entry["error_message"] == "something went wrong"

    def test_successful_tool_has_no_tool_errors(
        self,
        batch_converter: IntermediateStepToATIFConverter,
        simple_trajectory: list[IntermediateStep],
    ):
        """Successful tool calls do not produce tool_errors entries in the ATIF output."""
        result: ATIFTrajectory = batch_converter.convert(simple_trajectory)
        for step in result.steps:
            # Accept a missing ``extra``, a missing key, or an empty list.
            assert not (step.extra or {}).get("tool_errors")

    def test_stream_and_batch_produce_same_errors(
        self,
        batch_converter: IntermediateStepToATIFConverter,
        error_trajectory: list[IntermediateStep],
    ):
        """Both converter code paths produce identical tool_errors for the same input trajectory."""
        batch_result: ATIFTrajectory = batch_converter.convert(error_trajectory)
        stream_conv: ATIFStreamConverter = ATIFStreamConverter()
        for ist in error_trajectory:
            stream_conv.push(ist)
        stream_conv.finalize()
        stream_result: ATIFTrajectory = stream_conv.get_trajectory()

        def _collect_errors(trajectory: ATIFTrajectory) -> list[dict]:
            """Flatten the ``tool_errors`` entries of every step, in step order."""
            errors: list[dict] = []
            for step in trajectory.steps:
                errors.extend((step.extra or {}).get("tool_errors", []))
            return errors

        batch_errors: list[dict] = _collect_errors(batch_result)
        # Guard against a vacuous pass: two empty lists would also compare equal.
        assert batch_errors, "expected the batch converter to report at least one tool error"
        assert batch_errors == _collect_errors(stream_result)
3 changes: 3 additions & 0 deletions packages/nvidia_nat_eval/src/nat/plugins/eval/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@
from .dataset_loader.register import register_jsonl_dataset_loader
from .dataset_loader.register import register_parquet_dataset_loader
from .dataset_loader.register import register_xls_dataset_loader

# Evaluators
from .tool_failure_evaluator.register import register_tool_failure_evaluator
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .evaluator import ToolFailureEvaluator
from .models import ToolFailureReasoning
from .models import ToolSummary
from .register import ToolFailureEvaluatorConfig

# Names exported by a star-import of this module; each one is brought into scope by
# the relative imports directly above.
__all__ = [
"ToolFailureEvaluator",
"ToolFailureEvaluatorConfig",
"ToolFailureReasoning",
"ToolSummary",
]
Loading
Loading