1+ from dataclasses import dataclass , field
12from typing import Any , Final , Mapping , NotRequired
3+ from contextlib import suppress
24
35from django .db .transaction import atomic
46
57from country_workspace .contrib .aurora .client import AuroraClient
68from country_workspace .contrib .aurora .exceptions import TooManyBeneficiaryError
7- from country_workspace .models import AsyncJob , Batch , Household , Individual
9+ from country_workspace .models import AsyncJob , Batch , Household , Individual , MappingProfile
810from country_workspace .utils .config import BatchNameConfig , FailIfAlienConfig
911from country_workspace .utils .fields import clean_field_names
1012from country_workspace .validators .beneficiaries import validate_beneficiaries
@@ -16,66 +18,69 @@ class Config(BatchNameConfig, FailIfAlienConfig):
1618 household_column_prefix : NotRequired [str ]
1719 individuals_column_prefix : str
1820 household_label_column : NotRequired [str ]
21+ mapping_profile_pk : NotRequired [int ]
1922
2023
2124RELATIONSHIP_HEAD : Final [str ] = "HEAD"
2225RELATIONSHIP_FIELDNAME : Final [str ] = "relationship"
2326
2427
25- def import_from_aurora (job : AsyncJob ) -> dict [str , int ]:
26- """Import data from the Aurora system into the database within an atomic transaction.
28+ @dataclass
29+ class AuroraImporter :
30+ """Aurora data importer with mapping profile support."""
2731
28- Args:
29- job (AsyncJob): The job instance containing the configuration and context for data import.
30- Expected keys in `job.config` correspond to the `Config` TypedDict.
32+ job : AsyncJob
33+ cfg : Config
34+ client : AuroraClient = field (default_factory = AuroraClient )
35+ mapping_profile : MappingProfile | None = field (init = False , default = None )
3136
32- Returns:
33- dict[str, int]: Counts of imported records:
34- - "households": Number of households imported (0 if `master_detail` is False or None).
35- - "individuals": Total number of individuals imported.
37+ def __post_init__ (self ) -> None :
38+ self .batch = Batch .objects .create (
39+ name = self .cfg ["batch_name" ],
40+ program = self .job .program ,
41+ country_office = self .job .program .country_office ,
42+ imported_by = self .job .owner ,
43+ source = Batch .BatchSource .AURORA ,
44+ )
45+ if self .cfg .get ("mapping_profile_pk" ):
46+ with suppress (MappingProfile .DoesNotExist ):
47+ self .mapping_profile = MappingProfile .objects .get (id = self .cfg ["mapping_profile_pk" ], is_active = True )
3648
37- Raises :
38- ValueError: If record ID is invalid or missing .
49+ def run_import ( self ) -> dict [ str , int ] :
50+ """Execute the Aurora import process .
3951
40- """
41- with atomic ():
52+ Returns:
53+ dict[str, int]: Counts of imported records:
54+ - "households": Number of households imported.
55+ - "individuals": Number of individuals imported.
56+
57+ """
4258 total = {"households" : 0 , "individuals" : 0 }
4359 records_data = []
44- cfg : Config = job .config
4560
46- batch = Batch .objects .create (
47- name = cfg ["batch_name" ],
48- program = job .program ,
49- country_office = job .program .country_office ,
50- imported_by = job .owner ,
51- source = Batch .BatchSource .AURORA ,
52- )
53-
54- client = AuroraClient ()
55- for record in client .get (f"registration/{ cfg ['registration_reference_pk' ]} /records/" ):
56- try :
57- record_id = int (record ["flatten" ]["id" ])
58- except (ValueError , TypeError , KeyError ):
59- raise ValueError (f"Invalid or missing record ID: { record .get ('flatten' , {}).get ('id' )} " )
60-
61- individuals = create_individuals (batch , record ["flatten" ], cfg )
61+ for record in self .client .get (f"registration/{ self .cfg ['registration_reference_pk' ]} /records/" ):
62+ record_id = _extract_record_id (record )
63+ individuals = create_individuals (self .batch , record ["flatten" ], self .cfg , self .mapping_profile )
6264 total ["individuals" ] += len (individuals )
63- if cfg ["master_detail" ] and individuals and individuals [0 ].household_id :
65+ if self . cfg ["master_detail" ] and individuals and individuals [0 ].household_id :
6466 total ["households" ] += 1
6567 records_data .append ((record_id , individuals ))
6668
67- validate_records (records_data , cfg )
69+ validate_records (records_data , self .cfg )
70+ return total
6871
69- return total
72+
73+ def import_from_aurora (job : AsyncJob ) -> dict [str , int ]:
74+ """Import data from the Aurora system into the database within an atomic transaction."""
75+ with atomic ():
76+ cfg : Config = job .config
77+ importer = AuroraImporter (job = job , cfg = cfg )
78+ return importer .run_import ()
7079
7180
7281def validate_records (records_data : list [tuple [int , list [Individual ]]], cfg : Config ) -> None :
7382 """Validate beneficiaries based on configuration and record data.
7483
75- Args:
76- records_data: List of tuples containing record ID and created individuals.
77- cfg: Configuration for validation and mapping.
78-
7984 Raises:
8085 TooManyBeneficiaryError: If more than one Individual is created when master_detail is False.
8186
@@ -95,14 +100,11 @@ def validate_records(records_data: list[tuple[int, list[Individual]]], cfg: Conf
95100 validate_beneficiaries (cfg , mapping )
96101
97102
98- def create_household (batch : Batch , data : dict [str , Any ], prefix : str ) -> Household :
103+ def create_household (
104+ batch : Batch , data : dict [str , Any ], prefix : str , mapping_profile : MappingProfile | None = None
105+ ) -> Household :
99106 """Create a Household object from the provided data and associate it with a batch.
100107
101- Args:
102- batch (Batch): The batch to which the household will be linked.
103- data (dict[str, Any]): A dictionary containing household-related information.
104- prefix (str): The prefix used to filter and group household-related information.
105-
106108 Returns:
107109 Household: The newly created household instance.
108110
@@ -114,22 +116,21 @@ def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Househo
114116 count = len (hh_data )
115117 if count > 1 :
116118 raise TooManyBeneficiaryError ("Household" , record_id = data ["id" ], count = count )
117- flex_fields = clean_field_names (next (iter (hh_data .values ()), {}))
119+
120+ raw_fields = clean_field_names (next (iter (hh_data .values ()), {}))
121+ flex_fields = mapping_profile .apply_all_rules (raw_fields ) if mapping_profile else raw_fields
122+
118123 return batch .program .households .create (batch = batch , flex_fields = flex_fields )
119124
120125
121126def create_individuals (
122127 batch : Batch ,
123128 data : dict [str , Any ],
124129 cfg : Config ,
130+ mapping_profile : MappingProfile | None = None ,
125131) -> list [Individual ]:
126132 """Create and associate Individual objects with an optional Household.
127133
128- Args:
129- batch (Batch): The batch to which individuals will be linked.
130- data (dict[str, Any]): A dictionary containing related information.
131- cfg (Config): Configuration dictionary containing various settings for the import process.
132-
133134 Returns:
134135 list[Individual]: A list of successfully created Individual instances.
135136
@@ -140,10 +141,11 @@ def create_individuals(
140141 inds_data = _collect_by_prefix (data , cfg .get ("individuals_column_prefix" ))
141142
142143 if inds_data and cfg ["master_detail" ] and (hh_prefix := cfg .get ("household_column_prefix" )):
143- household = create_household (batch , data , hh_prefix )
144+ household = create_household (batch , data , hh_prefix , mapping_profile )
144145
145146 for ind_data in inds_data .values ():
146- flex_fields = clean_field_names (ind_data )
147+ cleaned_data = clean_field_names (ind_data )
148+ flex_fields = mapping_profile .apply_all_rules (cleaned_data ) if mapping_profile else cleaned_data
147149 if household and (hh_label := cfg .get ("household_label_column" )) and not head_found :
148150 head_found = _update_household_label_from_individual (household , flex_fields , hh_label )
149151 individuals .append (
@@ -157,6 +159,19 @@ def create_individuals(
157159 return batch .program .individuals .bulk_create (individuals , batch_size = 1000 )
158160
159161
162+ def _extract_record_id (record : dict [str , Any ]) -> int :
163+ """Extract and validate record ID from Aurora record.
164+
165+ Raises:
166+ ValueError: If record ID is invalid or missing.
167+
168+ """
169+ try :
170+ return int (record ["flatten" ]["id" ])
171+ except (ValueError , TypeError , KeyError ):
172+ raise ValueError (f"Invalid or missing record ID: { record .get ('flatten' , {}).get ('id' )} " )
173+
174+
160175def _collect_by_prefix (data : dict [str , Any ], prefix : str ) -> dict [str , dict [str , Any ]]:
161176 """Extract and group fields from a dictionary based on a given prefix.
162177
@@ -197,13 +212,6 @@ def _update_household_label_from_individual(
197212) -> bool :
198213 """Update the household's name based on an individual's role and specified name field.
199214
200- Args:
201- household (Household): The household instance to update.
202- ind_data (dict[str, Any]): A dictionary containing the individual's data,
203- including relationship status and potential household name.
204- household_label_column (str): The key in the individual's data that stores
205- the name to assign to the household.
206-
207215 Returns:
208216 bool: True if the household name was updated (individual is head and name provided), False otherwise.
209217
0 commit comments