-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathfields.py
More file actions
190 lines (144 loc) · 5.39 KB
/
fields.py
File metadata and controls
190 lines (144 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from datetime import datetime
import pytz
from django.conf import settings
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from rest_framework.fields import empty
from rest_framework.utils import html
from src.accounts.enums import (
UserDateFormat,
)
from src.accounts.models import Account
from src.processes.models.templates.template import Template
from src.generics.messages import (
MSG_GE_0002,
MSG_GE_0007,
MSG_GE_0020,
)
class AccountQstMixin:
    """Mixin that resolves the account a field should be scoped to.

    The host class must expose a ``context`` mapping (read via
    ``self.context.get``).
    """

    def _get_account(self) -> Account:
        """Return the account taken from the context.

        Lookup order:
            1. ``context['account']``;
            2. ``context['request'].user.account`` when the first entry is
               ``None`` and a request is present.

        Raises:
            Exception: when neither source yields a truthy account.
        """
        resolved = self.context.get('account')
        if resolved is None:
            # Fall back to the account attached to the requesting user.
            current_request = self.context.get('request')
            resolved = (
                current_request.user.account if current_request else None
            )
        if resolved:
            return resolved
        raise Exception('Account not provided for AccountQstMixin')
class TemplateQstMixin:
    """Mixin that resolves the template a field should be scoped to.

    The host class must expose a ``context`` mapping (read via
    ``self.context.get``).
    """

    def _get_template(self) -> Template:
        """Return ``context['template']``.

        Raises:
            Exception: when the context has no truthy ``template`` entry.
        """
        found = self.context.get('template')
        if found:
            return found
        raise Exception('Template not provided for TemplateQstMixin')
class AccountPrimaryKeyRelatedField(
    AccountQstMixin,
    serializers.PrimaryKeyRelatedField,
):
    """Primary-key related field restricted to the current account."""

    def get_queryset(self):
        """Return the parent queryset filtered by the resolved account.

        Raises:
            Exception: from ``_get_account`` when no account is available,
                or with ``MSG_GE_0002`` when the parent returns no queryset.
        """
        base_qs = super().get_queryset()
        # The account is resolved before the queryset check, so a missing
        # account is reported first.
        owner = self._get_account()
        if base_qs is not None:
            return base_qs.filter(account=owner)
        raise Exception(MSG_GE_0002)

    def to_internal_value(self, data):
        """Convert ``data`` to a model instance of the scoped queryset.

        ``data`` may already be an instance of the queryset's model (for
        example when it comes from a parent serializer's validated data).
        Such an instance is returned as-is once an ``exists()`` check
        confirms it is part of the account-scoped queryset. Any other
        value is handed to the parent implementation.
        """
        scoped_qs = self.get_queryset()
        already_instance = (
            scoped_qs is not None
            and isinstance(data, scoped_qs.model)
        )
        if not already_instance:
            return super().to_internal_value(data)
        # Instance supplied: only verify it is visible for this account.
        if not scoped_qs.filter(pk=data.pk).exists():
            self.fail('does_not_exist', pk_value=data.pk)
        return data
class RelatedApiNameField(
    AccountQstMixin,
    TemplateQstMixin,
    serializers.SlugRelatedField,
):
    """Slug related field keyed on ``api_name``.

    The queryset is restricted to the account and template taken from
    the context.
    """

    def __init__(self, **kwargs):
        """Initialise the field with ``slug_field`` fixed to ``'api_name'``."""
        super().__init__(slug_field='api_name', **kwargs)

    def get_queryset(self):
        """Return the parent queryset filtered by account and template.

        Raises:
            Exception: from the mixins when account or template is missing,
                or with ``MSG_GE_0002`` when the parent returns no queryset.
        """
        base_qs = super().get_queryset()
        # Both context values are resolved before the queryset check, so
        # their errors take precedence.
        owner = self._get_account()
        owning_template = self._get_template()
        if base_qs is not None:
            return base_qs.filter(account=owner, template=owning_template)
        raise Exception(MSG_GE_0002)

    def to_internal_value(self, data):
        """Resolve an ``api_name`` to its object via the parent field."""
        return super().to_internal_value(data)
class AnyField(serializers.Field):
    """Pass-through field: values are neither converted nor validated here."""

    def to_internal_value(self, data):
        """Return the incoming ``data`` unchanged."""
        return data

    def to_representation(self, value):
        """Return the outgoing ``value`` unchanged."""
        return value
class RelatedListField(serializers.ListField):
    """List field that renders related objects as a list of their ids."""

    def to_representation(self, objects):
        """Return the child representation of each related object's ``id``.

        ``objects`` must provide ``.all()`` (e.g. a related manager).
        """
        rendered_ids = []
        for related in objects.all():
            rendered_ids.append(self.child.to_representation(related.id))
        return rendered_ids
class RelatedApiNameListField(serializers.ListField):
    """List field that renders related objects as a list of ``api_name``s."""

    child = serializers.CharField()

    def to_representation(self, data):
        """Return the ``api_name`` of every object in ``data``.

        When ``data`` provides ``.all()``, every object it yields is used.
        Otherwise ``data`` is iterated directly and items that have no
        ``api_name`` attribute are skipped.
        """
        if not hasattr(data, 'all'):
            return [
                item.api_name
                for item in data
                if hasattr(item, 'api_name')
            ]
        return [item.api_name for item in data.all()]
class CommaSeparatedListField(serializers.ListField):
    """List field that accepts a comma-separated string from HTML input."""

    def get_value(self, dictionary):
        """Extract this field's raw value from the incoming ``dictionary``.

        Returns:
            ``empty`` when the field is absent and the root serializer is
            partial. For HTML input, the value split on commas, or the
            result of ``html.parse_html_list`` when that split is empty.
            For any other input, the plain dictionary lookup, defaulting
            to ``empty``.
        """
        name = self.field_name
        if name not in dictionary and getattr(self.root, 'partial', False):
            return empty

        if not html.is_html_input(dictionary):
            return dictionary.get(name, empty)

        raw = dictionary.get(name, '')
        # An empty or missing value gives no items, not [''].
        pieces = raw.split(',') if raw else []
        if pieces:
            return pieces
        return html.parse_html_list(dictionary, prefix=name, default=empty)
class TimeStampField(serializers.DateTimeField):
    """Date-time field exchanged with clients as a POSIX timestamp."""

    def to_representation(self, value):
        """Return ``value.timestamp()``, or ``None`` for a falsy value."""
        if not value:
            return None
        return value.timestamp()

    def to_internal_value(self, value: 'float | int | str'):
        """Convert a numeric timestamp into an aware ``datetime``.

        Args:
            value: an ``int``, a ``float``, or a string parseable by
                ``float()``.

        Returns:
            A ``datetime`` built with ``datetime.fromtimestamp`` in the
            ``settings.TIME_ZONE`` time zone.

        Raises:
            ValidationError: with ``MSG_GE_0007`` when ``value`` is not a
                number or a numeric string (booleans included), or with
                ``MSG_GE_0020`` when the number cannot be converted to a
                date-time (overflow, out-of-range, NaN).
        """
        # bool is a subclass of int, so True/False would otherwise be
        # accepted as the timestamps 1 and 0.
        if isinstance(value, bool):
            raise ValidationError(detail=MSG_GE_0007)
        if isinstance(value, str):
            try:
                value = float(value)
            except ValueError as ex:
                raise ValidationError(detail=MSG_GE_0007) from ex
        elif not isinstance(value, (int, float)):
            raise ValidationError(detail=MSG_GE_0007)
        tz = pytz.timezone(settings.TIME_ZONE)
        try:
            return datetime.fromtimestamp(value, tz=tz)
        except (OverflowError, OSError, ValueError) as ex:
            raise ValidationError(detail=MSG_GE_0020) from ex
class DateFormatField(serializers.ChoiceField):
    """Choice field translating date formats between API and Python forms.

    Choices are fixed to ``UserDateFormat.API_CHOICES``.
    """

    def __init__(self, **kwargs):
        """Initialise the field with the ``UserDateFormat`` API choices."""
        super().__init__(choices=UserDateFormat.API_CHOICES, **kwargs)

    def to_internal_value(self, data):
        """Validate ``data`` as a choice and map it via ``MAP_TO_PYTHON``."""
        api_value = super().to_internal_value(data)
        return UserDateFormat.MAP_TO_PYTHON[api_value]

    def to_representation(self, value):
        """Map the stringified ``value`` to its API form via ``MAP_TO_API``."""
        return UserDateFormat.MAP_TO_API[str(value)]