22from base64 import b64encode
33from collections import defaultdict
44from collections .abc import Iterable , Generator
5- from typing import Any , Mapping , cast
5+ from contextlib import suppress
6+ from typing import Any , Mapping , cast , NotRequired
67
78import openpyxl
89from PIL import Image
1112from openpyxl .drawing .image import Image as RDIImage
1213
1314from country_workspace .contrib .kobo .api .data .helpers import VALUE_FORMAT
14- from country_workspace .models import AsyncJob , Batch , Household
15+ from country_workspace .models import AsyncJob , Batch , Household , MappingProfile
1516from country_workspace .utils .config import BatchNameConfig , FailIfAlienConfig
1617from country_workspace .utils .fields import Record , clean_field_names
1718from country_workspace .validators .beneficiaries import validate_beneficiaries
@@ -31,6 +32,7 @@ class Config(BatchNameConfig, FailIfAlienConfig):
3132 household_pk_col : str
3233 master_column_label : str
3334 detail_column_label : str
35+ mapping_profile_id : NotRequired [int ]
3436
3537
3638class ColumnConfigurationError (Exception ):
@@ -74,7 +76,23 @@ def postprocess_cell(sheets: MultiSheet) -> MultiSheet:
7476 yield sheet_idx , formated_rows
7577
7678
77- def process_households (sheet : Sheet , job : AsyncJob , batch : Batch , config : Config ) -> Mapping [int , Household ]:
def apply_mapping_rules(row: Record, mapping_profile: MappingProfile | None) -> dict[str, Any]:
    """Build the flex-field payload for one row, applying profile rules if any.

    The row is first passed through ``clean_field_names``. When a mapping
    profile is supplied, every rule it reports via ``get_all_rules()`` is
    visited in ascending ``order``; inactive rules are skipped, and the
    output of each active rule is merged into the payload.

    Args:
        row: The raw sheet record to convert.
        mapping_profile: Profile whose rules should be applied, or ``None``
            (or any falsy value) to return the cleaned row untouched.

    Returns:
        The cleaned field mapping, updated with the data produced by each
        active rule.
    """
    fields = clean_field_names(row)

    if mapping_profile:
        ordered_rules = sorted(
            mapping_profile.get_all_rules(),
            key=lambda candidate: candidate.order,
        )
        for current_rule in ordered_rules:
            if not current_rule.is_active:
                continue
            # Each rule receives the payload as already modified by earlier
            # rules, so later rules can build on (or overwrite) their output.
            fields.update(current_rule.apply(fields))

    return fields
91+
92+
93+ def process_households (
94+ sheet : Sheet , job : AsyncJob , batch : Batch , config : Config , mapping_profile : MappingProfile | None = None
95+ ) -> Mapping [int , Household ]:
7896 mapping = {}
7997
8098 for i , row in enumerate (sheet , 1 ):
@@ -87,7 +105,7 @@ def process_households(sheet: Sheet, job: AsyncJob, batch: Batch, config: Config
87105 job .program .households .create (
88106 batch = batch ,
89107 name = label ,
90- flex_fields = clean_field_names (row ),
108+ flex_fields = apply_mapping_rules (row , mapping_profile ),
91109 ),
92110 )
93111 except Exception as e :
@@ -104,22 +122,25 @@ def full_name_column(row: Record) -> str | None:
104122
105123
106124def process_individuals (
107- sheet : Sheet , household_mapping : Mapping [int , Household ], job : AsyncJob , batch : Batch , config : Config
125+ sheet : Sheet ,
126+ household_mapping : Mapping [int , Household ],
127+ job : AsyncJob ,
128+ mapping_profile : MappingProfile | None = None ,
108129) -> int :
109130 processed = 0
110131
111132 for i , row in enumerate (sheet , 1 ):
112133 name_column = full_name_column (row )
113134 name = get_value (row , name_column ) if name_column else None
114- household_key = get_value (row , config ["master_column_label" ])
135+ household_key = get_value (row , job . config ["master_column_label" ])
115136 household = household_mapping .get (household_key )
116137
117138 try :
118139 job .program .individuals .create (
119- batch = batch ,
140+ batch = job . batch ,
120141 name = name ,
121142 household_id = household .pk ,
122- flex_fields = clean_field_names (row ),
143+ flex_fields = apply_mapping_rules (row , mapping_profile ),
123144 )
124145 except Exception as e :
125146 raise SheetProcessingError (INDIVIDUAL , i ) from e
@@ -173,6 +194,13 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]:
173194 with atomic ():
174195 config : Config = job .config
175196 rdi = job .file
197+
198+ config ["mapping_profile_id" ] = 1
199+ mapping_profile = None
200+ if config .get ("mapping_profile_id" ):
201+ with suppress (MappingProfile .DoesNotExist ):
202+ mapping_profile = MappingProfile .objects .get (id = config ["mapping_profile_id" ], is_active = True )
203+
176204 batch = Batch .objects .create (
177205 name = config ["batch_name" ],
178206 program = job .program ,
@@ -183,8 +211,10 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]:
183211
184212 household_sheet , individual_sheet = read_sheets (config , rdi , 0 , 1 )
185213
186- household_mapping = process_households (household_sheet , job , batch , config )
187- individuals_number = process_individuals (individual_sheet , household_mapping , job , batch , config )
214+ household_mapping = process_households (household_sheet , job , batch , config , mapping_profile )
215+ individuals_number = process_individuals (
216+ individual_sheet , household_mapping , job , batch , config , mapping_profile
217+ )
188218
189219 validate_beneficiaries (config , household_mapping )
190220
0 commit comments