Skip to content

Commit d8ce1bb

Browse files
committed
feat: add ability to imperatively defining the mapping between input ouput
1 parent 0b79b0a commit d8ce1bb

12 files changed

+675
-27
lines changed

Diff for: README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ Communication between agents is not possible if there are discrepancies between
1515

1616
Ensuring that agents are semantically compatible, i.e., the output of the one agent contains the information needed
1717
by later agents, is an problem of composition or planning in the application. This project, the IO Mapper Agent,
18-
addresses level 2 and 3 compatibility. It is a component, implemented as an agent, that can make use of an LLM
18+
addresses level 2 and 3 compatibility. It is a component, implemented as an agent, that can make use of an LLM
1919
to transform the output of one agent to become compatible to the input of another agent. Note that this may mean
2020
many different things, for example:
2121

22-
* JSON structure transcoding: A JSON dictionary needs to be remapped into another JSON dictionary
23-
* Text summarisation: A text needs to be summarised or some information needs to be removed
24-
* Text translation: A text needs to be translated from one language to another
25-
* Text manipulation: Part of the information of one text needs to be reformulated into another text
26-
* Any combination of the above
22+
- JSON structure transcoding: A JSON dictionary needs to be remapped into another JSON dictionary
23+
- Text summarisation: A text needs to be summarised or some information needs to be removed
24+
- Text translation: A text needs to be translated from one language to another
25+
- Text manipulation: Part of the information of one text needs to be reformulated into another text
26+
- Any combination of the above
2727

2828
The IO mapper Agent can be fed the schema definitions of inputs and outputs as defined by the [Agent Connect Protocol](https://github.com/agntcy/acp-spec).
2929

Diff for: agntcy_iomapper/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@
1212
IOMapperConfig,
1313
IOModelArgs,
1414
)
15+
16+
from .imperative import ImperativeIOMapper
17+
18+
___all__ = [ "ImperativeIOMapper", "AgentIOMapper", "IOMapperOutput", "IOMapperInput"]

Diff for: agntcy_iomapper/base.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
22
# SPDX-License-Identifier: Apache-2.0
3-
from abc import abstractmethod, ABC
4-
from pydantic import BaseModel, model_validator, Field
5-
from typing import Any
3+
from abc import ABC, abstractmethod
4+
from typing import Any, TypedDict
5+
66
from openapi_pydantic import Schema
7+
from pydantic import BaseModel, Field, model_validator
78
from typing_extensions import Self
8-
from typing import TypedDict
99

1010

1111
class ArgumentsDescription(BaseModel):

Diff for: agntcy_iomapper/imperative.py

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2+
# SPDX-License-Identifier: Apache-2.0"
3+
"""
4+
The deterministic I/O mapper is a component
5+
designed to translate specific inputs into
6+
corresponding outputs in a predictable and consistent manner.
7+
When configured with a JSONPath definition,
8+
this mapper utilizes the JSONPath query language
9+
to extract data from JSON-formatted input,
10+
transforming it into a structured output based on predefined rules.
11+
The deterministic nature of the mapper ensures that given the same input and
12+
JSONPath configuration, the output will always be the same,
13+
providing reliability and repeatability.
14+
This is particularly useful in scenarios where
15+
consistent data transformation is required.
16+
"""
17+
18+
import json
19+
import logging
20+
from typing import Any, Callable, Dict, Union
21+
22+
import jsonschema
23+
from jsonpath_ng.ext import parse
24+
25+
from agntcy_iomapper.base import BaseIOMapper, IOMapperInput, IOMapperOutput
26+
27+
logger = logging.getLogger(__name__)
28+
29+
30+
class ImperativeIOMapper(BaseIOMapper):
31+
field_mapping: Dict[str, Union[str, Callable]]
32+
"""A dictionary for where the keys are fields of the output object
33+
and values are JSONPath (strings) representing how the mapping
34+
"""
35+
36+
def __init__(self, field_mapping: Dict[str, Union[str, Callable]]) -> None:
37+
super().__init__()
38+
self.field_mapping = field_mapping
39+
40+
def invoke(self, input: IOMapperInput) -> IOMapperOutput | None:
41+
if input.data is None:
42+
return None
43+
if self.field_mapping is None:
44+
return IOMapperOutput(data=input.data)
45+
46+
data = self._imperative_map(input)
47+
return IOMapperOutput(data=data)
48+
49+
def ainvoke(self, input: IOMapperInput) -> IOMapperOutput | None:
50+
return self.invoke(input)
51+
52+
def _imperative_map(self, input_definition: IOMapperInput) -> Any:
53+
"""
54+
Converts input data to a desired output type.
55+
56+
This function attempts to convert the provided data into the specified
57+
target type. It performs validation using a JSON schema and raises a
58+
ValidationError if the data does not conform to the expected schema for
59+
the target type.
60+
61+
Parameters:
62+
----------
63+
data : Any
64+
The input data to be converted. This can be of any type.
65+
Returns:
66+
-------
67+
Any
68+
The converted data in the desired output type.
69+
Raises:
70+
------
71+
ValidationError
72+
If the input data does not conform to the expected schema for the
73+
target type.
74+
Notes:
75+
-----
76+
The function assumes that the caller provides a valid `input_schema`.
77+
Unsupported target types should be handled as needed within the function.
78+
"""
79+
data = input_definition.data
80+
input_schema = input_definition.input.json_schema
81+
82+
jsonschema.validate(
83+
instance=data,
84+
schema=input_schema.model_dump(exclude_none=True, mode="json"),
85+
)
86+
87+
mapped_output = {}
88+
89+
for output_field, json_path_or_func in self.field_mapping.items():
90+
if isinstance(json_path_or_func, str):
91+
jsonpath_expr = parse(json_path_or_func)
92+
match = jsonpath_expr.find(data)
93+
expect_value = match[0].value if match else None
94+
elif callable(json_path_or_func):
95+
expect_value = json_path_or_func(data)
96+
else:
97+
raise TypeError(
98+
"Mapping values must be strings (JSONPath) or callables (functions)."
99+
)
100+
101+
self._set_jsonpath(mapped_output, output_field, expect_value)
102+
jsonschema.validate(
103+
instance=mapped_output,
104+
schema=input_definition.output.json_schema.model_dump(
105+
exclude_none=True, mode="json"
106+
),
107+
)
108+
# return a serialized version of the object
109+
return json.dumps(mapped_output)
110+
111+
def _set_jsonpath(
112+
self, data: dict[str, Any], path: str, value: Any
113+
) -> dict[str, Any]:
114+
"""set value for field based on its json path
115+
Args:
116+
data: Data so far
117+
path: the json path
118+
value: the value to set the json path to
119+
Returns:
120+
-----
121+
dict[str,Any]
122+
The mapped filed with the value
123+
"""
124+
copy_data: dict[str, Any] = data
125+
# Split the path into parts and remove the leading root
126+
parts = path.strip("$.").split(".")
127+
# Add value to corresponding path
128+
for part in parts[:-1]:
129+
if part not in copy_data:
130+
copy_data[part] = {}
131+
132+
copy_data = copy_data[part]
133+
134+
copy_data[parts[-1]] = value
135+
136+
return copy_data

