Skip to content

Feat/iomapper llamaindex #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ setup: set_python_env

# ============================

setup_test: set_python_env
setup_test:
@poetry install --with=test --all-extras

test: setup_test
poetry run pytest -vvrx
test_manifest: setup_test
poetry run pytest tests/test_agent_iomapper_from_manifest.py

test_langgraph_agent: setup_test
poetry run pytest tests/test_langgraph_agent_iomapper.py
27 changes: 25 additions & 2 deletions agntcy_iomapper/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
from .agent_iomapper import *
from .base import *
from .agent_iomapper import (
AgentIOMapper,
AgentIOMapperConfig,
AgentIOMapperInput,
AgentIOMapperOutput,
)
from .base import (
ArgumentsDescription,
BaseIOMapper,
BaseIOMapperConfig,
BaseIOMapperInput,
BaseIOMapperOutput,
)

__all__ = [
"AgentIOMapperConfig",
"ArgumentsDescription",
"AgentIOMapperInput",
"AgentIOMapperOutput",
"AgentIOMapper",
"BaseIOMapperInput",
"BaseIOMapperOutput",
"BaseIOMapperConfig",
"BaseIOMapper",
]
2 changes: 2 additions & 0 deletions agntcy_iomapper/base/agent_iomapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ def _get_output(
f'{{"data": {json.dumps(outputs)} }}'
)

logger.debug(f"{outputs}")

# Check if data is returned in JSON markdown text
matches = self._json_search_pattern.findall(outputs)
if matches:
Expand Down
178 changes: 178 additions & 0 deletions agntcy_iomapper/base/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import logging
from typing import Any, Dict, List, Optional, Type

from openapi_pydantic import Schema


def _create_type_from_schema(
    json_schema: Dict[str, Any], pick_fields: List[str]
) -> Optional[Type]:
    """
    Build a reduced schema containing only the picked top-level fields.

    NOTE(review): despite the annotation and the original docstring, this
    returns an ``openapi_pydantic.Schema`` instance (via
    ``Schema.model_validate``), not a Pydantic model class — confirm the
    intended return contract with callers.

    Args:
        json_schema: JSON schema of the original object, in the layout
            produced by ``BaseModel.model_json_schema()`` (i.e. with
            "properties" and optionally "$defs").
        pick_fields: Dotted field paths to keep. Only the root segment of
            each path is used; nested filtering is the commented-out TODO.

    Returns:
        A ``Schema`` validated from the filtered property mapping.

    Raises:
        KeyError: if a path's root segment is not among the schema's
            "properties".
    """
    defs = json_schema.get("$defs", {})
    properties = json_schema.get("properties", {})
    filtered_properties = {}

    for path in pick_fields:
        parts = path.split(".")
        root_item = parts[0]
        # Direct indexing: an unknown root field raises KeyError (see Raises).
        prop = properties[root_item]

        if "anyOf" in prop:
            # Union field: flatten each variant (resolving $refs) into a list.
            final_schema = []
            filtered_properties[root_item] = {}
            _extract_schema(prop, defs, final_schema)
            filtered_properties[root_item]["anyOf"] = final_schema

        elif "items" in prop:
            # Array field: flatten the item schema(s) and rewrap as an array.
            final_schema = []
            _extract_schema(prop, defs, final_schema)
            filtered_properties[root_item] = {"type": "array", "items": final_schema}

        elif "$ref" in prop:
            # Plain reference: inline the referenced definition.
            resolved_def = resolve_ref(prop.get("$ref"), defs)
            filtered_properties[root_item] = resolved_def

        else:
            # Plain typed field: _extract_schema appends the node itself, so
            # the dict assigned just above is immediately replaced by a list
            # here — presumably intentional, but worth confirming.
            final_schema = []
            filtered_properties[root_item] = {}
            _extract_schema(prop, defs, final_schema)
            filtered_properties[root_item] = final_schema
    # TODO - remove fields not selected from the output
    # filtered_properties = _refine_schema(filtered_properties, pick_fields)

    return Schema.model_validate(filtered_properties)


def _extract_schema(json_schema, defs, schema):
if "anyOf" in json_schema:
for val in json_schema.get("anyOf"):
_extract_schema(val, defs, schema)
elif "items" in json_schema:
item = json_schema.get("items")
_extract_schema(item, defs, schema)
elif "$ref" in json_schema:
ref = json_schema.get("$ref")
schema.append(resolve_ref(ref, defs))
elif "type" in json_schema:
schema.append(json_schema)
else:
return


