-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfields.py
More file actions
133 lines (108 loc) · 4.57 KB
/
fields.py
File metadata and controls
133 lines (108 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright 2016 Camptocamp
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import json
from datetime import date, datetime
import dateutil
import lxml
from psycopg2.extras import Json as PsycopgJson
from odoo import fields, models
from odoo.tools.func import lazy
class JobSerialized(fields.Json):
    """Field keeping job data as JSON.

    The field has to be declared with a ``base_type`` that is one of the keys
    of ``_default_json_mapping`` (``dict``, ``list``, ``tuple`` or
    ``models.BaseModel``); anything else is rejected when the field
    attributes are set up.

    An empty value is read back as the default JSON document of the base
    type ("{}" for dict, "[]" for list and tuple, an empty "base" recordset
    payload for models.BaseModel).

    Encoding and decoding go through JobEncoder and JobDecoder, which add
    support for a few types JSON does not handle natively.
    """

    type = "job_serialized"
    _column_type = ("jsonb", "jsonb")

    _base_type = None

    # JSON used in place of an empty value, per supported base type: either
    # a ready-made JSON string or a callable building it from an environment
    _default_json_mapping = {
        dict: "{}",
        list: "[]",
        tuple: "[]",
        models.BaseModel: lambda env: json.dumps(
            {"_type": "odoo_recordset", "model": "base", "ids": [], "uid": env.uid}
        ),
    }

    def __init__(self, string=fields.SENTINEL, base_type=fields.SENTINEL, **kwargs):
        """Forward ``base_type`` to the parent field as ``_base_type``."""
        super().__init__(string=string, _base_type=base_type, **kwargs)

    def _setup_attrs(self, model, name):  # pylint: disable=missing-return
        """Set up the field attributes, then check the base type.

        :raises ValueError: if ``_base_type`` is not a key of
            ``_default_json_mapping``
        """
        super()._setup_attrs(model, name)
        if self._base_type in self._default_json_mapping:
            return
        raise ValueError(f"{self._base_type} is not a supported base type")

    def _base_type_default_json(self, env):
        """Return the default JSON string for this field's base type.

        Mapping entries that are not strings are called with ``env`` to
        produce the string.
        """
        fallback = self._default_json_mapping.get(self._base_type)
        return fallback if isinstance(fallback, str) else fallback(env)

    def convert_to_column(self, value, record, values=None, validate=True):
        """Wrap the cache representation of ``value`` in a psycopg2 Json."""
        return PsycopgJson(self.convert_to_cache(value, record, validate=validate))

    def convert_to_cache(self, value, record, validate=True):
        """Return the cache form of ``value``: json.dumps(value) or None.

        Only instances of the base type are serialized; any other value is
        kept unchanged when truthy and turned into None otherwise.
        """
        if not isinstance(value, self._base_type):
            return value or None
        return json.dumps(value, cls=JobEncoder)

    def convert_to_record(self, value, record):
        """Decode the stored value, using the base type default when empty."""
        fallback = self._base_type_default_json(record.env)
        payload = value if value else fallback
        if not isinstance(payload, (str, bytes, bytearray)):
            # not a JSON document yet: serialize it before decoding
            payload = json.dumps(payload, cls=JobEncoder)
        return json.loads(payload, cls=JobDecoder, env=record.env)

    def convert_to_export(self, value, record):
        """Return ``value`` as a JSON string, or "" when it is empty."""
        return json.dumps(value, cls=JobEncoder) if value else ""
class JobEncoder(json.JSONEncoder):
    """Encode Odoo recordsets so that we can later recompose them

    Values that JSON cannot represent natively are turned into dicts tagged
    with a "_type" key (recordsets, datetimes, dates, lxml elements). Odoo
    ``lazy`` objects are replaced by their ``_value``.
    """

    def _get_record_context(self, obj):
        """Return the context to store next to the recordset ``obj``.

        Delegates to ``obj._job_prepare_context_before_enqueue()``.
        """
        return obj._job_prepare_context_before_enqueue()

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Called by the encoder for objects it cannot serialize by itself.
        Unsupported objects are passed to the parent implementation.
        """
        if isinstance(obj, models.BaseModel):
            return {
                "_type": "odoo_recordset",
                "model": obj._name,
                "ids": obj.ids,
                "uid": obj.env.uid,
                "su": obj.env.su,
                "context": self._get_record_context(obj),
            }
        # datetime is a subclass of date, so it has to be tested first
        if isinstance(obj, datetime):
            return {"_type": "datetime_isoformat", "value": obj.isoformat()}
        if isinstance(obj, date):
            return {"_type": "date_isoformat", "value": obj.isoformat()}

        # Import the submodule explicitly instead of relying on ``lxml.etree``
        # having been loaded as a side effect of another module's import.
        from lxml import etree

        if isinstance(obj, etree._Element):
            return {
                "_type": "etree_element",
                "value": etree.tostring(obj, encoding=str),
            }
        if isinstance(obj, lazy):
            return obj._value
        return super().default(obj)
class JobDecoder(json.JSONDecoder):
    """Decode json, recomposing recordsets

    Dicts carrying a known "_type" key are converted back to recordsets,
    datetimes, dates or lxml elements. Any other dict is returned unchanged.
    """

    def __init__(self, *args, **kwargs):
        """Create the decoder.

        The ``env`` keyword argument is mandatory (``KeyError`` when absent)
        and is used to rebuild recordsets; the remaining arguments go to
        ``json.JSONDecoder``.
        """
        env = kwargs.pop("env")
        super().__init__(*args, object_hook=self.object_hook, **kwargs)
        assert env, "JobDecoder needs a usable environment"
        self.env = env

    def object_hook(self, obj):
        """Turn a decoded JSON object back into the value it stands for."""
        if "_type" not in obj:
            return obj
        type_ = obj["_type"]
        if type_ == "odoo_recordset":
            model = self.env(user=obj.get("uid"), su=obj.get("su"))[obj["model"]]
            context = obj.get("context")
            if context:
                model = model.with_context(**context)
            return model.browse(obj["ids"])
        if type_ in ("datetime_isoformat", "date_isoformat"):
            # Import the submodule explicitly instead of relying on
            # ``dateutil.parser`` having been loaded by another module.
            from dateutil import parser as dateutil_parser

            parsed = dateutil_parser.parse(obj["value"])
            return parsed if type_ == "datetime_isoformat" else parsed.date()
        if type_ == "etree_element":
            # Same reasoning as above for ``lxml.etree``.
            from lxml import etree

            return etree.fromstring(obj["value"])
        # unknown "_type": leave the dict as it is
        return obj