Skip to content

Commit 813c287

Browse files
committed
feat(jsonpatch): Implement jsonpatch module for RFC6902 update ops
1 parent ba26d4a commit 813c287

2 files changed

Lines changed: 377 additions & 0 deletions

File tree

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
"""Module implementing functions to support json-patch operations on Python
2+
objects based on RFC6902.
3+
"""
4+
5+
import operator
6+
from collections.abc import MutableMapping, MutableSequence
7+
from functools import reduce
8+
from typing import TYPE_CHECKING, Any, Literal
9+
10+
from pydantic import AliasChoices, BaseModel, Field
11+
12+
type AnyMutable = MutableMapping | MutableSequence
13+
14+
15+
class JSONPatchError(Exception):
16+
"""Exception raised when a JSON patch operation cannot be completed."""
17+
18+
pass
19+
20+
21+
class JSONPatch(BaseModel):
22+
"""Model representing a PATCH operation using RFC6902.
23+
24+
This model will generally be accepted as a ``Sequence[JSONPatch]``.
25+
"""
26+
27+
op: Literal["add", "remove", "replace", "move", "copy", "test"]
28+
path: str = Field(
29+
description="An RFC6901 JSON Pointer", pattern=r"^\/(metadata|spec|configuration|metadata_|data)\/.*$"
30+
)
31+
value: Any | None = None
32+
from_: str | None = Field(
33+
default=None,
34+
pattern=r"^\/(metadata|spec|configuration|metadata_|data)\/.*$",
35+
validation_alias=AliasChoices("from", "from_"),
36+
)
37+
38+
39+
def apply_json_patch[T: MutableMapping](op: JSONPatch, o: T) -> T:
40+
"""Applies a jsonpatch to an object, returning the modified object.
41+
42+
Modifications are made in-place (i.e., the input object is not copied).
43+
44+
Notes
45+
-----
46+
While this JSON Patch operation nominally implements RFC6902, there are
47+
some edge cases inappropriate to the application that are supported by the
48+
RFC but disallowed through lack of support:
49+
50+
- Unsupported: JSON pointer values that refer to object/dict keys that are
51+
numeric, e.g., {"1": "first", "2": "second"}
52+
- Unsupported: JSON pointer values that refer to an entire object, e.g.,
53+
"" -- the JSON Patch must have a root element ("/") per the model.
54+
- Unsupported: JSON pointer values taht refer to a nameless object, e.g.,
55+
"/" -- JSON allows object keys to be the empty string ("") but this is
56+
disallowed by the application.
57+
"""
58+
# The JSON Pointer root value is discarded as the rest of the pointer is
59+
# split into parts
60+
op_path = op.path.split("/")[1:]
61+
62+
# The terminal path part is either the name of a key or an index in a list
63+
# FIXME this assumes that an "integer-string" in the path is always refers
64+
# to a list index, although it could just as well be a key in a dict
65+
# like ``{"1": "first, "2": "second"}`` which is complicated by the
66+
# fact that Python dict keys can be either ints or strs but this is
67+
# not allowed in JSON (i.e., object keys MUST be strings)
68+
# FIXME this doesn't support, e.g., nested lists with multiple index values
69+
# in the path, e.g., ``[["a", "A"], ["b", "B"]]``
70+
target_key_or_index: str | None = op_path.pop()
71+
if target_key_or_index is None:
72+
raise JSONPatchError("JSON Patch operations on empty keys not allowed.")
73+
74+
reference_token: int | str
75+
# the reference token is referring to a an array index if the token is
76+
# numeric or is the single character "-"
77+
if target_key_or_index == "-":
78+
reference_token = target_key_or_index
79+
elif target_key_or_index.isnumeric():
80+
reference_token = int(target_key_or_index)
81+
else:
82+
reference_token = str(target_key_or_index)
83+
84+
# The remaining parts of the path are a pointer to the object needing
85+
# modification, which should reduce to either a dict or a list
86+
try:
87+
op_target: AnyMutable = reduce(operator.getitem, op_path, o)
88+
except KeyError:
89+
raise JSONPatchError(f"Path {op.path} not found in object")
90+
91+
match op:
92+
case JSONPatch(op="add", value=new_value):
93+
if reference_token == "-" and isinstance(op_target, MutableSequence):
94+
# The "-" reference token is unique to the add operation and
95+
# means the next element beyond the end of the current list
96+
op_target.append(new_value)
97+
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
98+
op_target.insert(reference_token, new_value)
99+
elif isinstance(reference_token, str) and isinstance(op_target, MutableMapping):
100+
op_target[reference_token] = new_value
101+
102+
case JSONPatch(op="replace", value=new_value):
103+
# The main difference between replace and add is that replace will
104+
# not create new properties or elements in the target
105+
if reference_token == "-":
106+
raise JSONPatchError("Cannot use reference token `-` with replace operation.")
107+
elif isinstance(op_target, MutableMapping):
108+
try:
109+
assert reference_token in op_target.keys()
110+
except AssertionError:
111+
raise JSONPatchError(f"Cannot replace missing key {reference_token} in object")
112+
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
113+
try:
114+
assert reference_token < len(op_target)
115+
except AssertionError:
116+
raise JSONPatchError(f"Cannot replace missing index {reference_token} in object")
117+
118+
if TYPE_CHECKING:
119+
assert isinstance(op_target, MutableMapping)
120+
op_target[reference_token] = new_value
121+
122+
case JSONPatch(op="remove"):
123+
if isinstance(reference_token, str) and isinstance(op_target, MutableMapping):
124+
if reference_token == "-":
125+
raise JSONPatchError("Removal operations not allowed on `-` reference token")
126+
_ = op_target.pop(reference_token, None)
127+
elif isinstance(reference_token, int):
128+
try:
129+
_ = op_target.pop(reference_token)
130+
except IndexError:
131+
# The index we are meant to remove does not exist, but that
132+
# is not an error (idempotence)
133+
pass
134+
else:
135+
# This should be unreachable
136+
raise ValueError("Reference token in JSON Patch must be int | str")
137+
138+
case JSONPatch(op="move", from_=from_location):
139+
# the move operation is equivalent to a remove(from) + add(target)
140+
if TYPE_CHECKING:
141+
assert from_location is not None
142+
143+
# Handle the from_location with the same logic as the op.path
144+
from_path = from_location.split("/")[1:]
145+
146+
# Is the last element of the from_path an index or a key?
147+
from_target: str | int = from_path.pop()
148+
try:
149+
from_target = int(from_target)
150+
except ValueError:
151+
pass
152+
153+
try:
154+
from_object = reduce(operator.getitem, from_path, o)
155+
value = from_object[from_target]
156+
except (KeyError, IndexError):
157+
raise JSONPatchError(f"Path {from_location} not found in object")
158+
159+
# add the value to the new location
160+
op_target[reference_token] = value # type: ignore[index]
161+
# and remove it from the old
162+
_ = from_object.pop(from_target)
163+
164+
case JSONPatch(op="copy", from_=from_location):
165+
# The copy op is the same as the move op except the original is not
166+
# removed
167+
if TYPE_CHECKING:
168+
assert from_location is not None
169+
170+
# Handle the from_location with the same logic as the op.path
171+
from_path = from_location.split("/")[1:]
172+
173+
# Is the last element of the from_path an index or a key?
174+
from_target = from_path.pop()
175+
try:
176+
from_target = int(from_target)
177+
except ValueError:
178+
pass
179+
180+
try:
181+
from_object = reduce(operator.getitem, from_path, o)
182+
value = from_object[from_target]
183+
except (KeyError, IndexError):
184+
raise JSONPatchError(f"Path {from_location} not found in object")
185+
186+
# add the value to the new location
187+
op_target[reference_token] = value # type: ignore[index]
188+
189+
case JSONPatch(op="test", value=assert_value):
190+
# assert that the patch value is present at the patch path
191+
# The main difference between test and replace is that test does
192+
# not make any modifications after its assertions
193+
if reference_token == "-":
194+
raise JSONPatchError("Cannot use reference token `-` with test operation.")
195+
elif isinstance(op_target, MutableMapping):
196+
try:
197+
assert reference_token in op_target.keys()
198+
except AssertionError:
199+
raise JSONPatchError(
200+
f"Test operation assertion failed: Key {reference_token} does not exist at {op.path}"
201+
)
202+
elif isinstance(reference_token, int) and isinstance(op_target, MutableSequence):
203+
try:
204+
assert reference_token < len(op_target)
205+
except AssertionError:
206+
raise JSONPatchError(
207+
f"Test operation assertion failed: "
208+
f"Index {reference_token} does not exist at {op.path}"
209+
)
210+
211+
if TYPE_CHECKING:
212+
assert isinstance(op_target, MutableMapping)
213+
try:
214+
assert op_target[reference_token] == assert_value
215+
except AssertionError:
216+
raise JSONPatchError(
217+
f"Test operation assertion failed: {op.path} does not match value {assert_value}"
218+
)
219+
220+
case _:
221+
# Model validation should prevent this from ever happening
222+
raise JSONPatchError(f"Unknown JSON Patch operation: {op.op}")
223+
224+
return o

