33# pylint: disable=protected-access
44# Diffsync relies on underscore-prefixed attributes quite heavily, which is why we disable this here.
55
6- import warnings
7- from typing import Dict , Type , get_args
6+ import re
7+ from typing import Dict , Type
88
99import pydantic
10- from diffsync import DiffSync
10+ from diffsync import DiffSync , DiffSyncModel
1111from diffsync .exceptions import ObjectCrudException
1212from django .contrib .contenttypes .models import ContentType
1313from django .db .models import Model
1414from nautobot .extras .choices import RelationshipTypeChoices
15- from nautobot .extras .models import Relationship , RelationshipAssociation
15+ from nautobot .extras .models import Relationship
1616from nautobot .extras .models .metadata import MetadataType
1717from typing_extensions import get_type_hints
1818
2222 RelationshipSideEnum ,
2323)
2424from nautobot_ssot .utils .cache import ORMCache
25- from nautobot_ssot .utils .orm import load_typed_dict , orm_attribute_lookup
25+ from nautobot_ssot .utils .orm import (
26+ get_custom_relationship_associations ,
27+ load_typed_dict ,
28+ orm_attribute_lookup ,
29+ )
30+ from nautobot_ssot .utils .typing import get_inner_type
2631
2732
2833class NautobotAdapter (DiffSync ):
@@ -167,40 +172,28 @@ def _get_diffsync_class(self, model_name):
167172 return diffsync_model
168173
169174 def _handle_custom_relationship_to_many_relationship (
170- self , database_object , diffsync_model , parameter_name , annotation
175+ self ,
176+ database_object : Model ,
177+ diffsync_model : DiffSyncModel ,
178+ parameter_name : str ,
179+ annotation : CustomRelationshipAnnotation ,
171180 ):
172181 # Introspect type annotations to deduce which fields are of interest
173182 # for this many-to-many relationship.
174- diffsync_field_type = get_type_hints (diffsync_model )[parameter_name ]
175- inner_type = get_args (diffsync_field_type )[0 ]
176- related_objects_list = []
183+ inner_type = get_inner_type (diffsync_model , parameter_name )
177184 # TODO: Allow for filtering, i.e. not taking into account all the objects behind the relationship.
178- relationship = self .get_from_orm_cache ({"label" : annotation .name }, Relationship )
179- relationship_association_parameters = self ._construct_relationship_association_parameters (
180- annotation , database_object
181- )
182- relationship_associations = RelationshipAssociation .objects .filter (** relationship_association_parameters )
183-
184- field_name = ""
185- field_name += "source" if annotation .side == RelationshipSideEnum .DESTINATION else "destination"
186- field_name += "_"
187- field_name += (
188- relationship .source_type .app_label .lower ()
189- if annotation .side == RelationshipSideEnum .DESTINATION
190- else relationship .destination_type .app_label .lower ()
191- )
192- field_name += "_"
193- field_name += (
194- relationship .source_type .model .lower ()
195- if annotation .side == RelationshipSideEnum .DESTINATION
196- else relationship .destination_type .model .lower ()
185+ relationship : Relationship = self .get_from_orm_cache ({"label" : annotation .name }, Relationship )
186+ relationship_associations , _ = get_custom_relationship_associations (
187+ relationship = relationship ,
188+ db_obj = database_object ,
189+ relationship_side = annotation .side ,
197190 )
198191
192+ related_objects_list = []
199193 for association in relationship_associations :
200194 related_object = getattr (
201195 association , "source" if annotation .side == RelationshipSideEnum .DESTINATION else "destination"
202196 )
203- # dictionary_representation = self._handle_typed_dict(inner_type, related_object)
204197 dictionary_representation = load_typed_dict (inner_type , related_object )
205198 # Only use those where there is a single field defined, all 'None's will not help us.
206199 if any (dictionary_representation .values ()):
@@ -223,33 +216,6 @@ def _handle_custom_relationship_to_many_relationship(
223216
224217 return related_objects_list
225218
226- @classmethod
227- def _handle_typed_dict (cls , inner_type , related_object ):
228- """Handle a typed dict for many to many relationships.
229-
230- TODO: Deprecated and to be removed in future version. Use `nautobot_ssot.utils.orm.load_typed_dict` instead.
231- """
232- warnings .warn (
233- "`_handle_typed_dict` is deprecated and will be removed in a future version. "
234- "Use `nautobot_ssot.utils.orm.load_typed_dict` instead." ,
235- DeprecationWarning ,
236- stacklevel = 2 ,
237- )
238- return load_typed_dict (inner_type , related_object )
239-
240- def _construct_relationship_association_parameters (self , annotation , database_object ):
241- relationship = self .get_from_orm_cache ({"label" : annotation .name }, Relationship )
242- relationship_association_parameters = {
243- "relationship" : relationship ,
244- "source_type" : relationship .source_type ,
245- "destination_type" : relationship .destination_type ,
246- }
247- if annotation .side == RelationshipSideEnum .SOURCE :
248- relationship_association_parameters ["source_id" ] = database_object .id
249- else :
250- relationship_association_parameters ["destination_id" ] = database_object .id
251- return relationship_association_parameters
252-
253219 def _handle_to_many_relationship (self , database_object , diffsync_model , parameter_name ):
254220 """Handle a single one- or many-to-many relationship field.
255221
@@ -301,11 +267,10 @@ class NautobotInterface(NautobotModel):
301267 """
302268 # Introspect type annotations to deduce which fields are of interest
303269 # for this many-to-many relationship.
304- inner_type = get_args ( get_type_hints ( diffsync_model )[ parameter_name ])[ 0 ]
270+ inner_type = get_inner_type ( diffsync_model , parameter_name )
305271 related_objects_list = []
306272 # TODO: Allow for filtering, i.e. not taking into account all the objects behind the relationship.
307273 for related_object in getattr (database_object , parameter_name ).all ():
308- # dictionary_representation = self._handle_typed_dict(inner_type, related_object)
309274 dictionary_representation = load_typed_dict (inner_type , related_object )
310275 # Only use those where there is a single field defined, all 'None's will not help us.
311276 if any (dictionary_representation .values ()):
@@ -316,36 +281,28 @@ def _handle_custom_relationship_foreign_key(
316281 self , database_object , parameter_name : str , annotation : CustomRelationshipAnnotation
317282 ):
318283 """Handle a single custom relationship foreign key field."""
319- relationship_association_parameters = self ._construct_relationship_association_parameters (
320- annotation , database_object
284+ relationship_associations , association_count = get_custom_relationship_associations (
285+ relationship = self .cache .get_from_orm (Relationship , {"label" : annotation .name }),
286+ db_obj = database_object ,
287+ relationship_side = annotation .side ,
321288 )
322289
323- relationship_association = RelationshipAssociation .objects .filter (** relationship_association_parameters )
324- amount_of_relationship_associations = relationship_association .count ()
325- if amount_of_relationship_associations == 0 :
290+ if association_count == 0 :
326291 return None
327- if amount_of_relationship_associations == 1 :
328- association = relationship_association . first ()
329- related_object = getattr (
330- association , "source" if annotation . side == RelationshipSideEnum . DESTINATION else "destination "
292+ if association_count > 1 :
293+ self . job . logger . warning (
294+ f"Foreign key ( { database_object . __name__ } . { parameter_name } ) "
295+ "custom relationship matched two associations - this shouldn't happen. "
331296 )
332- # Discard the first part as there is no actual field on the model corresponding to that part.
333- _ , * lookups = parameter_name .split ("__" )
334- for lookup in lookups [:- 1 ]:
335- related_object = getattr (related_object , lookup )
336- return getattr (related_object , lookups [- 1 ])
337- raise ValueError ("Foreign key custom relationship matched two associations - this shouldn't happen." )
338297
339- @staticmethod
340- def _handle_foreign_key (database_object , parameter_name ):
341- """Handle a single foreign key field."""
342- warnings .warn (
343- "`_handle_foreign_key` is deprecated and will be removed in a future version. "
344- "Use `nautobot_ssot.utils.orm.orm_attribute_lookup` instead." ,
345- DeprecationWarning ,
346- stacklevel = 2 ,
298+ return orm_attribute_lookup (
299+ getattr (
300+ relationship_associations .first (),
301+ "source" if annotation .side == RelationshipSideEnum .DESTINATION else "destination" ,
302+ ),
303+ # Discard the first part of the paramater name as it references the initial related object
304+ re .sub ("^(.*?)__" , "" , parameter_name ),
347305 )
348- return orm_attribute_lookup (database_object , parameter_name )
349306
350307 def get_or_create_metadatatype (self ):
351308 """Retrieve or create a MetadataType object to track the last sync time of this SSoT job."""
0 commit comments