def _extract_nested_fields(data: Any, fields: List[str]) -> dict:
"""Extracts specified fields from a potentially nested data structure
Args:
data: The input data (can be any type)
fields: A list of fields path (e.g.. "fielda.fieldb")
Returns:
A dictionary containing the extracted fields and their values.
Returns empty dictionary if there are errors
"""
if not fields:
return {}

results = {}

for field_path in fields:
try:
value = _get_nested_value(data, field_path)
results[field_path] = value
except (KeyError, TypeError, AttributeError, ValueError) as e:
print(f"Error extracting field {field_path}: {e}")
return results


def _get_nested_value(data: Any, field_path: str) -> Optional[Any]:
"""
Recursively retrieves a value from a nested data structure
"""
current = data
parts = field_path.split(".")

for part in parts:
if isinstance(current, dict):
current = current[part]
elif isinstance(current, list) and part.isdigit():
current = current[int(part)]
elif hasattr(current, part):
current = getattr(current, part)
else:
current = None

return current


def resolve_ref(ref, current_defs):
    """Follow a local JSON-schema reference (e.g. "#/$defs/Name") into defs.

    The first two segments ("#" and "$defs") are skipped; each remaining
    segment is looked up in turn. A missing final segment yields None.
    """
    node = current_defs
    for segment in ref.split("/")[2:]:
        node = node.get(segment)
    return node


def _refine_schema(schema, paths):
filtered_schema = {}
print(f"{paths}-{schema}")

for path in paths:
path_parts = path.split(".")
if path_parts[0] in schema:
key = path_parts[0]
root_schema = schema.get(key)
properties = {}
if "anyOf" in root_schema:
filtered_schema[key] = {}
sub_schemas = root_schema.get("anyOf")

for sub_schema in sub_schemas:
if "properties" in sub_schema:
curr_properties = sub_schema.get("properties")
properties = _filter_properties(
sub_schema, curr_properties, path_parts[1:]
)
if key in filtered_schema:
if "anyOf" not in filtered_schema:
filtered_schema[key]["anyOf"] = [
{"properties": properties}
]
else:
filtered_schema[key]["anyOf"]

elif "items" in root_schema:
sub_schemas = root_schema.get("items")
elif "properties" in root_schema:
curr_properties = root_schema.get("properties")
properties = _filter_properties(
root_schema, curr_properties, path_parts[1:]
)
print(f" after {paths}-{filtered_schema}")
return filtered_schema


def _filter_properties(schema, properties, paths):
filtered_schema = {}

if len(paths) == 0:
return schema

for path in paths:
if path in properties:
filtered_schema[path] = properties.get(path)
if "properties" in filtered_schema[path]:
return _filter_properties(
schema, filtered_schema[path].get("properties"), paths[1:]
)
elif path in schema:
filtered_schema[path] = schema.get(path)
else:
continue

return filtered_schema
10 changes: 9 additions & 1 deletion agntcy_iomapper/langgraph/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0"

from .create_langraph_iomapper import create_langraph_iomapper, io_mapper_node
from .langgraph import (
LangGraphIOMapper,
LangGraphIOMapperConfig,
LangGraphIOMapperInput,
LangGraphIOMapperOutput,
)

from .create_langraph_iomapper import create_langraph_iomapper
__all__ = [
"create_langraph_iomapper",
"io_mapper_node",
"LangGraphIOMapper",
"LangGraphIOMapperConfig",
"LangGraphIOMapperInput",
"LangGraphIOMapperOutput",
]
80 changes: 80 additions & 0 deletions agntcy_iomapper/langgraph/create_langraph_iomapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0"
from typing import Any

from langchain_core.runnables import Runnable
from pydantic import BaseModel

from agntcy_iomapper.base import AgentIOMapperInput, ArgumentsDescription
from agntcy_iomapper.base.utils import _create_type_from_schema, _extract_nested_fields