tests/common/test_jsonpatch.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
from typing import Any
2+
3+
import pytest
4+
5+
from lsst.cmservice.common.jsonpatch import JSONPatch, JSONPatchError, apply_json_patch
6+
7+
TARGET_OBJECT: dict[str, Any] = {
8+
"apiVersion": "io.lsst.cmservice/v1",
9+
"spec": {
10+
"one": 1,
11+
"two": 2,
12+
"three": 4,
13+
"a_list": ["a", "b", "c", "e"],
14+
"tag_list": ["yes", "yeah", "yep"],
15+
},
16+
"metadata": {
17+
"owner": "bob_loblaw",
18+
},
19+
}
20+
21+
22+
def test_jsonpatch_add() -> None:
23+
"""Tests the use of an add operation with a JSON Patch."""
24+
global TARGET_OBJECT
25+
26+
# Fail to add a value to an element that does not exist
27+
op = JSONPatch(op="add", path="/spec/b_list/0", value="a")
28+
with pytest.raises(JSONPatchError):
29+
_ = apply_json_patch(op, TARGET_OBJECT)
30+
31+
# Fix the missing "four" property in the spec
32+
op = JSONPatch(op="add", path="/spec/four", value=4)
33+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
34+
assert TARGET_OBJECT["spec"].get("four") == 4
35+
36+
# Insert the missing "d" value in the spec's a_list property
37+
op = JSONPatch(op="add", path="/spec/a_list/3", value="d")
38+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
39+
assert TARGET_OBJECT["spec"].get("a_list")[3] == "d"
40+
assert TARGET_OBJECT["spec"].get("a_list")[4] == "e"
41+
42+
# Append to an existing list using "-"
43+
op = JSONPatch(op="add", path="/spec/a_list/-", value="f")
44+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
45+
assert len(TARGET_OBJECT["spec"]["a_list"]) == 6
46+
assert TARGET_OBJECT["spec"]["a_list"][-1] == "f"
47+
48+
49+
def test_jsonpatch_replace() -> None:
50+
"""Tests the use of a replace operation with a JSON Patch."""
51+
global TARGET_OBJECT
52+
53+
# Fail to replace a value for a missing key
54+
op = JSONPatch(op="replace", path="/spec/five", value=5)
55+
with pytest.raises(JSONPatchError):
56+
_ = apply_json_patch(op, TARGET_OBJECT)
57+
58+
# Fail to replace a value for a missing index
59+
op = JSONPatch(op="replace", path="/spec/a_list/4", value="e")
60+
with pytest.raises(JSONPatchError):
61+
_ = apply_json_patch(op, TARGET_OBJECT)
62+
63+
# Fix the incorrect "three" property in the spec
64+
op = JSONPatch(op="replace", path="/spec/three", value=3)
65+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
66+
assert TARGET_OBJECT["spec"]["three"] == 3
67+
68+
69+
def test_jsonpatch_remove() -> None:
70+
"""Tests the use of a remove operation with a JSON Patch."""
71+
global TARGET_OBJECT
72+
73+
# Remove the first element ("a") of the "a_list" property in the spec
74+
op = JSONPatch(op="remove", path="/spec/a_list/0")
75+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
76+
assert TARGET_OBJECT["spec"]["a_list"][0] == "b"
77+
78+
# Remove the a non-existent index from the same list (not an error)
79+
op = JSONPatch(op="remove", path="/spec/a_list/8")
80+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
81+
assert len(TARGET_OBJECT["spec"]["a_list"]) == 3
82+
83+
# Remove the previously added key "four" element in the spec
84+
op = JSONPatch(op="remove", path="/spec/four")
85+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
86+
assert "four" not in TARGET_OBJECT["spec"].keys()
87+
88+
# Repeat the previous removal (not an error)
89+
op = JSONPatch(op="remove", path="/spec/four")
90+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
91+
92+
93+
def test_jsonpatch_move() -> None:
94+
"""Tests the use of a move operation with a JSON Patch."""
95+
global TARGET_OBJECT
96+
97+
# move the tags list from spec to metadata
98+
op = JSONPatch(op="move", path="/metadata/tag_list", from_="/spec/tag_list")
99+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
100+
assert "tag_list" not in TARGET_OBJECT["spec"].keys()
101+
assert "tag_list" in TARGET_OBJECT["metadata"].keys()
102+
103+
# Fail to move a nonexistent object
104+
op = JSONPatch(op="move", path="/spec/yes_such_list", from_="/spec/no_such_list")
105+
with pytest.raises(JSONPatchError):
106+
_ = apply_json_patch(op, TARGET_OBJECT)
107+
108+
109+
def test_jsonpatch_copy() -> None:
110+
"""Tests the use of a copy operation with a JSON Patch."""
111+
global TARGET_OBJECT
112+
113+
# copy the owner from metadata to spec as the name "pilot"
114+
op = JSONPatch(op="copy", path="/spec/pilot", from_="/metadata/owner")
115+
TARGET_OBJECT = apply_json_patch(op, TARGET_OBJECT)
116+
assert TARGET_OBJECT["spec"]["pilot"] == TARGET_OBJECT["metadata"]["owner"]
117+
118+
# Fail to copy a nonexistent object
119+
op = JSONPatch(op="copy", path="/spec/yes_such_list", from_="/spec/no_such_list")
120+
with pytest.raises(JSONPatchError):
121+
_ = apply_json_patch(op, TARGET_OBJECT)
122+
123+
124+
def test_jsonpatch_test() -> None:
125+
"""Tests the use of a test/assert operation with a JSON Patch."""
126+
global TARGET_OBJECT
127+
128+
# test successful assertion
129+
op = JSONPatch(op="test", path="/metadata/owner", value="bob_loblaw")
130+
_ = apply_json_patch(op, TARGET_OBJECT)
131+
132+
op = JSONPatch(op="test", path="/spec/a_list/0", value="a")
133+
_ = apply_json_patch(op, TARGET_OBJECT)
134+
135+
# test value mismatch
136+
op = JSONPatch(op="test", path="/metadata/owner", value="bob_alice")
137+
with pytest.raises(JSONPatchError):
138+
_ = apply_json_patch(op, TARGET_OBJECT)
139+
140+
# test missing key
141+
op = JSONPatch(op="test", path="/metadata/pilot", value="bob_alice")
142+
with pytest.raises(JSONPatchError):
143+
_ = apply_json_patch(op, TARGET_OBJECT)
144+
145+
# test missing index
146+
op = JSONPatch(op="test", path="/spec/a_list/8", value="bob_alice")
147+
with pytest.raises(JSONPatchError):
148+
_ = apply_json_patch(op, TARGET_OBJECT)
149+
150+
# test bad reference token
151+
op = JSONPatch(op="test", path="/spec/a_list/-", value="bob_alice")
152+
with pytest.raises(JSONPatchError):
153+
_ = apply_json_patch(op, TARGET_OBJECT)

0 commit comments

Comments
 (0)