diff --git a/Makefile b/Makefile index f57ffac..8a3721c 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ setup: set_python_env # ============================ -setup_test: +setup_test: setup @poetry install --with=test --all-extras test: setup_test @@ -57,5 +57,5 @@ test_langgraph_agent: setup_test test_langgraph_software: setup_test poetry run pytest tests/test_langgraph_graph_with_io_mapper.py -unittest: +unittest: setup_test poetry run pytest tests/unittests -vvrx diff --git a/agntcy_iomapper/__init__.py b/agntcy_iomapper/__init__.py index 79a331d..b7b3c7e 100644 --- a/agntcy_iomapper/__init__.py +++ b/agntcy_iomapper/__init__.py @@ -1,16 +1,20 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 # ruff: noqa: F401 -from agntcy_iomapper.agent import FieldMetadata, IOMappingAgent, IOMappingAgentMetadata +from agntcy_iomapper.agent import IOMappingAgent +from agntcy_iomapper.base import FieldMetadata, IOMappingAgentMetadata from agntcy_iomapper.imperative import ( ImperativeIOMapper, ImperativeIOMapperInput, ImperativeIOMapperOutput, ) +from agntcy_iomapper.llamaindex import IOMappingInputEvent, IOMappingOutputEvent __all__ = [ "IOMappingAgent", "IOMappingAgentMetadata", + "IOMappingOutputEvent", + "IOMappingInputEvent", "ImperativeIOMapper", "ImperativeIOMapperInput", "ImperativeIOMapperOutput", diff --git a/agntcy_iomapper/agent/__init__.py b/agntcy_iomapper/agent/__init__.py index 2616864..447f90f 100644 --- a/agntcy_iomapper/agent/__init__.py +++ b/agntcy_iomapper/agent/__init__.py @@ -2,21 +2,15 @@ # SPDX-License-Identifier: Apache-2.0 from agntcy_iomapper.agent.agent_io_mapper import IOMappingAgent -from agntcy_iomapper.agent.base import AgentIOMapper -from agntcy_iomapper.agent.models import ( - AgentIOMapperConfig, - AgentIOMapperInput, - AgentIOMapperOutput, +from agntcy_iomapper.base.models import ( + BaseIOMapperConfig, FieldMetadata, IOMappingAgentMetadata, ) __all__ = [ - "AgentIOMapper", - "AgentIOMapperOutput", - "AgentIOMapperConfig", + "BaseIOMapperConfig", "IOMappingAgent", "IOMappingAgentMetadata", - "AgentIOMapperInput", "FieldMetadata", ] diff --git a/agntcy_iomapper/agent/agent_io_mapper.py b/agntcy_iomapper/agent/agent_io_mapper.py index 3f62335..11deceb 100644 --- a/agntcy_iomapper/agent/agent_io_mapper.py +++ b/agntcy_iomapper/agent/agent_io_mapper.py @@ -2,40 +2,58 @@ # SPDX-License-Identifier: Apache-2.0 import logging -from typing import Any, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Union from langchain_core.language_models import BaseChatModel from langchain_core.runnables import Runnable -from openapi_pydantic import Schema +from llama_index.core.base.llms.base import BaseLLM +from llama_index.core.tools import BaseTool +from llama_index.core.workflow import ( + Workflow, +) from pydantic import BaseModel, Field, model_validator from typing_extensions import Self -from agntcy_iomapper.agent.models import ( +from agntcy_iomapper.base.models import ( AgentIOMapperInput, + ArgumentsDescription, FieldMetadata, IOMappingAgentMetadata, ) -from agntcy_iomapper.base import ArgumentsDescription -from agntcy_iomapper.base.utils import create_type_from_schema, extract_nested_fields +from agntcy_iomapper.base.utils import extract_nested_fields, get_io_types +from agntcy_iomapper.imperative import ( + ImperativeIOMapper, + ImperativeIOMapperInput, +) from agntcy_iomapper.langgraph import LangGraphIOMapper, LangGraphIOMapperConfig +from agntcy_iomapper.llamaindex.llamaindex import ( + LLamaIndexIOMapper, +) logger = logging.getLogger(__name__) class IOMappingAgent(BaseModel): - metadata: IOMappingAgentMetadata = Field( + """This class exposes all + The IOMappingAgent class is designed for developers building sophisticated multi-agent software that require seamless integration and interaction between + the different agents and workflow steps. + By utilizing the methods provided, developers can construct complex workflows and softwares. + The IOMappingAgent class is intended to serve as a foundational component in applications requiring advanced IO mapping agents in multi-agent systems. + """ + + metadata: Optional[IOMappingAgentMetadata] = Field( ..., description="Details about the fields to be used in the translation and about the output", ) - llm: Optional[Union[BaseChatModel, str]] = ( - Field( - None, - description="Model to use for translation as LangChain description or model class.", - ), + llm: Optional[Union[BaseChatModel, str]] = Field( + None, + description="Model to use for translation as LangChain description or model class.", ) @model_validator(mode="after") def _validate_obj(self) -> Self: + if not self.metadata: + return self if not self.metadata.input_fields or len(self.metadata.input_fields) == 0: raise ValueError("input_fields not found in the metadata") @@ -70,38 +88,12 @@ def _validate_obj(self) -> Self: return self - def _get_io_types(self, data: Any) -> Tuple[Schema, Schema]: - data_schema = None - if isinstance(data, BaseModel): - data_schema = data.model_json_schema() - # If input schema is provided it overwrites the data schema - input_schema = ( - self.metadata.input_schema if self.metadata.input_schema else data_schema - ) - # If output schema is provided it overwrites the data schema - output_schema = ( - self.metadata.output_schema if self.metadata.output_schema else data_schema - ) - - if not input_schema or not output_schema: - raise ValueError( - "input_schema, and or output_schema are missing from the metadata, for a better accuracy you are required to provide them in this scenario, or we could not infer the type from the state" - ) - - input_type = Schema.model_validate( - create_type_from_schema(input_schema, self.metadata.input_fields) - ) - - output_type = Schema.model_validate( - create_type_from_schema(output_schema, self.metadata.output_fields) - ) - - return (input_type, output_type) - def langgraph_node(self, data: Any, config: Optional[dict] = None) -> Runnable: + """This method is used to add a language graph node to a langgraph multi-agent software. + It leverages language models for IO mapping, ensuring efficient communication between agents. + """ - # If there is a template for the output the output_schema is going to be ignored in the translation - input_type, output_type = self._get_io_types(data) + input_type, output_type = get_io_types(data, self.metadata) data_to_be_mapped = extract_nested_fields( data, fields=self.metadata.input_fields @@ -132,3 +124,64 @@ def langgraph_node(self, data: Any, config: Optional[dict] = None) -> Runnable: iomapper_config = LangGraphIOMapperConfig(llm=self.llm) return LangGraphIOMapper(iomapper_config, input).as_runnable() + + def langgraph_imperative( + self, data: Any, config: Optional[dict] = None + ) -> Runnable: + """ + Description: Similar to langgraph_node, this method adds a language graph node to a multi-agent software. + However, it does not utilize a language model for IO mapping, offering an imperative approach to agent integration. + """ + + input_type, output_type = self._get_io_types(data, self.metadata) + + data_to_be_mapped = extract_nested_fields( + data, fields=self.metadata.input_fields + ) + + input = ImperativeIOMapperInput( + input=ArgumentsDescription( + json_schema=input_type, + ), + output=ArgumentsDescription(json_schema=output_type), + data=data_to_be_mapped, + ) + + if not self.metadata.field_mapping: + raise ValueError( + "In order to use imperative mapping field_mapping must be provided in the metadata" + ) + + imperative_io_mapper = ImperativeIOMapper( + input=input, field_mapping=self.metadata.field_mapping + ) + return imperative_io_mapper.as_runnable() + + @staticmethod + def as_worfklow_step(workflow: Workflow) -> Callable: + """This static method allows for the addition of a step to a LlamaIndex workflow. + It integrates seamlessly into workflows, enabling structured progression and task execution. + """ + io_mapper_step = LLamaIndexIOMapper.llamaindex_mapper(workflow) + return io_mapper_step + + @staticmethod + def as_workflow_agent( + mapping_metadata: IOMappingAgentMetadata, + llm: BaseLLM, + name: str, + description: str, + can_handoff_to: Optional[List[str]] = None, + tools: Optional[List[Union[BaseTool, Callable]]] = [], + ): + """This static method returns an instance of an agent that can be integrated into a Multi AgentWorkflow. + It provides robust IO mapping capabilities essential for complex multi agent workflow interactions. + """ + return LLamaIndexIOMapper( + mapping_metadata=mapping_metadata, + llm=llm, + tools=tools, + name=name, + description=description, + can_handoff_to=can_handoff_to, + ) diff --git a/agntcy_iomapper/agent/base.py b/agntcy_iomapper/agent/base.py deleted file mode 100644 index 0e2c8a2..0000000 --- a/agntcy_iomapper/agent/base.py +++ /dev/null @@ -1,206 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 - - -import json -import logging -import re -from abc import abstractmethod -from typing import ClassVar, Optional - -import jsonschema -from jinja2 import Environment -from jinja2.sandbox import SandboxedEnvironment - -from agntcy_iomapper.agent.models import ( - AgentIOMapperConfig, - AgentIOMapperInput, - AgentIOMapperOutput, -) -from agntcy_iomapper.base import BaseIOMapper - -logger = logging.getLogger(__name__) - - -class AgentIOMapper(BaseIOMapper): - _json_search_pattern: ClassVar[re.Pattern] = re.compile( - r"```json\n(.*?)\n```", re.DOTALL - ) - - def __init__( - self, - config: Optional[AgentIOMapperConfig] = None, - jinja_env: Optional[Environment] = None, - jinja_env_async: Optional[Environment] = None, - ): - if config is None: - config = AgentIOMapperConfig() - super().__init__(config) - - if jinja_env is not None and jinja_env.is_async: - raise ValueError("Async Jinja env passed to jinja_env argument") - elif jinja_env_async is not None and not jinja_env_async.is_async: - raise ValueError("Sync Jinja env passed to jinja_env_async argument") - - self.jinja_env = jinja_env - self.prompt_template = None - self.user_template = None - - self.jinja_env_async = jinja_env_async - self.prompt_template_async = None - self.user_template_async = None - - # Delay init until sync or async functions called. - def _check_jinja_env(self, enable_async: bool): - if enable_async: - # Delay load of env until needed - if self.jinja_env_async is None: - # Default is sandboxed, no loader - self.jinja_env_async = SandboxedEnvironment( - loader=None, - enable_async=True, - autoescape=False, - ) - if self.prompt_template_async is None: - self.prompt_template_async = self.jinja_env_async.from_string( - self.config.system_prompt_template - ) - if self.user_template_async is None: - self.user_template_async = self.jinja_env_async.from_string( - self.config.message_template - ) - else: - if self.jinja_env is None: - self.jinja_env = SandboxedEnvironment( - loader=None, - enable_async=False, - autoescape=False, - ) - if self.prompt_template is None: - self.prompt_template = self.jinja_env.from_string( - self.config.system_prompt_template - ) - if self.user_template is None: - self.user_template = self.jinja_env.from_string( - self.config.message_template - ) - - def _get_render_env(self, input: AgentIOMapperInput) -> dict[str, str]: - return { - "input": input.input, - "output": input.output, - "data": input.data, - } - - def _get_output( - self, input: AgentIOMapperInput, outputs: str - ) -> AgentIOMapperOutput: - if input.output.json_schema is None: - # If there is no schema, quote the chars for JSON. - return AgentIOMapperOutput.model_validate_json( - f'{{"data": {json.dumps(outputs)} }}' - ) - - logger.debug(f"{outputs}") - - # Check if data is returned in JSON markdown text - matches = self._json_search_pattern.findall(outputs) - if matches: - outputs = matches[-1] - - return AgentIOMapperOutput.model_validate_json(f'{{"data": {outputs} }}') - - def _validate_input(self, input: AgentIOMapperInput) -> None: - if self.config.validate_json_input and input.input.json_schema is not None: - jsonschema.validate( - instance=input.data, - schema=input.input.json_schema.model_dump( - exclude_none=True, mode="json" - ), - ) - - def _validate_output( - self, input: AgentIOMapperInput, output: AgentIOMapperOutput - ) -> None: - if self.config.validate_json_output and input.output.json_schema is not None: - output_schema = input.output.json_schema.model_dump( - exclude_none=True, mode="json" - ) - logging.debug(f"Checking output schema: {output_schema}") - jsonschema.validate( - instance=output.data, - schema=output_schema, - ) - - def invoke(self, input: AgentIOMapperInput, **kwargs) -> AgentIOMapperOutput: - self._validate_input(input) - self._check_jinja_env(False) - render_env = self._get_render_env(input) - system_prompt = self.prompt_template.render(render_env) - - if input.message_template is not None: - logging.info(f"User template supplied on input: {input.message_template}") - user_template = self.jinja_env.from_string(input.message_template) - else: - user_template = self.user_template - user_prompt = user_template.render(render_env) - - outputs = self._invoke( - input, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - **kwargs, - ) - logging.debug(f"The LLM returned: {outputs}") - output = self._get_output(input, outputs) - - self._validate_output(input, output) - return output - - @abstractmethod - def _invoke( - self, input: AgentIOMapperInput, messages: list[dict[str, str]], **kwargs - ) -> str: - """Invoke internal model to process messages. - Args: - messages: the messages to send to the LLM - """ - - async def ainvoke(self, input: AgentIOMapperInput, **kwargs) -> AgentIOMapperOutput: - self._validate_input(input) - self._check_jinja_env(True) - render_env = self._get_render_env(input) - system_prompt = await self.prompt_template_async.render_async(render_env) - - if input.message_template is not None: - logging.info(f"User template supplied on input: {input.message_template}") - user_template_async = self.jinja_env_async.from_string( - input.message_template - ) - else: - user_template_async = self.user_template_async - user_prompt = await user_template_async.render_async(render_env) - - outputs = await self._ainvoke( - input, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - **kwargs, - ) - logging.debug(f"The LLM returned: {outputs}") - output = self._get_output(input, outputs) - self._validate_output(input, output) - return output - - @abstractmethod - async def _ainvoke( - self, input: AgentIOMapperInput, messages: list[dict[str, str]], **kwargs - ) -> str: - """Async invoke internal model to process messages. - Args: - messages: the messages to send to the LLM - """ diff --git a/agntcy_iomapper/agent/models.py b/agntcy_iomapper/agent/models.py deleted file mode 100644 index a39ba80..0000000 --- a/agntcy_iomapper/agent/models.py +++ /dev/null @@ -1,70 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -import logging -from typing import Any, List, Optional, Union - -from pydantic import BaseModel, Field - -from agntcy_iomapper.base import ( - BaseIOMapperConfig, - BaseIOMapperInput, - BaseIOMapperOutput, -) - -logger = logging.getLogger(__name__) - - -class AgentIOMapperInput(BaseIOMapperInput): - message_template: Union[str, None] = Field( - max_length=4096, - default=None, - description="Message (user) to send to LLM to effect translation.", - ) - - -AgentIOMapperOutput = BaseIOMapperOutput - - -class AgentIOMapperConfig(BaseIOMapperConfig): - system_prompt_template: str = Field( - max_length=4096, - default="You are a translation machine. You translate both natural language and object formats for computers. Response_format to { 'type': 'json_object' }", - description="System prompt Jinja2 template used with LLM service for translation.", - ) - message_template: str = Field( - max_length=4096, - default="The data is described {% if input.json_schema %}by the following JSON schema: {{ input.json_schema.model_dump(exclude_none=True) }}{% else %}as {{ input.description }}{% endif %}, and {%if output.json_schema %} the result must adhere strictly to the following JSON schema: {{ output.json_schema.model_dump(exclude_none=True) }}{% else %}as {{ output.description }}{% endif %}. The data to translate is: {{ data }}. It is absolutely crucial that each field and its type specified in the schema are followed precisely, without introducing any additional fields or altering types. Non-compliance will result in rejection of the output.", - description="Default user message template. This can be overridden by the message request.", - ) - - -class FieldMetadata(BaseModel): - json_path: str = Field(..., description="A json path to the field in the object") - description: str = Field( - ..., description="A description of what the field represents" - ) - examples: Optional[List[str]] = Field( - None, - description="A list of examples that represents how the field in json_path is normaly populated", - ) - - -class IOMappingAgentMetadata(BaseModel): - input_fields: List[Union[str, FieldMetadata]] = Field( - ..., - description="an array of json paths representing fields to be used by io mapper in the mapping", - ) - output_fields: List[Union[str, FieldMetadata]] = Field( - ..., - description="an array of json paths representing firlds to be used by io mapper in the result", - ) - input_schema: Optional[dict[str, Any]] = Field( - default=None, description="defines the schema for the input data" - ) - output_schema: Optional[dict[str, Any]] = Field( - default=None, description="defines the schema for result of the mapping" - ) - output_description_prompt: Optional[str] = Field( - default=None, - description="A prompt structured using a Jinja template that will be used by the llm for a better mapping", - ) diff --git a/agntcy_iomapper/base/__init__.py b/agntcy_iomapper/base/__init__.py index 14be08f..a9f2d06 100644 --- a/agntcy_iomapper/base/__init__.py +++ b/agntcy_iomapper/base/__init__.py @@ -2,11 +2,17 @@ # SPDX-License-Identifier: Apache-2.0 from agntcy_iomapper.base.base import ( - ArgumentsDescription, BaseIOMapper, +) +from agntcy_iomapper.base.models import ( + AgentIOMapperInput, + AgentIOMapperOutput, + ArgumentsDescription, BaseIOMapperConfig, BaseIOMapperInput, BaseIOMapperOutput, + FieldMetadata, + IOMappingAgentMetadata, ) __all__ = [ @@ -15,4 +21,8 @@ "BaseIOMapperOutput", "BaseIOMapperConfig", "BaseIOMapper", + "FieldMetadata", + "AgentIOMapperInput", + "AgentIOMapperOutput", + "IOMappingAgentMetadata", ] diff --git a/agntcy_iomapper/base/base.py b/agntcy_iomapper/base/base.py index d62fa42..b9d0192 100644 --- a/agntcy_iomapper/base/base.py +++ b/agntcy_iomapper/base/base.py @@ -1,110 +1,210 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 +import json +import logging +import re from abc import ABC, abstractmethod -from typing import Any, Optional +from typing import ClassVar, Optional -from openapi_pydantic import Schema -from pydantic import BaseModel, Field, model_validator -from typing_extensions import Self +import jsonschema +from jinja2 import Environment +from jinja2.sandbox import SandboxedEnvironment +from agntcy_iomapper.base.models import ( + AgentIOMapperInput, + AgentIOMapperOutput, + BaseIOMapperConfig, +) -class ArgumentsDescription(BaseModel): - """ - ArgumentsDescription a pydantic model that defines - the details necessary to perfom io mapping between two agents - """ +logger = logging.getLogger(__name__) - json_schema: Optional[Schema] = Field( - default=None, description="Data format JSON schema" - ) - description: Optional[str] = Field( - default=None, description="Data (semantic) natural language description" - ) - agent_manifest: Optional[dict[str, Any]] = Field( - default=None, - description="Agent Manifest definition as per https://agntcy.github.io/acp-spec/openapi.html#model/agentmanifest", - ) - - @model_validator(mode="after") - def _validate_obj(self) -> Self: - if ( - self.json_schema is None - and self.description is None - and self.agent_manifest - ): - raise ValueError( - 'Either the "schema" field and/or the "description" or agent_manifest field must be specified.' - ) - return self +class BaseIOMapper(ABC): + """Abstract base class for interfacing with io mapper. + All io mappers wrappers inherited from BaseIOMapper. + """ -class BaseIOMapperInput(BaseModel): - input: ArgumentsDescription = Field( - description="Input data descriptions", + _json_search_pattern: ClassVar[re.Pattern] = re.compile( + r"```json\n(.*?)\n```", re.DOTALL ) - output: ArgumentsDescription = Field( - description="Output data descriptions", - ) - data: Any = Field(description="Data to translate") - - @model_validator(mode="after") - def _validate_obj(self) -> Self: - if self.input.agent_manifest is not None: - # given an input agents manifest map its ouput definition - # because the data to be mapped is the result of calling the input agent - self.input.json_schema = Schema.model_validate( - self.input.agent_manifest["specs"]["output"] - ) - if self.output.agent_manifest: - # given an output agents manifest map its input definition - # because the data to be mapped would be mapped to it's input - self.output.json_schema = Schema.model_validate( - self.output.agent_manifest["specs"]["input"] + def __init__( + self, + config: Optional[BaseIOMapperConfig] = None, + jinja_env: Optional[Environment] = None, + jinja_env_async: Optional[Environment] = None, + ): + if config is None: + config = BaseIOMapperConfig() + + if jinja_env is not None and jinja_env.is_async: + raise ValueError("Async Jinja env passed to jinja_env argument") + elif jinja_env_async is not None and not jinja_env_async.is_async: + raise ValueError("Sync Jinja env passed to jinja_env_async argument") + + self.jinja_env = jinja_env + self.prompt_template = None + self.user_template = None + + self.jinja_env_async = jinja_env_async + self.prompt_template_async = None + self.user_template_async = None + self.config = config + + # Delay init until sync or async functions called. + def _check_jinja_env(self, enable_async: bool): + if enable_async: + # Delay load of env until needed + if self.jinja_env_async is None: + # Default is sandboxed, no loader + self.jinja_env_async = SandboxedEnvironment( + loader=None, + enable_async=True, + autoescape=False, + ) + if self.prompt_template_async is None: + self.prompt_template_async = self.jinja_env_async.from_string( + self.config.system_prompt_template + ) + if self.user_template_async is None: + self.user_template_async = self.jinja_env_async.from_string( + self.config.message_template + ) + else: + if self.jinja_env is None: + self.jinja_env = SandboxedEnvironment( + loader=None, + enable_async=False, + autoescape=False, + ) + if self.prompt_template is None: + self.prompt_template = self.jinja_env.from_string( + self.config.system_prompt_template + ) + if self.user_template is None: + self.user_template = self.jinja_env.from_string( + self.config.message_template + ) + + def _get_render_env(self, input: AgentIOMapperInput) -> dict[str, str]: + return { + "input": input.input, + "output": input.output, + "data": input.data, + } + + def _get_output( + self, input: AgentIOMapperInput, outputs: str + ) -> AgentIOMapperOutput: + + if input.output.json_schema is None: + # If there is no schema, quote the chars for JSON. + return AgentIOMapperOutput.model_validate_json( + f'{{"data": {json.dumps(outputs)} }}' ) - return self + logger.debug(f"{outputs}") + # Check if data is returned in JSON markdown text + matches = self._json_search_pattern.findall(outputs) + if matches: + outputs = matches[-1] -class BaseIOMapperOutput(BaseModel): - data: Any = Field(default=None, description="Data after translation") - error: str | None = Field( - max_length=4096, default=None, description="Description of error on failure." - ) - - -class BaseIOMapperConfig(BaseModel): - validate_json_input: bool = Field( - default=False, description="Validate input against JSON schema." - ) - validate_json_output: bool = Field( - default=False, description="Validate output against JSON schema." - ) + return AgentIOMapperOutput.model_validate_json(f'{{"data": {outputs} }}') + def _validate_input(self, input: AgentIOMapperInput) -> None: + if self.config.validate_json_input and input.input.json_schema is not None: + jsonschema.validate( + instance=input.data, + schema=input.input.json_schema.model_dump( + exclude_none=True, mode="json" + ), + ) -class BaseIOMapper(ABC): - """Abstract base class for interfacing with io mapper. - All io mappers wrappers inherited from BaseIOMapper. - """ + def _validate_output( + self, input: AgentIOMapperInput, output: AgentIOMapperOutput + ) -> None: + if self.config.validate_json_output and input.output.json_schema is not None: + output_schema = input.output.json_schema.model_dump( + exclude_none=True, mode="json" + ) + logging.debug(f"Checking output schema: {output_schema}") + jsonschema.validate( + instance=output.data, + schema=output_schema, + ) - def __init__( - self, - config: Optional[BaseIOMapperConfig] = None, - ): - self.config = config if config is not None else BaseIOMapperConfig() + def _invoke(self, input: AgentIOMapperInput, **kwargs) -> AgentIOMapperOutput: + self._validate_input(input) + self._check_jinja_env(False) + render_env = self._get_render_env(input) + system_prompt = self.prompt_template.render(render_env) + + if input.message_template is not None: + logging.info(f"User template supplied on input: {input.message_template}") + user_template = self.jinja_env.from_string(input.message_template) + else: + user_template = self.user_template + user_prompt = user_template.render(render_env) + + outputs = self.invoke( + input, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + **kwargs, + ) + logging.debug(f"The LLM returned: {outputs}") + output = self._get_output(input, outputs) + + self._validate_output(input, output) + return output + + async def _ainvoke( + self, input: AgentIOMapperInput, **kwargs + ) -> AgentIOMapperOutput: + self._validate_input(input) + self._check_jinja_env(True) + render_env = self._get_render_env(input) + system_prompt = await self.prompt_template_async.render_async(render_env) + + if input.message_template is not None: + logging.info(f"User template supplied on input: {input.message_template}") + user_template_async = self.jinja_env_async.from_string( + input.message_template + ) + else: + user_template_async = self.user_template_async + user_prompt = await user_template_async.render_async(render_env) + + outputs = await self.ainvoke( + input, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + **kwargs, + ) + logging.debug(f"The LLM returned: {outputs}") + output = self._get_output(input, outputs) + self._validate_output(input, output) + return output @abstractmethod - def invoke(self, input: BaseIOMapperInput) -> BaseIOMapperOutput: - """Pass input data - to be mapped and returned represented in the output schema + def invoke( + self, input: AgentIOMapperInput, messages: list[dict[str, str]], **kwargs + ) -> str: + """Invoke internal model to process messages. Args: - input: the data to be mapped + messages: the messages to send to the LLM """ @abstractmethod - async def ainvoke(self, input: BaseIOMapperInput) -> BaseIOMapperOutput: - """Pass input data - to be mapped and returned represented in the output schema + async def ainvoke( + self, input: AgentIOMapperInput, messages: list[dict[str, str]], **kwargs + ) -> str: + """Async invoke internal model to process messages. Args: - input: the data to be mapped + messages: the messages to send to the LLM """ diff --git a/agntcy_iomapper/base/models.py b/agntcy_iomapper/base/models.py new file mode 100644 index 0000000..637c16f --- /dev/null +++ b/agntcy_iomapper/base/models.py @@ -0,0 +1,141 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +import logging +from typing import Any, Callable, List, Optional, Union + +from openapi_pydantic import Schema +from pydantic import BaseModel, Field, model_validator +from typing_extensions import Self + +logger = logging.getLogger(__name__) + + +class ArgumentsDescription(BaseModel): + """ + ArgumentsDescription a pydantic model that defines + the details necessary to perfom io mapping between two agents + """ + + json_schema: Optional[Schema] = Field( + default=None, description="Data format JSON schema" + ) + description: Optional[str] = Field( + default=None, description="Data (semantic) natural language description" + ) + agent_manifest: Optional[dict[str, Any]] = Field( + default=None, + description="Agent Manifest definition as per https://agntcy.github.io/acp-spec/openapi.html#model/agentmanifest", + ) + + @model_validator(mode="after") + def _validate_obj(self) -> Self: + if ( + self.json_schema is None + and self.description is None + and self.agent_manifest + ): + raise ValueError( + 'Either the "schema" field and/or the "description" or agent_manifest field must be specified.' + ) + return self + + +class BaseIOMapperInput(BaseModel): + input: ArgumentsDescription = Field( + description="Input data descriptions", + ) + output: ArgumentsDescription = Field( + description="Output data descriptions", + ) + data: Any = Field(description="Data to translate") + + @model_validator(mode="after") + def _validate_obj(self) -> Self: + if self.input.agent_manifest is not None: + # given an input agents manifest map its ouput definition + # because the data to be mapped is the result of calling the input agent + self.input.json_schema = Schema.model_validate( + self.input.agent_manifest["specs"]["output"] + ) + + if self.output.agent_manifest: + # given an output agents manifest map its input definition + # because the data to be mapped would be mapped to it's input + self.output.json_schema = Schema.model_validate( + self.output.agent_manifest["specs"]["input"] + ) + + return self + + +class BaseIOMapperOutput(BaseModel): + data: Optional[Any] = Field(default=None, description="Data after translation") + error: Optional[str] = Field( + max_length=4096, default=None, description="Description of error on failure." + ) + + +class BaseIOMapperConfig(BaseModel): + validate_json_input: bool = Field( + default=False, description="Validate input against JSON schema." + ) + validate_json_output: bool = Field( + default=False, description="Validate output against JSON schema." + ) + system_prompt_template: str = Field( + max_length=4096, + default="You are a translation machine. You translate both natural language and object formats for computers. Response_format to { 'type': 'json_object' }", + description="System prompt Jinja2 template used with LLM service for translation.", + ) + message_template: str = Field( + max_length=4096, + default="The data is described {% if input.json_schema %}by the following JSON schema: {{ input.json_schema.model_dump(exclude_none=True) }}{% else %}as {{ input.description }}{% endif %}, and {%if output.json_schema %} the result must adhere strictly to the following JSON schema: {{ output.json_schema.model_dump(exclude_none=True) }}{% else %}as {{ output.description }}{% endif %}. The data to translate is: {{ data }}. It is absolutely crucial that each field and its type specified in the schema are followed precisely, without introducing any additional fields or altering types. Non-compliance will result in rejection of the output.", + description="Default user message template. This can be overridden by the message request.", + ) + + +class AgentIOMapperInput(BaseIOMapperInput): + message_template: Union[str, None] = Field( + max_length=4096, + default=None, + description="Message (user) to send to LLM to effect translation.", + ) + + +AgentIOMapperOutput = BaseIOMapperOutput + + +class FieldMetadata(BaseModel): + json_path: str = Field(..., description="A json path to the field in the object") + description: str = Field( + ..., description="A description of what the field represents" + ) + examples: Optional[List[str]] = Field( + None, + description="A list of examples that represents how the field in json_path is normaly populated", + ) + + +class IOMappingAgentMetadata(BaseModel): + input_fields: List[Union[str, FieldMetadata]] = Field( + ..., + description="an array of json paths representing fields to be used by io mapper in the mapping", + ) + output_fields: List[Union[str, FieldMetadata]] = Field( + ..., + description="an array of json paths representing firlds to be used by io mapper in the result", + ) + input_schema: Optional[dict[str, Any]] = Field( + default=None, description="defines the schema for the input data" + ) + output_schema: Optional[dict[str, Any]] = Field( + default=None, description="defines the schema for result of the mapping" + ) + output_description_prompt: Optional[str] = Field( + default=None, + description="A prompt structured using a Jinja template that will be used by the llm for a better mapping", + ) + field_mapping: Optional[dict[str, Union[str, Callable]]] = Field( + default=None, + description="A dictionary representing how the imperative mapping should be done where the keys are fields of the output object and values are JSONPath (strings)", + ) diff --git a/agntcy_iomapper/base/utils.py b/agntcy_iomapper/base/utils.py index 9c524b5..0a1aad5 100644 --- a/agntcy_iomapper/base/utils.py +++ b/agntcy_iomapper/base/utils.py @@ -1,11 +1,16 @@ import copy import json import logging -from typing import Any, Dict, List, Optional, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type, Union import jsonref +from openapi_pydantic import Schema +from pydantic import BaseModel -from agntcy_iomapper.agent.models import FieldMetadata +from agntcy_iomapper.base.models import ( + FieldMetadata, + IOMappingAgentMetadata, +) logger = logging.getLogger(__name__) @@ -105,7 +110,7 @@ def create_type_from_schema( else: curr_path_schema[curr_key] = {"type": "object", "properties": {}} if len(parts) == 1: - curr_path_schema[curr_key]["properties"] = flatten_json + curr_path_schema[curr_key] = flatten_json if field_description: curr_path_schema[curr_key]["description"] = field_description else: @@ -313,3 +318,29 @@ def _get_nested_value(data: Any, field_path: str) -> Optional[Any]: current = None return current + + +def get_io_types(data: Any, metadata: IOMappingAgentMetadata) -> Tuple[Schema, Schema]: + data_schema = None + + if isinstance(data, BaseModel): + data_schema = data.model_json_schema() + # If input schema is provided it overwrites the data schema + input_schema = metadata.input_schema if metadata.input_schema else data_schema + # If output schema is provided it overwrites the data schema + output_schema = metadata.output_schema if metadata.output_schema else data_schema + + if not input_schema or not output_schema: + raise ValueError( + "input_schema, and or output_schema are missing from the metadata, for a better accuracy you are required to provide them in this scenario, or we could not infer the type from the state" + ) + + input_type = Schema.model_validate( + create_type_from_schema(input_schema, metadata.input_fields) + ) + + output_type = Schema.model_validate( + create_type_from_schema(output_schema, metadata.output_fields) + ) + + return (input_type, output_type) diff --git a/agntcy_iomapper/imperative/imperative.py b/agntcy_iomapper/imperative/imperative.py index 6e136c4..6f041d0 100644 --- a/agntcy_iomapper/imperative/imperative.py +++ b/agntcy_iomapper/imperative/imperative.py @@ -21,6 +21,7 @@ import jsonschema from jsonpath_ng.ext import parse +from langgraph.utils.runnable import RunnableCallable from agntcy_iomapper.base import ( BaseIOMapper, @@ -37,34 +38,30 @@ class ImperativeIOMapper(BaseIOMapper): - field_mapping: dict[str, Union[str, Callable]] - """A dictionary for where the keys are fields of the output object - and values are JSONPath (strings) representing how the mapping - """ def __init__( self, - field_mapping: Optional[dict[str, Union[str, Callable]]], + input: ImperativeIOMapperInput, + field_mapping: dict[str, Union[str, Callable]], config: Optional[BaseIOMapperConfig] = None, ) -> None: super().__init__(config) self.field_mapping = field_mapping + self.input = input - def invoke( - self, input: ImperativeIOMapperInput - ) -> Optional[ImperativeIOMapperOutput]: - if input.data is None: + def invoke(self, data: any) -> dict: + _input = self.input if self.input else None + + if _input is None or _input.data is None: return None if self.field_mapping is None: - return ImperativeIOMapperOutput(data=input.data) + return _input.data - data = self._imperative_map(input) - return ImperativeIOMapperOutput(data=data) + data = self._imperative_map(_input) + return json.loads(data) - def ainvoke( - self, input: ImperativeIOMapperInput - ) -> Optional[ImperativeIOMapperOutput]: - return self.invoke(input) + async def ainvoke(self, state: any) -> dict: + return self.invoke() def _imperative_map(self, input_definition: ImperativeIOMapperInput) -> Any: """ @@ -75,21 +72,6 @@ def _imperative_map(self, input_definition: ImperativeIOMapperInput) -> Any: ValidationError if the data does not conform to the expected schema for the target type. - Parameters: - ---------- - data : Any - The input data to be converted. This can be of any type. - Returns: - ------- - Any - The converted data in the desired output type. - Raises: - ------ - ValidationError - If the input data does not conform to the expected schema for the - target type. - Notes: - ----- The function assumes that the caller provides a valid `input_schema`. Unsupported target types should be handled as needed within the function. """ @@ -151,3 +133,6 @@ def _set_jsonpath( copy_data[parts[-1]] = value return copy_data + + def as_runnable(self): + return RunnableCallable(self.invoke, self.ainvoke, name="extract", trace=False) diff --git a/agntcy_iomapper/langgraph/langgraph.py b/agntcy_iomapper/langgraph/langgraph.py index 8956886..3a078bf 100644 --- a/agntcy_iomapper/langgraph/langgraph.py +++ b/agntcy_iomapper/langgraph/langgraph.py @@ -9,9 +9,8 @@ from langgraph.utils.runnable import RunnableCallable from pydantic import Field -from agntcy_iomapper.agent.base import AgentIOMapper -from agntcy_iomapper.agent.models import ( - AgentIOMapperConfig, +from agntcy_iomapper.base import BaseIOMapper, BaseIOMapperConfig +from agntcy_iomapper.base.models import ( AgentIOMapperInput, AgentIOMapperOutput, ) @@ -22,7 +21,7 @@ LangGraphIOMapperOutput = AgentIOMapperOutput -class LangGraphIOMapperConfig(AgentIOMapperConfig): +class LangGraphIOMapperConfig(BaseIOMapperConfig): llm: Union[BaseChatModel, str] = ( Field( ..., @@ -31,7 +30,7 @@ class LangGraphIOMapperConfig(AgentIOMapperConfig): ) -class _LangGraphAgentIOMapper(AgentIOMapper): +class _LangGraphAgentIOMapper(BaseIOMapper): def __init__( self, config: Optional[LangGraphIOMapperConfig] = None, @@ -45,7 +44,7 @@ def __init__( else: self.llm = config.llm - def _invoke( + def invoke( self, input: LangGraphIOMapperInput, messages: list[dict[str, str]], @@ -56,7 +55,7 @@ def _invoke( response = self.llm.invoke(messages, config, **kwargs) return response.content - async def _ainvoke( + async def ainvoke( self, input: LangGraphIOMapperOutput, messages: list[dict[str, str]], @@ -79,7 +78,7 @@ def __init__( async def ainvoke(self, state: dict[str, Any], config: RunnableConfig) -> dict: input = self._input if self._input else state["input"] - response = await self._iomapper.ainvoke(input=input, config=config) + response = await self._iomapper._ainvoke(input=input, config=config) if response is not None: return response.data else: @@ -87,7 +86,7 @@ async def ainvoke(self, state: dict[str, Any], config: RunnableConfig) -> dict: def invoke(self, state: dict[str, Any], config: RunnableConfig) -> dict: input = self._input if self._input else state["input"] - response = self._iomapper.invoke(input=input, config=config) + response = self._iomapper._invoke(input=input, config=config) if response is not None: return response.data diff --git a/agntcy_iomapper/llamaindex/__init__.py b/agntcy_iomapper/llamaindex/__init__.py index aede91b..18624c0 100644 --- a/agntcy_iomapper/llamaindex/__init__.py +++ b/agntcy_iomapper/llamaindex/__init__.py @@ -1,6 +1,14 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0" -from agntcy_iomapper.agent import IOMappingWorkflow +from agntcy_iomapper.llamaindex.models import ( + IOMappingInputEvent, + IOMappingOutputEvent, + LLamaIndexIOMapperConfig, +) -__all__ = ["IOMappingWorkflow"] +__all__ = [ + "IOMappingInputEvent", + "IOMappingOutputEvent", + "LLamaIndexIOMapperConfig", +] diff --git a/agntcy_iomapper/llamaindex/create_llamaindex_iomapper.py b/agntcy_iomapper/llamaindex/create_llamaindex_iomapper.py deleted file mode 100644 index 51092a0..0000000 --- a/agntcy_iomapper/llamaindex/create_llamaindex_iomapper.py +++ /dev/null @@ -1,89 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0" -from llama_index.core.workflow import ( - StartEvent, - StopEvent, - Workflow, - step, -) -from openapi_pydantic import Schema -from pydantic import BaseModel - -from agntcy_iomapper.base import AgentIOMapperInput, ArgumentsDescription -from agntcy_iomapper.base.utils import _extract_nested_fields, create_type_from_schema -from agntcy_iomapper.llamaindex.llamaindex import ( - LLamaIndexIOMapper, - LLamaIndexIOMapperConfig, -) - - -class IOMappingWorkflow(Workflow): - @step - async def llamaindex_iomapper(self, evt: StartEvent) -> StopEvent: - """Generate a step to be included in a llamaindex workflow - ARGS: - workflow: The workflow where the step will be included at - Rerturns - a step to be included in the workflow - """ - ctx = evt.get("context", None) - - if not ctx: - return ValueError( - "A context must be present with the configuration of the llm" - ) - data = evt.get("data", None) - if not data: - return ValueError("data is required. Invalid or no data was passed") - - input_fields = evt.get("input_fields") - if not input_fields: - return ValueError("input_fields not set") - - output_fields = evt.get("output_fields") - if not output_fields: - return ValueError("output_fields not set") - - input_type = None - output_type = None - - if isinstance(data, BaseModel): - input_data_schema = data.model_json_schema() - output_type = Schema.validate_model( - create_type_from_schema(input_data_schema, output_fields) - ) - input_type = Schema.validate_model( - create_type_from_schema(input_data_schema, input_fields) - ) - else: - # Read the optional fields - input_schema = evt.get("input_schema", None) - output_schema = evt.get("output_schema", None) - if not input_schema or not output_schema: - raise ValueError( - "input_schema, and or output_schema are missing from the metadata, for a better accuracy you are required to provide them in this scenario" - ) - output_type = Schema.model_validate(output_schema) - input_type = Schema.model_validate(input_schema) - - data_to_be_mapped = _extract_nested_fields(data, fields=input_fields) - - input = AgentIOMapperInput( - input=ArgumentsDescription( - json_schema=input_type, - ), - output=ArgumentsDescription( - json_schema=output_type, - ), - data=data_to_be_mapped, - ) - - llm = await ctx.get("llm", None) - if not llm: - return StopEvent(result="You missed to config the llm") - - config = LLamaIndexIOMapperConfig(llm=llm) - io_mapping = LLamaIndexIOMapper(config=config, input=input) - mapping_res = await io_mapping.ainvoke() - - return StopEvent(result=mapping_res) diff --git a/agntcy_iomapper/llamaindex/llamaindex.py b/agntcy_iomapper/llamaindex/llamaindex.py index d47df91..69f79b8 100644 --- a/agntcy_iomapper/llamaindex/llamaindex.py +++ b/agntcy_iomapper/llamaindex/llamaindex.py @@ -1,41 +1,57 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0" import logging -from typing import Any, Optional - -from llama_index.core.base.llms.base import BaseLLM +from typing import Any, List, Optional, Sequence + +from jinja2 import Environment +from llama_index.core.agent.workflow import ( + FunctionAgent, +) +from llama_index.core.agent.workflow.workflow_events import ( + AgentOutput, +) from llama_index.core.llms import ChatMessage +from llama_index.core.memory import BaseMemory +from llama_index.core.tools import AsyncBaseTool, BaseTool +from llama_index.core.workflow import ( + Context, + Workflow, + step, +) from pydantic import Field -from agntcy_iomapper.base import AgentIOMapper, AgentIOMapperConfig, AgentIOMapperInput +from agntcy_iomapper.base import AgentIOMapperInput, ArgumentsDescription, BaseIOMapper +from agntcy_iomapper.base.models import AgentIOMapperOutput, IOMappingAgentMetadata +from agntcy_iomapper.base.utils import extract_nested_fields, get_io_types +from agntcy_iomapper.llamaindex.models import ( + IOMappingInputEvent, + IOMappingOutputEvent, + LLamaIndexIOMapperConfig, +) logger = logging.getLogger(__name__) -class LLamaIndexIOMapperConfig(AgentIOMapperConfig): - llm: BaseLLM = ( - Field( - ..., - description="Model to be used for translation as llama-index.", - ), - ) - - -class _LLmaIndexAgentIOMapper(AgentIOMapper): +class _LLmaIndexAgentIOMapper(BaseIOMapper): def __init__( self, config: Optional[LLamaIndexIOMapperConfig] = None, - **kwargs, + jinja_env: Optional[Environment] = None, + jinja_env_async: Optional[Environment] = None, ): if config is None: config = LLamaIndexIOMapperConfig() - super().__init__(config, **kwargs) + + super().__init__( + config=config, jinja_env=jinja_env, jinja_env_async=jinja_env_async + ) + if not config.llm: raise ValueError("Llm must be configured") else: self.llm = config.llm - def _invoke( + def invoke( self, input: AgentIOMapperInput, messages: list[dict[str, str]], @@ -46,7 +62,7 @@ def _invoke( response = self.llm.chat(llama_index_messages, **kwargs) return str(response) - async def _ainvoke( + async def ainvoke( self, input: AgentIOMapperInput, messages: list[dict[str, str]], @@ -54,30 +70,102 @@ async def _ainvoke( ) -> str: llama_index_messages = self._map_to_llama_index_messages(messages) response = await self.llm.achat(llama_index_messages, **kwargs) - return str(response) + return response.message.content def _map_to_llama_index_messages(self, messages: list[dict[str, str]]): return [ChatMessage(**message) for message in messages] -class LLamaIndexIOMapper: - def __init__(self, config: LLamaIndexIOMapperConfig, input: AgentIOMapperInput): - self._iomapper = _LLmaIndexAgentIOMapper(config) - self._input = input - - async def ainvoke(self) -> dict: - input = self._input - response = await self._iomapper.ainvoke(input=input) - if response is not None: - return response.data - else: - return {} +class LLamaIndexIOMapper(FunctionAgent): + mapping_metadata: IOMappingAgentMetadata = Field( + ..., + description="Object used to describe the input fields, output fields schema and any relevant information to be used in the mapping", + ) - def invoke(self, state: dict[str, Any]) -> dict: - input = self._input if self._input else state["input"] - response = self._iomapper.invoke(input=input) + def __init__(self, tools: List[BaseTool] = [], **kwargs: Any) -> None: + super().__init__( + tools=tools, + **kwargs, + ) - if response is not None: - return response.data - else: - return {} + async def take_step( + self, + ctx: Context, + llm_input: List[ChatMessage], + tools: Sequence[AsyncBaseTool], + memory: BaseMemory, + ) -> AgentOutput: + """Take a single step with the agent.""" + res = await self._run_step(llm_input=llm_input, ctx=ctx) + super_res = await super().take_step( + ctx=ctx, llm_input=llm_input, tools=tools, memory=memory + ) + # calling the super to be able to get the tools passed + res.tool_calls = super_res.tool_calls + + return res + + async def _run_step( + self, llm_input: List[ChatMessage], ctx: Context, **kwargs + ) -> AgentOutput: + """Take a single step with the function calling agent.""" + + config = LLamaIndexIOMapperConfig(llm=self.llm) + curr_state = await ctx.get("state") + mapping_result = await self._get_output( + config=config, metadata=self.mapping_metadata, input_data=curr_state + ) + + curr_state.update(mapping_result.data) + await ctx.set("state", curr_state) + + return AgentOutput( + response=mapping_result.data, + tool_calls=[], + raw=mapping_result, + current_agent_name=self.name, + ) + + @classmethod + async def _get_output( + cls, + config: LLamaIndexIOMapperConfig, + metadata: IOMappingAgentMetadata, + input_data: Any, + ) -> AgentIOMapperOutput: + """method used to invoke the llm to get the maping result""" + + _iomapper = _LLmaIndexAgentIOMapper(config) + + input_type, output_type = get_io_types(data=input_data, metadata=metadata) + + data_to_be_mapped = extract_nested_fields( + input_data, fields=metadata.input_fields + ) + + input = AgentIOMapperInput( + input=ArgumentsDescription( + json_schema=input_type, + ), + output=ArgumentsDescription(json_schema=output_type), + data=data_to_be_mapped, + ) + + mapping_result = await _iomapper._ainvoke(input) + return mapping_result + + @classmethod + def llamaindex_mapper(cls, workflow: Workflow): + """Adds a step to the given workflow""" + + @step(workflow=workflow) + async def io_mapper_step( + input_event: IOMappingInputEvent, + ) -> IOMappingOutputEvent: + mapping_res = await cls._get_output( + config=input_event.config, + metadata=input_event.metadata, + input_data=input_event.data, + ) + + return IOMappingOutputEvent(mapping_result=mapping_res.data) diff --git a/agntcy_iomapper/llamaindex/models.py b/agntcy_iomapper/llamaindex/models.py new file mode 100644 index 0000000..a1f308b --- /dev/null +++ b/agntcy_iomapper/llamaindex/models.py @@ -0,0 +1,79 @@ +from typing import Any + +from llama_index.core.base.llms.base import BaseLLM +from llama_index.core.workflow import ( + Event, +) +from pydantic import Field, model_validator +from typing_extensions import Self + +from agntcy_iomapper.base.models import ( + BaseIOMapperConfig, + FieldMetadata, + IOMappingAgentMetadata, +) + + +class LLamaIndexIOMapperConfig(BaseIOMapperConfig): + llm: BaseLLM = ( + Field( + ..., + description="Model to be used for translation as llama-index.", + ), + ) + + +class IOMappingOutputEvent(Event): + mapping_result: dict = Field( + ..., description="This is where the mapping result will be populated" + ) + + +class IOMappingInputEvent(Event): + metadata: IOMappingAgentMetadata = Field( + ..., + description="this object represents information relative to input fields output fields and other io mapping related information", + ) + config: LLamaIndexIOMapperConfig = Field( + ..., + description="this object contains information such as the llm instance that will be used to perform the translation", + ) + data: Any = Field( + ..., description="represents the input data to be used in the translation" + ) + + @model_validator(mode="after") + def _validate_obj(self) -> Self: + + if not self.metadata.input_fields or len(self.metadata.input_fields) == 0: + raise ValueError("input_fields not found in the metadata") + # input fields must have a least one non empty string + valid_input = [ + field + for field in self.metadata.input_fields + if (isinstance(field, str) and len(field.strip()) > 0) + or (isinstance(field, FieldMetadata) and len(field.json_path.strip()) > 0) + ] + + if not len(valid_input): + raise ValueError("input_fields must have at least one field") + else: + self.metadata.input_fields = valid_input + + if not self.metadata.output_fields: + raise ValueError("output_fields not found in the metadata") + + # output fields must have a least one non empty string + valid_output = [ + field + for field in self.metadata.output_fields + if (isinstance(field, str) and len(field.strip()) > 0) + or (isinstance(field, FieldMetadata) and len(field.json_path.strip()) > 0) + ] + + if not len(valid_output): + raise ValueError("output_fields must have at least one field") + else: + self.metadata.output_fields = valid_output + + return self diff --git a/agntcy_iomapper/pydantic_ai.py b/agntcy_iomapper/pydantic_ai.py index b48083a..5fd6129 100644 --- a/agntcy_iomapper/pydantic_ai.py +++ b/agntcy_iomapper/pydantic_ai.py @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 import logging -from typing import Any, Literal, Optional, TypedDict +from typing import Any, Literal, Optional, Union from openai import AsyncAzureOpenAI from pydantic import Field, model_validator @@ -16,13 +16,13 @@ ) from pydantic_ai.models import KnownModelName from pydantic_ai.models.openai import OpenAIModel -from typing_extensions import Self +from typing_extensions import Self, TypedDict -from agntcy_iomapper.agent import ( - AgentIOMapper, - AgentIOMapperConfig, +from agntcy_iomapper.base import BaseIOMapper +from agntcy_iomapper.base.models import ( AgentIOMapperInput, AgentIOMapperOutput, + BaseIOMapperConfig, ) logger = logging.getLogger(__name__) @@ -62,7 +62,7 @@ class PydanticAIAgentIOMapperInput(AgentIOMapperInput): PydanticAIAgentIOMapperOutput = AgentIOMapperOutput -class PydanticAIAgentIOMapperConfig(AgentIOMapperConfig): +class PydanticAIAgentIOMapperConfig(BaseIOMapperConfig): models: dict[str, AgentIOModelArgs] = Field( default={"azure:gpt-4o-mini": AgentIOModelArgs()}, description="LLM configuration to use for translation", @@ -90,14 +90,14 @@ def _validate_obj(self) -> Self: return self -SupportedModelName = ( - KnownModelName - | Literal[ +SupportedModelName = Union[ + KnownModelName, + Literal[ "azure:gpt-4o-mini", "azure:gpt-4o", "azure:gpt-4", - ] -) + ], +] def get_supported_agent( @@ -131,7 +131,7 @@ def get_supported_agent( return Agent(model_name, **kwargs) -class PydanticAIIOAgentIOMapper(AgentIOMapper): +class PydanticAIIOAgentIOMapper(BaseIOMapper): def __init__( self, config: PydanticAIAgentIOMapperConfig, @@ -197,7 +197,7 @@ def _get_prompts( return (system_prompt, user_prompt, message_history) - def _invoke( + def invoke( self, input: PydanticAIAgentIOMapperInput, messages: list[dict[str, str]], @@ -213,7 +213,7 @@ def _invoke( ) return response.data - async def _ainvoke( + async def ainvoke( self, input: PydanticAIAgentIOMapperInput, messages: list[dict[str, str]], diff --git a/examples/Makefile b/examples/Makefile index 4ea5aea..5d799bf 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -15,7 +15,7 @@ install: venv $(PIP) install -r $(REQUIREMENTS) run_imperative_eg: install - $(PYTHON) $(EXAMPLES)/imperative.py + $(PYTHON) langgraph/imperative.py run_lg_eg_pyd: install $(PYTHON) langgraph/langgraph_pydantic.py @@ -26,6 +26,12 @@ run_lg_eg_td: install run_llma_idx: install $(PYTHON) llamaindex/llamaindex_workflow.py + +run_llma_wfl: install + $(PYTHON) llamaindex/llamaindex_agent_workflow.py + + + run_acp: install $(PYTHON) acp/__main__.py diff --git a/examples/imperative.py b/examples/langgraph/imperative.py similarity index 51% rename from examples/imperative.py rename to examples/langgraph/imperative.py index 70d2850..918d1f5 100644 --- a/examples/imperative.py +++ b/examples/langgraph/imperative.py @@ -1,20 +1,30 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 -import json import logging from datetime import datetime +from typing import Optional from langchain_core.language_models import FakeListChatModel from langchain_core.messages import HumanMessage -from openapi_pydantic import Schema -from pydantic import BaseModel +from langgraph.graph import END, StateGraph +from pydantic import BaseModel, Field -from agntcy_iomapper import ImperativeIOMapper -from agntcy_iomapper.base import ArgumentsDescription, IOMapperInput +from agntcy_iomapper import IOMappingAgent, IOMappingAgentMetadata logger = logging.getLogger(__name__) +class InputQuiz(BaseModel): + prof_question: str + due_date: str + + +class OverallState(BaseModel): + question: Optional[str] = Field(None) + quiz: Optional[InputQuiz] = Field(None) + student_answer: Optional[str] = Field(None) + + class ProfessorAgent: """ This agent mission is to test it's students knowledges @@ -31,14 +41,9 @@ class ProfessorAgent: def __init__(self) -> None: self.model = FakeListChatModel(responses=self.predefined_questions) - def ask_question(self) -> str: + def ask_question(self, state: OverallState) -> OverallState: response = self.model.invoke([HumanMessage(content="Generate a question")]) - return str(response.content) - - -class InputQuiz(BaseModel): - prof_question: str - due_date: str + return {"question": str(response.content)} class StudentAgent: @@ -57,9 +62,11 @@ class StudentAgent: def __init__(self) -> None: self.model = FakeListChatModel(responses=self.predefined_answers) - def answer(self, quiz) -> str: - response = self.model.invoke([HumanMessage(content=quiz.prof_question)]) - return str(response.content) + def answer(self, state: OverallState) -> str: + response = self.model.invoke( + [HumanMessage(content=state.quiz["prof_question"])] + ) + return {"student_answer": str(response.content)} class MultiAgentApp: @@ -68,48 +75,34 @@ def run_app(): agent_prof = ProfessorAgent() agent_student = StudentAgent() - output_prof = agent_prof.ask_question() - - prof_agent_output_schema = {"question": {"type": "string"}} - student_agent_schema = { - "quiz": { - "type": "object", - "properties": { - "prof_question": {"type": "string"}, - "due_date": {"type": "string"}, - }, - } - } - mapping_object = { - "prof_question": "$.question", - "due_date": lambda _: datetime.now().strftime("%x"), + "quiz.prof_question": "$.question", + "quiz.due_date": lambda _: datetime.now().strftime("%x"), } - input = IOMapperInput( - input=ArgumentsDescription( - json_schema=Schema.model_validate(prof_agent_output_schema) - ), - output=ArgumentsDescription( - json_schema=Schema.model_validate(student_agent_schema) - ), - data={"question": output_prof}, - ) - - imerative_mapp = ImperativeIOMapper( + metadata = IOMappingAgentMetadata( field_mapping=mapping_object, + input_fields=["question"], + output_fields=["quiz"], ) - print(f"professors question was {output_prof}") + iomap = IOMappingAgent(metadata=metadata) + + graph = StateGraph(OverallState) + graph.add_node("student_node", agent_student.answer) + graph.add_node("professor_node", agent_prof.ask_question) + graph.add_node("mapping_node", iomap.langgraph_imperative) - mapping_result = imerative_mapp.invoke(input=input) + graph.add_edge("professor_node", "mapping_node") + graph.add_edge("mapping_node", "student_node") + graph.add_edge("student_node", END) - print(f"the mapping_result was {mapping_result}") + graph.set_entry_point("professor_node") - response = agent_student.answer(InputQuiz(**(json.loads(mapping_result.data)))) + app = graph.compile() - print(f"student response was {response}") - # map data between agents + result = app.invoke({"question": ""}) + print(result) def run(): diff --git a/examples/langgraph/langgraph_typedict.py b/examples/langgraph/langgraph_typedict.py index 05bf4fb..458f9db 100644 --- a/examples/langgraph/langgraph_typedict.py +++ b/examples/langgraph/langgraph_typedict.py @@ -10,6 +10,7 @@ from agntcy_iomapper import FieldMetadata, IOMappingAgent, IOMappingAgentMetadata from examples.llm import get_azure from examples.models import RecipeQuery, RecipeResponse +from examples.models.data import recipes class GraphState(TypedDict): @@ -19,13 +20,6 @@ class GraphState(TypedDict): formatted_output: Union[str, None] -# Small in-memory dataset of recipes stored as raw text -recipes = [ - "Pasta Primavera: Ingredients - pasta, tomato, garlic, olive oil. Instructions - Boil pasta, sauté garlic and tomatoes in olive oil, mix together.", - "Avocado Toast: Ingredients - bread, avocado, salt, pepper. Instructions - Toast bread, mash avocado, spread on toast, add salt and pepper.", - "Omelette: Ingredients - egg, cheese, onion, butter. Instructions - Beat eggs, cook in butter, add cheese and onions, fold omelette.", -] - embed = FakeEmbeddings(size=100) vector_store = FAISS.from_texts(recipes, embed) retriever = vector_store.as_retriever() diff --git a/examples/llamaindex/llamaindex_agent_workflow.py b/examples/llamaindex/llamaindex_agent_workflow.py new file mode 100644 index 0000000..b2afdf9 --- /dev/null +++ b/examples/llamaindex/llamaindex_agent_workflow.py @@ -0,0 +1,170 @@ +from typing import List, TypedDict, Union + +from llama_index.core import ( + GPTVectorStoreIndex, + StorageContext, +) +from llama_index.core.agent.workflow import ( + AgentOutput, + AgentWorkflow, + FunctionAgent, + ToolCall, + ToolCallResult, +) +from llama_index.core.schema import TextNode +from llama_index.core.workflow import Context +from llama_index.embeddings.huggingface import HuggingFaceEmbedding +from llama_index.vector_stores.docarray import DocArrayInMemoryVectorStore +from pydantic import TypeAdapter + +from agntcy_iomapper import FieldMetadata, IOMappingAgent, IOMappingAgentMetadata +from examples.llm import Framework, get_azure +from examples.models import RecipeQuery, RecipeResponse +from examples.models.data import recipes + + +class GraphState(TypedDict): + query: RecipeQuery + documents: Union[List[TextNode], None] + recipe: Union[RecipeResponse, None] + formatted_output: Union[str, None] + + +nodes = [TextNode(text=recipe) for recipe in recipes] +vector_store = DocArrayInMemoryVectorStore() +storage_context = StorageContext.from_defaults(vector_store=vector_store) + +embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5") + +index = GPTVectorStoreIndex( + nodes, storage_context=storage_context, embed_model=embed_model +) + +llm = get_azure(framework=Framework.LLAMA_INDEX) + + +async def get_recipe(ctx: Context) -> str: + """Useful for getting recipe from vector store.""" + current_state = await ctx.get("state") + query = current_state["query"]["ingredients"] + + query_engine = index.as_retriever(similarity_top_k=3, llm=llm) + + response = query_engine.retrieve(",".join(query)) + current_state["documents"] = [response[0].node] + await ctx.set("state", current_state) + return "Found recipe document needs mapping" + + +async def format_recipe(ctx: Context) -> str: + """Formats the recipe for user display.""" + current_state = await ctx.get("state") + + recipe: RecipeResponse = current_state["recipe"] + title = recipe.get("title", "") + ingredients = recipe.get("ingredients", []) + instructions = recipe.get("instructions", "") + return { + "formatted_output": f"Recipe: {title}\n" + f"Ingredients: {', '.join(ingredients)}\n" + f"Instructions: {instructions}" + } + + +async def got_to_format(ctx: Context) -> str: + return "Got to format recipe" + + +recipie_library_agent = FunctionAgent( + name="RecipeAgent", + description="Expert in finding recipe in a in memory database", + tools=[get_recipe], + llm=llm, + verbose=True, + can_handoff_to=["IOMapperAgent"], + system_prompt=""" \ +You are an agent designed to answer queries over a set of recipes. +Please use the tools provided to answer a question as possible. +""", +) + +mapping_metadata = IOMappingAgentMetadata( + input_fields=["documents.0.text"], + output_fields=[ + FieldMetadata( + json_path="recipe", + description="this is a recipe for the ingredients you've provided", + ) + ], + input_schema=TypeAdapter(GraphState).json_schema(), + output_schema={ + "type": "object", + "properties": { + "title": {"type": "string"}, + "ingredients": {"type": "array", "items": {"type": "string"}}, + "instructions": {"type": "string"}, + }, + "required": ["title", "ingredients, instructions"], + }, +) + +io_mapping_agent = IOMappingAgent.as_workflow_agent( + mapping_metadata=mapping_metadata, + llm=llm, + name="IOMapperAgent", + description="Useful for mapping a recipe document into recipe object", + can_handoff_to=["Formatter_Agent"], + tools=[got_to_format], +) + +formatter_agent = FunctionAgent( + name="Formatter_Agent", + llm=llm, + description="Useful for formatting a recipe object and return it as a string", + sytems_prompt="""\ + """, + tools=[format_recipe], +) + +agent_workflow = AgentWorkflow( + agents=[recipie_library_agent, io_mapping_agent, formatter_agent], + root_agent=recipie_library_agent.name, + initial_state={"query": {"ingredients": ["pasta", "tomato"]}}, +) + + +async def main(): + handler = agent_workflow.run(user_msg="pasta, tomato output in a readable format") + + current_agent = None + + async for event in handler.stream_events(): + if ( + hasattr(event, "current_agent_name") + and event.current_agent_name != current_agent + ): + current_agent = event.current_agent_name + print(f"\n{'='*50}") + print(f"🤖 Agent: {current_agent}") + print(f"{'='*50}\n") + elif isinstance(event, AgentOutput): + if event.response.content: + print("📤 Output:", event.response.content) + if event.tool_calls: + print( + "🛠️ Planning to use tools:", + [call.tool_name for call in event.tool_calls], + ) + elif isinstance(event, ToolCallResult): + print(f"🔧 Tool Result ({event.tool_name}):") + print(f" Arguments: {event.tool_kwargs}") + print(f" Output: {event.tool_output}") + elif isinstance(event, ToolCall): + print(f"🔨 Calling Tool: {event.tool_name}") + print(f" With arguments: {event.tool_kwargs}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/llamaindex/llamaindex_workflow.py b/examples/llamaindex/llamaindex_workflow.py index c4784f3..d61a255 100644 --- a/examples/llamaindex/llamaindex_workflow.py +++ b/examples/llamaindex/llamaindex_workflow.py @@ -1,6 +1,6 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 -from typing import Any, List, TypedDict +from typing import List, Optional from llama_index.core.output_parsers import PydanticOutputParser from llama_index.core.workflow import ( @@ -11,11 +11,16 @@ Workflow, step, ) -from pydantic import TypeAdapter +from pydantic import BaseModel, Field -from agntcy_iomapper.llamaindex import IOMappingWorkflow +from agntcy_iomapper import IOMappingAgent, IOMappingAgentMetadata +from agntcy_iomapper.llamaindex import ( + IOMappingInputEvent, + IOMappingOutputEvent, + LLamaIndexIOMapperConfig, +) from examples.llm import Framework, get_azure -from examples.models import Campaign, User +from examples.models import Campaign, Statistics, User from examples.models.data import users @@ -31,9 +36,9 @@ class CampaignCreatedEvent(Event): campaign: Campaign -class OverallState(TypedDict): +class OverallState(BaseModel): campaign_details: Campaign - stats: Any + stats: Optional[Statistics] = Field(None) selected_users: List[User] @@ -51,8 +56,8 @@ async def pick_users_step( @step async def create_campaign( - self, ctx: Context, ev: CreateCampaignEvent, io_mapping_workflow: Workflow - ) -> StopEvent: + self, ctx: Context, ev: CreateCampaignEvent + ) -> IOMappingInputEvent: prompt = f""" You are a campaign builder for company XYZ. Given a list of selected users and a user prompt, create an engaging campaign. Return the campaign details as a JSON object with the following structure: @@ -70,27 +75,37 @@ async def create_campaign( llm_response = llm.complete(prompt) try: campaign_details = parser.parse(str(llm_response)) - result = await io_mapping_workflow.run( - context=ctx, - data={ - "campaign_details": campaign_details, - "stats": None, - "selected_users": ev.list_users, - }, + metadata = IOMappingAgentMetadata( input_fields=["selected_users", "campaign_details.name"], output_fields=["stats"], ) - return StopEvent(result=result) + config = LLamaIndexIOMapperConfig(llm=llm) + + io_mapping_input_event = IOMappingInputEvent( + metadata=metadata, + config=config, + data=OverallState( + campaign_details=campaign_details, + selected_users=ev.list_users, + ), + ) + return io_mapping_input_event except Exception as e: print(f"Error parsing campaign details: {e}") return StopEvent(result=f"{e}") + @step + async def after_translation(self, evt: IOMappingOutputEvent) -> StopEvent: + return StopEvent(result="Done") + async def main(): llm = get_azure(framework=Framework.LLAMA_INDEX) w = CampaignWorkflow() - w.add_workflows(io_mapping_workflow=IOMappingWorkflow()) + + IOMappingAgent.as_worfklow_step(workflow=w) result = await w.run(prompt="Create a campaign for all users", llm=llm) + print(result) diff --git a/examples/models/data.py b/examples/models/data.py index 12ae389..bb3b116 100644 --- a/examples/models/data.py +++ b/examples/models/data.py @@ -6,3 +6,10 @@ User(name="Alice", email="alice@example.com", phone="123-456-7890"), User(name="Bob", email="bob@example.com", phone="987-654-3210"), ] + +# Small in-memory dataset of recipes stored as raw text +recipes = [ + "Pasta Primavera: Ingredients - pasta, tomato, garlic, olive oil. Instructions - Boil pasta, sauté garlic and tomatoes in olive oil, mix together.", + "Avocado Toast: Ingredients - bread, avocado, salt, pepper. Instructions - Toast bread, mash avocado, spread on toast, add salt and pepper.", + "Omelette: Ingredients - egg, cheese, onion, butter. Instructions - Beat eggs, cook in butter, add cheese and onions, fold omelette.", +] diff --git a/examples/requirements.txt b/examples/requirements.txt index 1c55dac..bff92c4 100644 --- a/examples/requirements.txt +++ b/examples/requirements.txt @@ -8,5 +8,6 @@ faiss-cpu==1.10.0 llama-index-core==0.12.22 llama-index-utils-workflow==0.3.0 llama-index-llms-azure-openai==0.3.1 +llama-index-vector-stores-docarray>=0.3.0 +llama-index-embeddings-huggingface>=0.5.2 -e ../ --e ../../acp-sdk/acp-sdk/ diff --git a/tests/agentio_data.py b/tests/agentio_data.py index 55c9476..a0449ba 100644 --- a/tests/agentio_data.py +++ b/tests/agentio_data.py @@ -2,11 +2,11 @@ # SPDX-License-Identifier: Apache-2.0 from openapi_pydantic import DataType, Schema -from agntcy_iomapper.agent import ( +from agntcy_iomapper.base import ( AgentIOMapperInput, AgentIOMapperOutput, + ArgumentsDescription, ) -from agntcy_iomapper.base import ArgumentsDescription AGENTIO_TEST_PARAMETERS_TRANSLATIONS = [ ( diff --git a/tests/test_agent_iomapper_from_manifest.py b/tests/test_agent_iomapper_from_manifest.py index 48e40b8..bf8ea94 100644 --- a/tests/test_agent_iomapper_from_manifest.py +++ b/tests/test_agent_iomapper_from_manifest.py @@ -7,11 +7,9 @@ import pytest from langgraph.graph import END, START, StateGraph -from agntcy_iomapper.agent.models import ( +from agntcy_iomapper.base import ( AgentIOMapperInput, AgentIOMapperOutput, -) -from agntcy_iomapper.base import ( ArgumentsDescription, ) from agntcy_iomapper.langgraph import create_langraph_iomapper diff --git a/tests/test_imperative_iomapper.py b/tests/test_imperative_iomapper.py index d9aa9ad..23c7a88 100644 --- a/tests/test_imperative_iomapper.py +++ b/tests/test_imperative_iomapper.py @@ -177,21 +177,19 @@ def test_imperative_iomapp( input_schema, output_schema, field_mapping, input_value, expected ) -> None: """Test imperative io mapping""" - - io_mapp = ImperativeIOMapper( - field_mapping=field_mapping, - ) - input = ImperativeIOMapperInput( input=ArgumentsDescription(json_schema=Schema.model_validate(input_schema)), output=ArgumentsDescription(json_schema=Schema.model_validate(output_schema)), data=input_value, ) - actual = io_mapp.invoke(input=input) + + io_mapp = ImperativeIOMapper(field_mapping=field_mapping, input=input) + + actual = io_mapp.invoke(data=json.dumps(input_value)) # When test returns none fail the test if actual is None: assert True is False return - assert expected == json.loads(actual.data) + assert expected == actual diff --git a/tests/test_langgraph_agent_iomapper.py b/tests/test_langgraph_agent_iomapper.py index df4feb8..8ba7d48 100644 --- a/tests/test_langgraph_agent_iomapper.py +++ b/tests/test_langgraph_agent_iomapper.py @@ -6,7 +6,7 @@ import pytest from langgraph.graph import END, START, StateGraph -from agntcy_iomapper.agent import ( +from agntcy_iomapper.base import ( AgentIOMapperInput, ) from agntcy_iomapper.langgraph import ( diff --git a/tests/test_langgraph_graph_with_io_mapper.py b/tests/test_langgraph_graph_with_io_mapper.py index 092b83a..a9367a2 100644 --- a/tests/test_langgraph_graph_with_io_mapper.py +++ b/tests/test_langgraph_graph_with_io_mapper.py @@ -6,7 +6,7 @@ from langgraph.graph import END, StateGraph from pydantic import BaseModel, Field -from agntcy_iomapper import IOMappingAgent, IOMappingAgentMetadata +from agntcy_iomapper import FieldMetadata, IOMappingAgent, IOMappingAgentMetadata from examples.models import Campaign, Communication, Statistics, User from examples.models.data import users @@ -31,8 +31,12 @@ def test_langgraph_agent_in_a_graph_application(llm_instance): metadata = IOMappingAgentMetadata( input_fields=["selected_users", "campaign_details.name"], - output_fields=["stats.status"], - output_description_prompt="the status value must contain value yes or no, if selected_users is not empty, than it should have yes otherwise no", + output_fields=[ + FieldMetadata( + json_path="stats.status", + description="the status value must contain value yes or no, if selected_users is not empty, than it should have yes otherwise no", + ) + ], ) mapping_agent = IOMappingAgent(metadata=metadata, llm=llm_instance) diff --git a/tests/test_llamaindex_agent_iomapper.py b/tests/test_llamaindex_agent_iomapper.py deleted file mode 100644 index 7c2e44f..0000000 --- a/tests/test_llamaindex_agent_iomapper.py +++ /dev/null @@ -1 +0,0 @@ -def test_llamaindex_io_mapper(): ... diff --git a/tests/test_pydantic_ai_agent_iomapper.py b/tests/test_pydantic_ai_agent_iomapper.py index 15df7d5..ff895df 100644 --- a/tests/test_pydantic_ai_agent_iomapper.py +++ b/tests/test_pydantic_ai_agent_iomapper.py @@ -6,7 +6,7 @@ from deepdiff import diff from jinja2.sandbox import SandboxedEnvironment -from agntcy_iomapper.agent import AgentIOMapper +from agntcy_iomapper.base import BaseIOMapper from agntcy_iomapper.pydantic_ai import ( AgentIOModelArgs, AgentModelSettings, @@ -66,7 +66,7 @@ async def test_agent_mapping_async( llmIOMapper = PydanticAIIOAgentIOMapper( llm_iomapper_config, jinja_env_async=jinja_env_async ) - output = await llmIOMapper.ainvoke(input) + output = await llmIOMapper._ainvoke(input) if isinstance(output.data, str): equalp = await compare_outputs_async( llmIOMapper, output.data, expected_output.data @@ -87,7 +87,7 @@ async def test_agent_mapping_async( @pytest.mark.llm def test_agent_mapping(llm_iomapper_config, jinja_env, input, expected_output): llmIOMapper = PydanticAIIOAgentIOMapper(llm_iomapper_config, jinja_env=jinja_env) - output = llmIOMapper.invoke(input) + output = llmIOMapper._invoke(input) if isinstance(output.data, str): equalp = compare_outputs(llmIOMapper, output.data, expected_output.data) assert equalp @@ -120,9 +120,7 @@ def test_agent_mapping(llm_iomapper_config, jinja_env, input, expected_output): """ -async def compare_outputs_async( - iomapper: AgentIOMapper, text1: str, text2: str -) -> bool: +async def compare_outputs_async(iomapper: BaseIOMapper, text1: str, text2: str) -> bool: model_name = iomapper.config.default_model agent = get_supported_agent( model_name, @@ -139,7 +137,7 @@ async def compare_outputs_async( return match is not None and match.startswith("t") -def compare_outputs(iomapper: AgentIOMapper, text1: str, text2: str) -> bool: +def compare_outputs(iomapper: BaseIOMapper, text1: str, text2: str) -> bool: model_name = iomapper.config.default_model agent = get_supported_agent( model_name, diff --git a/tests/unittests/test_langgraph_node.py b/tests/unittests/test_langgraph_node.py index 8b80f5d..b890087 100644 --- a/tests/unittests/test_langgraph_node.py +++ b/tests/unittests/test_langgraph_node.py @@ -1,3 +1,5 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 import pytest from langchain_core.language_models import FakeListChatModel diff --git a/tests/unittests/test_schema_mapping.py b/tests/unittests/test_schema_mapping.py index 1650023..dbffd00 100644 --- a/tests/unittests/test_schema_mapping.py +++ b/tests/unittests/test_schema_mapping.py @@ -1,3 +1,5 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 from typing import List, Optional from pydantic import BaseModel, Field diff --git a/tests/unittests/test_schema_union_types.py b/tests/unittests/test_schema_union_types.py index d36b824..c255a2e 100644 --- a/tests/unittests/test_schema_union_types.py +++ b/tests/unittests/test_schema_union_types.py @@ -1,9 +1,12 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 + from enum import Enum from typing import List, Optional from pydantic import BaseModel, Field -from agntcy_iomapper.agent.models import FieldMetadata +from agntcy_iomapper.base.models import FieldMetadata from agntcy_iomapper.base.utils import create_type_from_schema