-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjsonpatch.py
More file actions
252 lines (211 loc) · 10.7 KB
/
jsonpatch.py
File metadata and controls
252 lines (211 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""Module implementing functions to support json-patch operations on Python
objects based on RFC6902.
"""
import operator
from collections.abc import Mapping, MutableMapping, MutableSequence
from functools import reduce
from typing import TYPE_CHECKING, Any, Literal
from pydantic import AliasChoices, BaseModel, Field
# Union of the container types that the parent portion of a JSON Pointer is
# expected to resolve to: a dict-like object or a list-like object.
type AnyMutable = MutableMapping | MutableSequence
class JSONPatchError(Exception):
    """Signals that a JSON patch operation could not be carried out."""
class JSONPatch(BaseModel):
    """A single PATCH operation as described by RFC6902.

    Patch documents are generally accepted as a ``Sequence[JSONPatch]`` of
    these models.
    """

    # Name of the operation to perform.
    op: Literal["add", "remove", "replace", "move", "copy", "test"]

    # Location the operation applies to. The pattern only admits pointers
    # whose first segment is one of the listed top-level fields.
    path: str = Field(
        pattern=r"^\/(metadata|spec|configuration|metadata_|data)\/.*$",
        description="An RFC6901 JSON Pointer",
    )

    # Operand of the operation; defaults to ``None`` when not supplied.
    value: Any | None = None

    # Source location, populated from either the ``from`` or the ``from_``
    # input key and held to the same pattern as ``path``.
    from_: str | None = Field(
        validation_alias=AliasChoices("from", "from_"),
        pattern=r"^\/(metadata|spec|configuration|metadata_|data)\/.*$",
        default=None,
    )
def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T:
"""Applies a jsonpatch to an object, returning the modified object.
Modifications are made in-place (i.e., the input object is not copied).
Notes
-----
While this JSON Patch operation nominally implements RFC6902, there are
some edge cases inappropriate to the application that are supported by the
RFC but disallowed through lack of support:
- Unsupported: JSON pointer values that refer to object/dict keys that are
numeric, e.g., {"1": "first", "2": "second"}
- Unsupported: JSON pointer values that refer to an entire object, e.g.,
"" -- the JSON Patch must have a root element ("/") per the model.
- Unsupported: JSON pointer values that refer to a nameless object, e.g.,
"/" -- JSON allows object keys to be the empty string ("") but this is
disallowed by the application.
"""
# The JSON Pointer root value is discarded as the rest of the pointer is
# split into parts
op_path = op.path.split("/")[1:]
# The terminal path part is either the name of a key or an index in a list
# FIXME this assumes that an "integer-string" in the path is always refers
# to a list index, although it could just as well be a key in a dict
# like ``{"1": "first, "2": "second"}`` which is complicated by the
# fact that Python dict keys can be either ints or strs but this is
# not allowed in JSON (i.e., object keys MUST be strings)
# FIXME this doesn't support, e.g., nested lists with multiple index values
# in the path, e.g., ``[["a", "A"], ["b", "B"]]``
target_key_or_index: str | None = op_path.pop()
if target_key_or_index is None:
raise JSONPatchError("JSON Patch operations on empty keys not allowed.")
reference_token: int | str
# the reference token is referring to a an array index if the token is
# numeric or is the single character "-"
if target_key_or_index == "-":
reference_token = target_key_or_index
elif target_key_or_index.isnumeric():
reference_token = int(target_key_or_index)
else:
reference_token = str(target_key_or_index)
# The remaining parts of the path are a pointer to the object needing
# modification, which should reduce to either a dict or a list
try:
op_target: AnyMutable = reduce(operator.getitem, op_path, o)
except KeyError:
raise JSONPatchError(f"Path {op.path} not found in object")
match op:
case JSONPatch(op="add", value=new_value):
if reference_token == "-" and isinstance(op_target, MutableSequence):
# The "-" reference token is unique to the add operation and
# means the next element beyond the end of the current list
op_target.append(new_value)
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
op_target.insert(reference_token, new_value)
elif isinstance(reference_token, str) and isinstance(op_target, MutableMapping):
op_target[reference_token] = new_value
case JSONPatch(op="replace", value=new_value):
# The main difference between replace and add is that replace will
# not create new properties or elements in the target
if reference_token == "-":
raise JSONPatchError("Cannot use reference token `-` with replace operation.")
elif isinstance(op_target, MutableMapping):
try:
assert reference_token in op_target.keys()
except AssertionError:
raise JSONPatchError(f"Cannot replace missing key {reference_token} in object")
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
try:
assert reference_token < len(op_target)
except AssertionError:
raise JSONPatchError(f"Cannot replace missing index {reference_token} in object")
if TYPE_CHECKING:
assert isinstance(op_target, MutableMapping)
op_target[reference_token] = new_value
case JSONPatch(op="remove"):
if isinstance(reference_token, str) and isinstance(op_target, MutableMapping):
if reference_token == "-":
raise JSONPatchError("Removal operations not allowed on `-` reference token")
_ = op_target.pop(reference_token, None)
elif isinstance(reference_token, int):
try:
_ = op_target.pop(reference_token)
except IndexError:
# The index we are meant to remove does not exist, but that
# is not an error (idempotence)
pass
else:
# This should be unreachable
raise ValueError("Reference token in JSON Patch must be int | str")
case JSONPatch(op="move", from_=from_location):
# the move operation is equivalent to a remove(from) + add(target)
if TYPE_CHECKING:
assert from_location is not None
# Handle the from_location with the same logic as the op.path
from_path = from_location.split("/")[1:]
# Is the last element of the from_path an index or a key?
from_target: str | int = from_path.pop()
try:
from_target = int(from_target)
except ValueError:
pass
try:
from_object = reduce(operator.getitem, from_path, o)
value = from_object[from_target]
except (KeyError, IndexError):
raise JSONPatchError(f"Path {from_location} not found in object")
# add the value to the new location
op_target[reference_token] = value # type: ignore[index]
# and remove it from the old
_ = from_object.pop(from_target)
case JSONPatch(op="copy", from_=from_location):
# The copy op is the same as the move op except the original is not
# removed
if TYPE_CHECKING:
assert from_location is not None
# Handle the from_location with the same logic as the op.path
from_path = from_location.split("/")[1:]
# Is the last element of the from_path an index or a key?
from_target = from_path.pop()
try:
from_target = int(from_target)
except ValueError:
pass
try:
from_object = reduce(operator.getitem, from_path, o)
value = from_object[from_target]
except (KeyError, IndexError):
raise JSONPatchError(f"Path {from_location} not found in object")
# add the value to the new location
op_target[reference_token] = value # type: ignore[index]
case JSONPatch(op="test", value=assert_value):
# assert that the patch value is present at the patch path
# The main difference between test and replace is that test does
# not make any modifications after its assertions
if reference_token == "-":
raise JSONPatchError("Cannot use reference token `-` with test operation.")
elif isinstance(op_target, MutableMapping):
try:
assert reference_token in op_target.keys()
except AssertionError:
raise JSONPatchError(
f"Test operation assertion failed: Key {reference_token} does not exist at {op.path}"
)
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
try:
assert reference_token < len(op_target)
except AssertionError:
raise JSONPatchError(
f"Test operation assertion failed: "
f"Index {reference_token} does not exist at {op.path}"
)
if TYPE_CHECKING:
assert isinstance(op_target, MutableMapping)
try:
assert op_target[reference_token] == assert_value
except AssertionError:
raise JSONPatchError(
f"Test operation assertion failed: {op.path} does not match value {assert_value}"
)
case _:
# Model validation should prevent this from ever happening
raise JSONPatchError(f"Unknown JSON Patch operation: {op.op}")
return o
def apply_json_merge[T: MutableMapping](patch: Any, o: T) -> T:
"""Applies a patch to a mapping object as per the RFC7396 JSON Merge Patch.
Notably, this operation may only target a ``MutableMapping`` as an analogue
of a JSON object. This means that any keyed value in a Mapping may be
replaced, added, or removed by a JSON Merge. This is not appropriate for
patches that need to perform more tactical updates, such as modifying
elements of a ``Sequence``.
This function does not allow setting a field value in the target to `None`;
instead, any `None` value in a patch is an instruction to remove that
field from the target completely.
This function differs from the RFC in the following ways: it will not
replace the entire target object with a new mapping (i.e., the target must
be a Mapping).
"""
if isinstance(patch, Mapping):
for k, v in patch.items():
if v is None:
_ = o.pop(k, None)
else:
o[k] = apply_json_merge(v, o.get(k, {}))
return o
else:
return patch