1
1
import abc
2
2
3
+ import generic_k8s_webhook .config_parser .operator_parser as op_parser
3
4
from generic_k8s_webhook import jsonpatch_helpers , utils
5
+ from generic_k8s_webhook .config_parser .common import ParsingException
4
6
5
7
6
- class IJsonPatchParser (abc .ABC ):
8
+ class ParserOp (abc .ABC ):
7
9
@abc .abstractmethod
8
- def parse (self , raw_patch : list ) -> list [ jsonpatch_helpers .JsonPatchOperator ] :
10
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
9
11
pass
10
12
13
+ def _parse_path (self , raw_elem : dict , key : str ) -> list [str ]:
14
+ raw_path = utils .must_pop (raw_elem , key , f"Missing key { key } in { raw_elem } " )
15
+ path = utils .convert_dot_string_path_to_list (raw_path )
16
+ if path [0 ] != "" :
17
+ raise ValueError (f"The first element of a path in the patch must be '.', not { path [0 ]} " )
18
+ return path [1 :]
19
+
20
+
21
+ class ParseAdd (ParserOp ):
22
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
23
+ path = self ._parse_path (raw_elem , "path" )
24
+ value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
25
+ return jsonpatch_helpers .JsonPatchAdd (path , value )
26
+
27
+
28
+ class ParseRemove (ParserOp ):
29
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
30
+ path = self ._parse_path (raw_elem , "path" )
31
+ return jsonpatch_helpers .JsonPatchRemove (path )
32
+
33
+
34
+ class ParseReplace (ParserOp ):
35
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
36
+ path = self ._parse_path (raw_elem , "path" )
37
+ value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
38
+ return jsonpatch_helpers .JsonPatchReplace (path , value )
39
+
40
+
41
+ class ParseCopy (ParserOp ):
42
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
43
+ path = self ._parse_path (raw_elem , "path" )
44
+ fromm = self ._parse_path (raw_elem , "from" )
45
+ return jsonpatch_helpers .JsonPatchCopy (path , fromm )
46
+
47
+
48
+ class ParseMove (ParserOp ):
49
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
50
+ path = self ._parse_path (raw_elem , "path" )
51
+ fromm = self ._parse_path (raw_elem , "from" )
52
+ return jsonpatch_helpers .JsonPatchMove (path , fromm )
53
+
54
+
55
+ class ParseTest (ParserOp ):
56
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
57
+ path = self ._parse_path (raw_elem , "path" )
58
+ value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
59
+ return jsonpatch_helpers .JsonPatchTest (path , value )
60
+
61
+
62
+ class ParseExpr (ParserOp ):
63
+ def __init__ (self , meta_op_parser : op_parser .MetaOperatorParser ) -> None :
64
+ self .meta_op_parser = meta_op_parser
65
+
66
+ def parse (self , raw_elem : dict , path_op : str ) -> jsonpatch_helpers .JsonPatchOperator :
67
+ path = self ._parse_path (raw_elem , "path" )
68
+ value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
69
+ operator = self .meta_op_parser .parse (value , f"{ path_op } .value" )
70
+ return jsonpatch_helpers .JsonPatchExpr (path , operator )
71
+
72
+
73
+ class IJsonPatchParser (abc .ABC ):
74
+ def parse (self , raw_patch : list , path_op : str ) -> list [jsonpatch_helpers .JsonPatchOperator ]:
75
+ patch = []
76
+ dict_parse_op = self ._get_dict_parse_op ()
77
+ for i , raw_elem in enumerate (raw_patch ):
78
+ op = utils .must_pop (raw_elem , "op" , f"Missing key 'op' in { raw_elem } " )
79
+
80
+ # Select the appropiate class needed to parse the operation "op"
81
+ if op not in dict_parse_op :
82
+ raise ParsingException (f"Unsupported patch operation { op } on { path_op } " )
83
+ parse_op = dict_parse_op [op ]
84
+ try :
85
+ parsed_elem = parse_op .parse (raw_elem , f"{ path_op } .{ i } " )
86
+ except Exception as e :
87
+ raise ParsingException (f"Error when parsing { path_op } " ) from e
88
+
89
+ # Make sure we have extracted all the keys from "raw_elem"
90
+ if len (raw_elem ) > 0 :
91
+ raise ValueError (f"Unexpected keys { raw_elem } " )
92
+ patch .append (parsed_elem )
93
+
94
+ return patch
95
+
96
+ @abc .abstractmethod
97
+ def _get_dict_parse_op (self ) -> dict [str , ParserOp ]:
98
+ """A dictionary with the classes that can parse the json patch operations
99
+ supported by this JsonPatchParser
100
+ """
101
+
11
102
12
103
class JsonPatchParserV1 (IJsonPatchParser ):
13
104
"""Class used to parse a json patch spec V1. Example:
@@ -19,45 +110,28 @@ class JsonPatchParserV1(IJsonPatchParser):
19
110
```
20
111
"""
21
112
22
- def parse (self , raw_patch : list ) -> list [jsonpatch_helpers .JsonPatchOperator ]:
23
- patch = []
24
- for raw_elem in raw_patch :
25
- op = utils .must_pop (raw_elem , "op" , f"Missing key 'op' in { raw_elem } " )
26
- if op == "add" :
27
- path = self ._parse_path (raw_elem , "path" )
28
- value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
29
- parsed_elem = jsonpatch_helpers .JsonPatchAdd (path , value )
30
- elif op == "remove" :
31
- path = self ._parse_path (raw_elem , "path" )
32
- parsed_elem = jsonpatch_helpers .JsonPatchRemove (path )
33
- elif op == "replace" :
34
- path = self ._parse_path (raw_elem , "path" )
35
- value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
36
- parsed_elem = jsonpatch_helpers .JsonPatchReplace (path , value )
37
- elif op == "copy" :
38
- path = self ._parse_path (raw_elem , "path" )
39
- fromm = self ._parse_path (raw_elem , "from" )
40
- parsed_elem = jsonpatch_helpers .JsonPatchCopy (path , fromm )
41
- elif op == "move" :
42
- path = self ._parse_path (raw_elem , "path" )
43
- fromm = self ._parse_path (raw_elem , "from" )
44
- parsed_elem = jsonpatch_helpers .JsonPatchMove (path , fromm )
45
- elif op == "test" :
46
- path = self ._parse_path (raw_elem , "path" )
47
- value = utils .must_pop (raw_elem , "value" , f"Missing key 'value' in { raw_elem } " )
48
- parsed_elem = jsonpatch_helpers .JsonPatchTest (path , value )
49
- else :
50
- raise ValueError (f"Invalid patch operation { raw_elem ['op' ]} " )
113
+ def _get_dict_parse_op (self ) -> dict [str , ParserOp ]:
114
+ return {
115
+ "add" : ParseAdd (),
116
+ "remove" : ParseRemove (),
117
+ "replace" : ParseReplace (),
118
+ "copy" : ParseCopy (),
119
+ "move" : ParseMove (),
120
+ "test" : ParseTest (),
121
+ }
51
122
52
- if len (raw_elem ) > 0 :
53
- raise ValueError (f"Unexpected keys { raw_elem } " )
54
- patch .append (parsed_elem )
55
123
56
- return patch
124
+ class JsonPatchParserV2 (JsonPatchParserV1 ):
125
+ """Class used to parse a json patch spec V2. It supports the same actions as the
126
+ json patch patch spec V1 plus the ability use expressions to create new values
127
+ """
57
128
58
- def _parse_path (self , raw_elem : dict , key : str ) -> list [str ]:
59
- raw_path = utils .must_pop (raw_elem , key , f"Missing key { key } in { raw_elem } " )
60
- path = utils .convert_dot_string_path_to_list (raw_path )
61
- if path [0 ] != "" :
62
- raise ValueError (f"The first element of a path in the patch must be '.', not { path [0 ]} " )
63
- return path [1 :]
129
+ def __init__ (self , meta_op_parser : op_parser .MetaOperatorParser ) -> None :
130
+ self .meta_op_parser = meta_op_parser
131
+
132
+ def _get_dict_parse_op (self ) -> dict [str , ParserOp ]:
133
+ dict_parse_op_v1 = super ()._get_dict_parse_op ()
134
+ dict_parse_op_v2 = {
135
+ "expr" : ParseExpr (self .meta_op_parser ),
136
+ }
137
+ return {** dict_parse_op_v1 , ** dict_parse_op_v2 }
0 commit comments