-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathmodel.py
More file actions
398 lines (361 loc) · 20 KB
/
model.py
File metadata and controls
398 lines (361 loc) · 20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
"""Base model module for interfacing with Nautobot in SSoT."""
# pylint: disable=protected-access
# Diffsync relies on underscore-prefixed attributes quite heavily, which is why we disable this here.
from collections import defaultdict
from datetime import datetime
from functools import lru_cache
from diffsync import DiffSyncModel
from diffsync.exceptions import ObjectCrudException, ObjectNotCreated, ObjectNotDeleted, ObjectNotUpdated
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.db.models import ProtectedError, QuerySet
from nautobot.extras.choices import RelationshipTypeChoices
from nautobot.extras.models import Relationship, RelationshipAssociation
from nautobot.extras.models.metadata import ObjectMetadata
from nautobot_ssot.contrib.base import BaseNautobotModel
from nautobot_ssot.contrib.enums import AttributeType
from nautobot_ssot.contrib.types import (
CustomFieldAnnotation,
RelationshipSideEnum,
)
from nautobot_ssot.utils.diffsync import DiffSyncModelUtilityMixin
class NautobotModel(DiffSyncModel, DiffSyncModelUtilityMixin, BaseNautobotModel):
"""
Base model for any diffsync models interfacing with Nautobot through the ORM.
This provides the `create`, `update` and `delete` operations in a generic fashion, meaning you don't have to
implement them yourself.
In order to accomplish this, the `_model` field has to be set on subclasses to map them to the corresponding ORM
model class.
"""
@classmethod
def _get_queryset(cls) -> QuerySet:
"""Get the queryset used to load the models data from Nautobot."""
available_fields = {field.name for field in cls._model._meta.get_fields()}
parameter_names = [
parameter for parameter in cls.get_synced_attributes() if parameter.split("__")[0] in available_fields
]
# Here we identify any foreign keys (i.e. fields with '__' in them) so that we can load them directly in the
# first query if this function hasn't been overridden.
prefetch_related_parameters = [
"__".join(parameter.split("__")[:-1]) for parameter in parameter_names if "__" in parameter
]
qs = cls.get_queryset()
return qs.prefetch_related(*prefetch_related_parameters)
@classmethod
def get_queryset(cls) -> QuerySet:
"""Get the queryset used to load the models data from Nautobot."""
return cls._model.objects.all()
@classmethod
def _check_field(cls, name):
"""Check whether the given field name is defined on the diffsync (pydantic) model."""
if name not in cls.model_fields: # pylint: disable=unsupported-membership-test
raise ObjectCrudException(f"Field {name} is not defined on the model.")
def get_from_db(self):
"""Get the ORM object for this diffsync object from the database using the primary key."""
try:
return self.adapter.get_from_orm_cache({"pk": self.pk}, self._model)
except self._model.DoesNotExist as error:
raise ObjectCrudException(f"No such {self._model._meta.verbose_name} instance with PK {self.pk}") from error
def update(self, attrs):
"""Update the ORM object corresponding to this diffsync object."""
try:
obj = self.get_from_db()
self._update_obj_with_parameters(obj, attrs, self.adapter)
if getattr(self.adapter, "metadata_type", None):
self._update_obj_metadata(obj, self.adapter)
except ObjectCrudException as error:
raise ObjectNotUpdated(error) from error
return super().update(attrs)
def delete(self):
"""Delete the ORM object corresponding to this diffsync object."""
try:
obj = self.get_from_db()
except ObjectCrudException as error:
raise ObjectNotDeleted(error) from error
try:
obj.delete()
except ProtectedError as error:
raise ObjectNotDeleted(f"Couldn't delete {obj} as it is referenced by another object") from error
return super().delete()
@classmethod
def create(cls, adapter, ids, attrs):
"""Create the ORM object corresponding to this diffsync object."""
# Only diffsync cares about the distinction between ids and attrs, we do not.
# Therefore, we merge the two into parameters.
parameters = ids.copy()
parameters.update(attrs)
# This is in fact callable, because it is a model
obj = cls._model() # pylint: disable=not-callable
try:
cls._update_obj_with_parameters(obj, parameters, adapter)
except ObjectCrudException as error:
raise ObjectNotCreated(error) from error
if getattr(adapter, "metadata_type", None):
try:
cls._update_obj_metadata(obj, adapter)
except ObjectCrudException as error:
raise ObjectNotCreated(error) from error
return super().create(adapter, ids, attrs)
@classmethod
@lru_cache
def get_attr_enum(cls, attr_name: str) -> AttributeType:
"""Return `AttributeType` enum value for type hinted attribute."""
annotation = cls.get_attr_annotation(attr_name)
if isinstance(annotation, CustomFieldAnnotation):
return AttributeType.CUSTOM_FIELD
if "__" in attr_name:
if annotation:
return AttributeType.CUSTOM_FOREIGN_KEY
return AttributeType.FOREIGN_KEY
if annotation:
return AttributeType.CUSTOM_N_TO_MANY_RELATIONSHIP
django_field = cls._model._meta.get_field(attr_name)
if django_field.many_to_many or django_field.one_to_many:
return AttributeType.N_TO_MANY_RELATIONSHIP
return AttributeType.STANDARD
@classmethod
def _handle_single_field(cls, field, obj, value, relationship_fields, adapter): # pylint: disable=too-many-arguments,too-many-locals, too-many-branches
"""Set a single field on a Django object to a given value, or, for relationship fields, prepare setting.
:param field: The name of the field to set.
:param obj: The Django ORM object to set the field on.
:param value: The value to set the field to.
:param relationship_fields: Helper dictionary containing information on relationship fields.
This is mutated over the course of this function.
:param adapter: The related diffsync adapter used for looking up things in the cache.
"""
cls._check_field(field)
annotation = cls.get_attr_annotation(field)
match cls.get_attr_enum(field):
case AttributeType.STANDARD:
setattr(obj, field, value)
case AttributeType.FOREIGN_KEY:
related_model, lookup = field.split("__", maxsplit=1)
django_field = cls._model._meta.get_field(related_model)
relationship_fields["foreign_keys"][related_model][lookup] = value
# Add a special key to the dictionary to point to the related model's class
relationship_fields["foreign_keys"][related_model]["_model_class"] = django_field.related_model
case AttributeType.N_TO_MANY_RELATIONSHIP:
django_field = cls._model._meta.get_field(field)
try:
relationship_fields["many_to_many_fields"][field] = [
adapter.get_from_orm_cache(parameters, django_field.related_model) for parameters in value
]
except django_field.related_model.DoesNotExist as error:
raise ObjectCrudException(
f"Unable to populate many to many relationship '{django_field.name}' with parameters {value}, at least one related object not found."
) from error
except MultipleObjectsReturned as error:
raise ObjectCrudException(
f"Unable to populate many to many relationship '{django_field.name}' with parameters {value}, at least one related object found twice."
) from error
case AttributeType.CUSTOM_FIELD:
obj.cf[annotation.key] = value
case AttributeType.CUSTOM_FOREIGN_KEY:
related_model, lookup = field.split("__", maxsplit=1)
relationship_fields["custom_relationship_foreign_keys"][related_model][lookup] = value
relationship_fields["custom_relationship_foreign_keys"][related_model]["_annotation"] = annotation
case AttributeType.CUSTOM_N_TO_MANY_RELATIONSHIP:
relationship = adapter.get_from_orm_cache({"label": annotation.name}, Relationship)
if annotation.side == RelationshipSideEnum.DESTINATION:
related_object_content_type = relationship.source_type
else:
related_object_content_type = relationship.destination_type
related_model_class = related_object_content_type.model_class()
if (
relationship.type == RelationshipTypeChoices.TYPE_ONE_TO_MANY
and annotation.side == RelationshipSideEnum.DESTINATION
):
relationship_fields["custom_relationship_foreign_keys"][field] = {
**value,
"_annotation": annotation,
}
else:
relationship_fields["custom_relationship_many_to_many_fields"][field] = {
"annotation": annotation,
"objects": [
adapter.get_from_orm_cache(parameters, related_model_class) for parameters in value
],
}
@classmethod
def _update_obj_with_parameters(cls, obj, parameters, adapter):
"""Update a given Nautobot ORM object with the given parameters."""
# TODO: Use Dataclasses instead of dictionaries for structured data storage and tracking.
relationship_fields = {
# Example: {"group": {"name": "Group Name", "_model_class": TenantGroup}}
"foreign_keys": defaultdict(dict),
# Example: {"tags": [Tag-1, Tag-2]}
"many_to_many_fields": defaultdict(list),
# Example: TODO
"custom_relationship_foreign_keys": defaultdict(dict),
# Example: TODO
"custom_relationship_many_to_many_fields": defaultdict(dict),
}
for field, value in parameters.items():
cls._handle_single_field(field, obj, value, relationship_fields, adapter)
# Set foreign keys
cls._lookup_and_set_foreign_keys(relationship_fields["foreign_keys"], obj, adapter)
# Save the object to the database
try:
obj.validated_save()
except (ValidationError, ValueError) as error:
raise ObjectCrudException(
f"Validated save failed for Django object:\n{error}\nParameters: {parameters}"
) from error
# Handle relationship association creation. This needs to be after object creation, because relationship
# association objects rely on both sides already existing.
cls._lookup_and_set_custom_relationship_foreign_keys(
relationship_fields["custom_relationship_foreign_keys"], obj, adapter
)
cls._set_custom_relationship_to_many_fields(
relationship_fields["custom_relationship_many_to_many_fields"], obj, adapter
)
# Set many-to-many fields after saving.
cls._set_many_to_many_fields(relationship_fields["many_to_many_fields"], obj)
@classmethod
def _set_custom_relationship_to_many_fields(cls, custom_relationship_many_to_many_fields, obj, adapter):
for _, dictionary in custom_relationship_many_to_many_fields.items():
annotation = dictionary.pop("annotation")
objects = dictionary.pop("objects")
# TODO: Deduplicate this code
relationship = adapter.get_from_orm_cache({"label": annotation.name}, Relationship)
parameters = {
"relationship": relationship,
"source_type": relationship.source_type,
"destination_type": relationship.destination_type,
}
associations = []
if annotation.side == RelationshipSideEnum.SOURCE:
parameters["source_id"] = obj.id
for object_to_relate in objects:
association_parameters = parameters.copy()
association_parameters["destination_id"] = object_to_relate.id
try:
association = adapter.get_from_orm_cache(association_parameters, RelationshipAssociation)
except RelationshipAssociation.DoesNotExist:
association = RelationshipAssociation(**parameters, destination_id=object_to_relate.id)
association.validated_save()
associations.append(association)
else:
parameters["destination_id"] = obj.id
for object_to_relate in objects:
association_parameters = parameters.copy()
association_parameters["source_id"] = object_to_relate.id
try:
association = adapter.get_from_orm_cache(association_parameters, RelationshipAssociation)
except RelationshipAssociation.DoesNotExist:
association = RelationshipAssociation(**parameters, source_id=object_to_relate.id)
association.validated_save()
associations.append(association)
# Now we need to clean up any associations that we're not `get_or_create`'d in order to achieve
# declarativeness.
# TODO: This may benefit from an ORM cache with `filter` capabilities, but I guess the gain in most cases
# would be fairly minor.
for existing_association in RelationshipAssociation.objects.filter(**parameters):
if existing_association not in associations:
existing_association.delete()
@classmethod
def _set_many_to_many_fields(cls, many_to_many_fields, obj):
"""
Given a dictionary, set many-to-many relationships on an object.
This will always use `set`, thereby replacing any elements that are already part of the relationship.
Example dictionary:
{
"relationship_field_name": [<Object 1>, <Object 2>],
...
}
"""
for field_name, related_objects in many_to_many_fields.items():
many_to_many_field = getattr(obj, field_name)
many_to_many_field.set(related_objects)
@classmethod
def _lookup_and_set_custom_relationship_foreign_keys(cls, custom_relationship_foreign_keys, obj, adapter):
for related_model_dict in custom_relationship_foreign_keys.values():
annotation = related_model_dict.pop("_annotation")
# TODO: Deduplicate this code
try:
relationship = adapter.get_from_orm_cache({"label": annotation.name}, Relationship)
except Relationship.DoesNotExist as error:
raise ObjectCrudException(f"No such relationship with label '{annotation.name}'") from error
parameters = {
"relationship": relationship,
"source_type": relationship.source_type,
"destination_type": relationship.destination_type,
}
if annotation.side == RelationshipSideEnum.SOURCE:
parameters["source_id"] = obj.id
related_model_class = relationship.destination_type.model_class()
try:
destination_object = adapter.get_from_orm_cache(related_model_dict, related_model_class)
except related_model_class.DoesNotExist as error:
raise ObjectCrudException(
f"Couldn't resolve custom relationship {relationship.name}, no such {related_model_class._meta.verbose_name} object with parameters {related_model_dict}."
) from error
except related_model_class.MultipleObjectsReturned as error:
raise ObjectCrudException(
f"Couldn't resolve custom relationship {relationship.name}, multiple {related_model_class._meta.verbose_name} objects with parameters {related_model_dict}."
) from error
RelationshipAssociation.objects.update_or_create(
**parameters,
defaults={"destination_id": destination_object.id},
)
else:
parameters["destination_id"] = obj.id
source_object = adapter.get_from_orm_cache(related_model_dict, relationship.source_type.model_class())
RelationshipAssociation.objects.update_or_create(**parameters, defaults={"source_id": source_object.id})
@classmethod
def _lookup_and_set_foreign_keys(cls, foreign_keys, obj, adapter):
"""
Given a list of foreign keys as dictionaries, look up and set foreign keys on an object.
Dictionary should be in the form of:
[
{"field_1": "value_1", "field_2": "value_2"},
...
]
where each item in the list corresponds to the parameters needed to uniquely identify a foreign key object.
"""
for field_name, related_model_dict in foreign_keys.items():
related_model = related_model_dict.pop("_model_class")
# Generic foreign keys will not have this dictionary field. As such, we need to retrieve the appropriate
# model class through other means.
if not related_model:
try:
app_label = related_model_dict.pop("app_label")
model = related_model_dict.pop("model")
except KeyError as error:
raise ValueError(
f"Missing annotation for '{field_name}__app_label' or '{field_name}__model - this is required"
f"for generic foreign keys."
) from error
try:
related_model_content_type = adapter.get_from_orm_cache(
{"app_label": app_label, "model": model}, ContentType
)
related_model = related_model_content_type.model_class()
except ContentType.DoesNotExist as error:
raise ObjectCrudException(f"Unknown content type '{app_label}.{model}'.") from error
# Set the foreign key to 'None' when none of the fields are set to anything
if not any(related_model_dict.values()):
setattr(obj, field_name, None)
continue
try:
related_object = adapter.get_from_orm_cache(related_model_dict, related_model)
except related_model.DoesNotExist as error:
raise ObjectCrudException(
f"Couldn't find '{related_model._meta.verbose_name}' instance behind '{field_name}' with: {related_model_dict}."
) from error
except MultipleObjectsReturned as error:
raise ObjectCrudException(
f"Found multiple instances for {field_name} with: {related_model_dict}"
) from error
setattr(obj, field_name, related_object)
@classmethod
def _update_obj_metadata(cls, obj, adapter):
"""Update a given Nautobot ORM object with the required object metadata."""
# Get the scope_fields from the DiffSync Model
obj_metadata_scope_fields = adapter.metadata_scope_fields[cls]
obj_metadata = obj.associated_object_metadata.filter(
metadata_type=adapter.metadata_type
).first() or ObjectMetadata(metadata_type=adapter.metadata_type, assigned_object=obj)
obj_metadata.scoped_fields = obj_metadata_scope_fields
obj_metadata.value = datetime.now()
obj_metadata.validated_save()