from .langgraph import (
LangGraphIOMapper,
Expand All @@ -20,3 +26,77 @@ def create_langraph_iomapper(
A runnable representing an agent. It returns as output the mapping result
"""
return LangGraphIOMapper(config).as_runnable()


def io_mapper_node(data: Any, config: dict) -> Runnable:
    """Creates a langgraph node
    Args:
        data: represents the state of the graph
        config: the runnable config injected by the langgraph framework;
            its metadata has the following structure
            - input_fields: Required, an array of fields to be used in the mapping; these fields must be in the state eg: ["name", "address.street"]
            - output_fields: Required, an array of fields to include in the mapping result, eg: ["full_name", "full_address"]
            - input_schema: Optional, defines the schema of the input_data; useful if your state is not a pydantic model, not required if your state is a pydantic model.
            - output_schema: Optional, defines the schema of the output_data; useful if your output is not a pydantic model, not required if your output model is a pydantic model.
            To understand better how and when to use any of these options check the examples folder
    Returns:
        A runnable, that can be included in the langgraph node
    Raises:
        ValueError: when required metadata or llm configuration is missing.
    """
    # BUG FIX: these guards previously *returned* ValueError instances
    # instead of raising them, so callers received an exception object as
    # the "runnable" and the errors went unnoticed.
    metadata = config.get("metadata")
    if not metadata:
        raise ValueError(
            "A metadata must be present with at least the configuration for input_fields and output_fields"
        )
    if not data:
        raise ValueError("data is required. Invalid or no data was passed")

    input_fields = metadata.get("input_fields")
    if not input_fields:
        raise ValueError("input_fields not found in the metadata")

    output_fields = metadata.get("output_fields")
    if not output_fields:
        raise ValueError("output_fields not found in the metadata")

    configurable = config.get("configurable")
    if not configurable:
        raise ValueError(
            "to use io_mapper_node an llm config must be passed via langgraph runnable config"
        )

    llm = configurable.get("llm")
    if not llm:
        raise ValueError(
            "to use io_mapper_node an llm config must be passed via langgraph runnable config"
        )

    if isinstance(data, BaseModel):
        input_schema = data.model_json_schema()
        # Without an explicit output_schema, the output type is carved out
        # of the state's own schema, as before.
        output_schema = metadata.get("output_schema", input_schema)
    else:
        # .get() instead of [] so a missing key reaches the explanatory
        # ValueError below rather than surfacing as a bare KeyError.
        input_schema = metadata.get("input_schema")
        output_schema = metadata.get("output_schema")
        if not input_schema or not output_schema:
            raise ValueError(
                "input_schema, and or output_schema are missing from the metadata, for a better accuracy you are required to provide them in this scenario"
            )

    # BUG FIX: output_type was previously always derived from input_schema,
    # silently ignoring a supplied output_schema.
    output_type = _create_type_from_schema(output_schema, output_fields)
    input_type = _create_type_from_schema(input_schema, input_fields)

    data_to_be_mapped = _extract_nested_fields(data, fields=input_fields)

    input = AgentIOMapperInput(
        input=ArgumentsDescription(
            json_schema=input_type,
        ),
        output=ArgumentsDescription(
            json_schema=output_type,
        ),
        data=data_to_be_mapped,
    )

    iomapper_config = LangGraphIOMapperConfig(llm=llm)
    return LangGraphIOMapper(iomapper_config, input).as_runnable()
26 changes: 18 additions & 8 deletions agntcy_iomapper/langgraph/langgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@


class LangGraphIOMapperConfig(AgentIOMapperConfig):
llm: BaseChatModel | str = Field(
default="anthropic:claude-3-5-sonnet-latest",
description="Model to use for translation as LangChain description or model class.",
llm: BaseChatModel | str = (
Field(
...,
description="Model to use for translation as LangChain description or model class.",
),
)


Expand Down Expand Up @@ -67,20 +69,28 @@ async def _ainvoke(


class LangGraphIOMapper:
def __init__(self, config: LangGraphIOMapperConfig):
def __init__(
self,
config: LangGraphIOMapperConfig,
input: LangGraphIOMapperInput | None = None,
):
self._iomapper = _LangGraphAgentIOMapper(config)
self._input = input

async def ainvoke(self, state: dict[str, Any], config: RunnableConfig) -> dict:
response = await self._iomapper.ainvoke(input=state["input"], config=config)
input = self._input if self._input else state["input"]
response = await self._iomapper.ainvoke(input=input, config=config)
if response is not None:
return {"output": response}
return response.data
else:
return {}

def invoke(self, state: dict[str, Any], config: RunnableConfig) -> dict:
response = self._iomapper.invoke(input=state["input"], config=config)
input = self._input if self._input else state["input"]
response = self._iomapper.invoke(input=input, config=config)

if response is not None:
return {"output": response}
return response.data
else:
return {}

Expand Down
Loading