Diff for: examples/Makefile

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
.PHONY: run_imperative_example
3+
4+
EXAMPLES ?= .
5+
6+
run_imperative_example:
7+
python ./imperative.py

Diff for: examples/__init__.py

Whitespace-only changes.

Diff for: examples/imperative.py

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import json
4+
import logging
5+
from datetime import datetime
6+
7+
from langchain_core.language_models import FakeListChatModel
8+
from langchain_core.messages import HumanMessage
9+
from pydantic import BaseModel
10+
11+
from agntcy_iomapper import ImperativeIOMapper
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class ProfessorAgent:
17+
"""
18+
This agent mission is to test it's students knowledges
19+
"""
20+
21+
predefined_questions = [
22+
"What is the capital of France?",
23+
"Who wrote 'To Kill a Mockingbird'?",
24+
"What is the largest planet in our solar system?",
25+
"What is the chemical symbol for gold?",
26+
"Who painted the Mona Lisa?",
27+
]
28+
29+
def __init__(self) -> None:
30+
self.model = FakeListChatModel(responses=self.predefined_questions)
31+
32+
def ask_question(self) -> str:
33+
response = self.model.invoke([HumanMessage(content="Generate a question")])
34+
return str(response.content)
35+
36+
37+
class InputQuiz(BaseModel):
38+
prof_question: str
39+
due_date: str
40+
41+
42+
class StudentAgent:
43+
"""
44+
This agent mission is to answer questions
45+
"""
46+
47+
predefined_answers = [
48+
"The capital of France is Paris.",
49+
"Harper Lee wrote 'To Kill a Mockingbird'.",
50+
"The largest planet in our solar system is Jupiter.",
51+
"The chemical symbol for gold is Au.",
52+
"Leonardo da Vinci painted the Mona Lisa.",
53+
]
54+
55+
def __init__(self) -> None:
56+
self.model = FakeListChatModel(responses=self.predefined_answers)
57+
58+
def answer(self, quiz) -> str:
59+
# q = InputQuiz.parse_raw(quiz)
60+
# print(f"here is the quiz {q}")
61+
response = self.model.invoke([HumanMessage(content=quiz.prof_question)])
62+
return str(response.content)
63+
64+
65+
class MultiAgentApp:
66+
@staticmethod
67+
def run_app():
68+
agent_prof = ProfessorAgent()
69+
agent_student = StudentAgent()
70+
71+
output_prof = agent_prof.ask_question()
72+
73+
prof_agent_output_schema = {"question": {"type": "string"}}
74+
student_agent_schema = {
75+
"quiz": {
76+
"type": "object",
77+
"properties": {
78+
"prof_question": {"type": "string"},
79+
"due_date": {"type": "string"},
80+
},
81+
}
82+
}
83+
84+
mapping_object = {
85+
"prof_question": "$.question",
86+
"due_date": lambda _: datetime.now().strftime("%x"),
87+
}
88+
89+
imerative_mapp = ImperativeIOMapper(
90+
input_schema=prof_agent_output_schema,
91+
output_schema=student_agent_schema,
92+
field_mapping=mapping_object,
93+
)
94+
95+
print(f"professors question was {output_prof}")
96+
97+
mapping_result = imerative_mapp.invoke({"question": output_prof})
98+
99+
print(f"the mapping_result was {mapping_result}")
100+
101+
response = agent_student.answer(InputQuiz(**(json.loads(mapping_result))))
102+
103+
print(f"student response was {response}")
104+
# map data between agents
105+
106+
107+
def run():
108+
MultiAgentApp.run_app()

Diff for: examples/requirements.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
langchain-core==0.3.35
2+
langgraph==0.2.72
3+
pydantic==2.10.6

0 commit comments

Comments
 (0)