-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimperative.py
136 lines (115 loc) · 4.66 KB
/
imperative.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0"
"""
The deterministic I/O mapper is a component
designed to translate specific inputs into
corresponding outputs in a predictable and consistent manner.
When configured with a JSONPath definition,
this mapper utilizes the JSONPath query language
to extract data from JSON-formatted input,
transforming it into a structured output based on predefined rules.
The deterministic nature of the mapper ensures that given the same input and
JSONPath configuration, the output will always be the same,
providing reliability and repeatability.
This is particularly useful in scenarios where
consistent data transformation is required.
"""
import json
import logging
from typing import Any, Callable, Union
import jsonschema
from jsonpath_ng.ext import parse
from agntcy_iomapper.base import BaseIOMapper, IOMapperInput, IOMapperOutput
logger = logging.getLogger(__name__)
class ImperativeIOMapper(BaseIOMapper):
field_mapping: dict[str, Union[str, Callable]]
"""A dictionary for where the keys are fields of the output object
and values are JSONPath (strings) representing how the mapping
"""
def __init__(self, field_mapping: dict[str, Union[str, Callable]] | None) -> None:
super().__init__()
self.field_mapping = field_mapping
def invoke(self, input: IOMapperInput) -> IOMapperOutput | None:
if input.data is None:
return None
if self.field_mapping is None:
return IOMapperOutput(data=input.data)
data = self._imperative_map(input)
return IOMapperOutput(data=data)
def ainvoke(self, input: IOMapperInput) -> IOMapperOutput | None:
return self.invoke(input)
def _imperative_map(self, input_definition: IOMapperInput) -> Any:
"""
Converts input data to a desired output type.
This function attempts to convert the provided data into the specified
target type. It performs validation using a JSON schema and raises a
ValidationError if the data does not conform to the expected schema for
the target type.
Parameters:
----------
data : Any
The input data to be converted. This can be of any type.
Returns:
-------
Any
The converted data in the desired output type.
Raises:
------
ValidationError
If the input data does not conform to the expected schema for the
target type.
Notes:
-----
The function assumes that the caller provides a valid `input_schema`.
Unsupported target types should be handled as needed within the function.
"""
data = input_definition.data
input_schema = input_definition.input.json_schema
jsonschema.validate(
instance=data,
schema=input_schema.model_dump(exclude_none=True, mode="json"),
)
mapped_output = {}
for output_field, json_path_or_func in self.field_mapping.items():
if isinstance(json_path_or_func, str):
jsonpath_expr = parse(json_path_or_func)
match = jsonpath_expr.find(data)
expect_value = match[0].value if match else None
elif callable(json_path_or_func):
expect_value = json_path_or_func(data)
else:
raise TypeError(
"Mapping values must be strings (JSONPath) or callables (functions)."
)
self._set_jsonpath(mapped_output, output_field, expect_value)
jsonschema.validate(
instance=mapped_output,
schema=input_definition.output.json_schema.model_dump(
exclude_none=True, mode="json"
),
)
# return a serialized version of the object
return json.dumps(mapped_output)
def _set_jsonpath(
self, data: dict[str, Any], path: str, value: Any
) -> dict[str, Any]:
"""set value for field based on its json path
Args:
data: Data so far
path: the json path
value: the value to set the json path to
Returns:
-----
dict[str,Any]
The mapped filed with the value
"""
copy_data: dict[str, Any] = data
# Split the path into parts and remove the leading root
parts = path.strip("$.").split(".")
# Add value to corresponding path
for part in parts[:-1]:
if part not in copy_data:
copy_data[part] = {}
copy_data = copy_data[part]
copy_data[parts[-1]] = value
return copy_data