Skip to content

Commit 85cdc76

Browse files
Merge pull request #59 from unicef/bugfix/241567-check-before-options-is-not-working
AB#241567: Validate records if check before is selected
2 parents 22ef82c + 955bf63 commit 85cdc76

File tree

4 files changed

+405
-47
lines changed

4 files changed

+405
-47
lines changed
Lines changed: 145 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import io
2+
from collections.abc import Iterable
3+
from typing import Mapping, Any, TypedDict, cast
24

35
from django.db.transaction import atomic
46
from hope_smart_import.readers import open_xls_multi
@@ -7,53 +9,157 @@
79
from country_workspace.utils.fields import clean_field_name
810

911
RDI = str | io.BytesIO
12+
Row = Mapping[str, Any]
13+
Sheet = Iterable[Row]
14+
15+
INDIVIDUAL = "individual"
16+
HOUSEHOLD = "household"
17+
18+
19+
class Config(TypedDict):
    """Settings that drive a single RDI import (read from the job's ``config``)."""

    # Name given to the Batch created for the import.
    batch_name: str
    # Column holding the household key; rows with a falsy value in it are
    # skipped, and individuals are matched to households through it.
    household_pk_col: str
    # Column of the household sheet whose value becomes the household name.
    master_column_label: str
    # Column of the individual sheet whose value becomes the individual name.
    detail_column_label: str
    # When true, every imported household is run through its checker and the
    # import fails on the first household that does not validate.
    check_before: bool
25+
26+
27+
class ColumnConfigurationError(Exception):
    """Raised when a configured column name is not present in a row."""

    def __init__(self, column_name: str) -> None:
        """Record the missing column and forward it to ``Exception`` as ``args``."""
        self.column_name = column_name
        super().__init__(column_name)

    def __str__(self) -> str:
        """Return the message naming the column that could not be found."""
        return f"Column {self.column_name} not found."
34+
35+
36+
class SheetProcessingError(Exception):
    """Raised when a record could not be created from a sheet row."""

    def __init__(self, sheet_name: str, row_index: int) -> None:
        """Record which sheet and which row failed; both are forwarded as ``args``."""
        self.sheet_name, self.row_index = sheet_name, row_index
        super().__init__(sheet_name, row_index)

    def __str__(self) -> str:
        """Return the message identifying the failing sheet and row."""
        return f"Failed to process {self.sheet_name} sheet at row {self.row_index}"
44+
45+
46+
class MissingHouseholdError(Exception):
    """Raised when an individual row refers to a household key that was not imported."""

    def __init__(self, row_index: int, household_key: str) -> None:
        """Record the individual's row and the unknown household key; both are forwarded as ``args``."""
        self.row_index, self.household_key = row_index, household_key
        super().__init__(row_index, household_key)

    def __str__(self) -> str:
        """Return the message naming the unknown household key and the offending row."""
        return f"Missing household {self.household_key} for individual at row {self.row_index}"
54+
55+
56+
class HouseholdValidationError(Exception):
    """Raised when an imported household does not pass its checker validation."""

    def __init__(self, household_key: int) -> None:
        """Record the key of the household that failed; it is forwarded as ``args``."""
        self.household_key = household_key
        super().__init__(household_key)

    def __str__(self) -> str:
        """Return the message naming the household that failed validation."""
        return f"Failed to validate household {self.household_key}."
63+
64+
65+
def normalize_row(row: Row) -> Mapping[str, Any]:
    """Return a new dict holding *row*'s values under keys passed through ``clean_field_name``.

    Values are copied unchanged. If two columns clean to the same name, the
    one that comes later in *row* wins.
    """
    normalized = {}
    for column, value in row.items():
        normalized[clean_field_name(column)] = value
    return normalized
67+
68+
69+
def get_value(row: Row, column_name: str) -> Any:
    """Return the value stored in *row* under *column_name*.

    Raises:
        ColumnConfigurationError: *column_name* is not a key of *row*.
    """
    if column_name not in row:
        raise ColumnConfigurationError(column_name)

    return row[column_name]
74+
75+
76+
def filter_rows_with_household_pk(config: Config, *sheets: Sheet) -> Iterable[Sheet]:
    """Lazily yield, for each sheet, only the rows whose household key column is truthy.

    The result has one filtered sheet per input sheet, in the same order.
    Nothing is read from the sheets until the filtered sheets are iterated,
    at which point a row lacking the key column raises
    ``ColumnConfigurationError``.
    """
    pk_column = config["household_pk_col"]

    def row_has_pk(row: Row) -> bool:
        return bool(get_value(row, pk_column))

    def filtered_sheets():
        for sheet in sheets:
            yield filter(row_has_pk, sheet)

    return filtered_sheets()
83+
84+
85+
def process_households(sheet: Sheet, job: AsyncJob, batch: Batch, config: Config) -> Mapping[int, Household]:
    """Create one household per row of *sheet* and return them keyed by household key.

    The household name comes from the ``master_column_label`` column and the
    key from the ``household_pk_col`` column. The whole cleaned row is stored
    as the household's ``flex_fields``. A later row with the same key replaces
    the earlier entry in the returned mapping.

    Raises:
        ColumnConfigurationError: the name or key column is missing from a row.
        SheetProcessingError: creating the household failed; the original
            error is chained as the cause.
    """
    households_by_key = {}

    # Rows are numbered from 1 in the order received, so if the caller has
    # already dropped rows the number is a position in *sheet*, not in the file.
    for row_number, row in enumerate(sheet, start=1):
        household_name = get_value(row, config["master_column_label"])
        key = get_value(row, config["household_pk_col"])

        try:
            created = job.program.households.create(
                batch=batch,
                name=household_name,
                flex_fields=normalize_row(row),
            )
            households_by_key[key] = cast(Household, created)
        except Exception as error:
            raise SheetProcessingError(HOUSEHOLD, row_number) from error

    return households_by_key
105+
106+
107+
def process_individuals(
    sheet: Sheet, household_mapping: Mapping[int, Household], job: AsyncJob, batch: Batch, config: Config
) -> int:
    """Create one individual per row of *sheet* and return how many were created.

    The individual name comes from the ``detail_column_label`` column. The
    value of the ``household_pk_col`` column selects the household in
    *household_mapping* that the individual is attached to. The whole cleaned
    row is stored as the individual's ``flex_fields``.

    Raises:
        ColumnConfigurationError: the name or household key column is missing
            from a row.
        MissingHouseholdError: a row's household key is not in
            *household_mapping*.
        SheetProcessingError: creating the individual failed; the original
            error is chained as the cause.
    """
    processed = 0

    # Rows are numbered from 1 in the order received, so if the caller has
    # already dropped rows the number is a position in *sheet*, not in the file.
    for i, row in enumerate(sheet, 1):
        name = get_value(row, config["detail_column_label"])
        household_key = get_value(row, config["household_pk_col"])
        household = household_mapping.get(household_key)

        # A lookup miss is exactly ``None``; test for that instead of
        # truthiness so a present household can never be reported as missing.
        if household is None:
            raise MissingHouseholdError(i, household_key)

        try:
            job.program.individuals.create(
                batch=batch,
                name=name,
                household_id=household.pk,
                flex_fields=normalize_row(row),
            )
        except Exception as e:
            raise SheetProcessingError(INDIVIDUAL, i) from e

        processed += 1

    return processed
133+
134+
135+
def validate_households(config: Config, household_mapping: Mapping[int, Household]) -> None:
    """Run the checker validation on every household when ``check_before`` is enabled.

    Does nothing when ``config["check_before"]`` is falsy. Otherwise it stops
    at the first household whose ``validate_with_checker()`` returns a falsy
    value.

    Raises:
        HouseholdValidationError: a household failed validation; carries that
            household's key.
    """
    if not config["check_before"]:
        return

    for key, household in household_mapping.items():
        is_valid = household.validate_with_checker()
        if not is_valid:
            raise HouseholdValidationError(key)
10140

11141

12142
def import_from_rdi(job: AsyncJob) -> dict[str, int]:
    """Import households and individuals from the job's RDI spreadsheet.

    Sheet 0 is read as households and sheet 1 as individuals. Rows without a
    household key are ignored. Everything runs inside one ``atomic()`` block,
    so an exception from any step, including the optional household
    validation, propagates out of that block.

    Returns:
        A dict with the number of imported records under the ``"household"``
        and ``"individual"`` keys.
    """
    with atomic():
        config: Config = job.config
        source_file = job.file
        batch = Batch.objects.create(
            name=config["batch_name"],
            program=job.program,
            country_office=job.program.country_office,
            imported_by=job.owner,
            source=Batch.BatchSource.RDI,
        )

        # Exactly two (index, rows) pairs are expected; the indexes are not needed.
        (_, household_rows), (_, individual_rows) = open_xls_multi(source_file, sheets=[0, 1])
        household_rows, individual_rows = filter_rows_with_household_pk(config, household_rows, individual_rows)

        # Households go first: individuals are attached through the returned mapping.
        households = process_households(household_rows, job, batch, config)
        individual_count = process_individuals(individual_rows, households, job, batch, config)

        # Validation runs last, once every record of the batch exists.
        validate_households(config, households)

        counts = {
            "household": len(households),
            "individual": individual_count,
        }

    return counts

src/country_workspace/workspaces/admin/program.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from ...contrib.aurora.forms import ImportAuroraForm
2020
from ...contrib.kobo.forms import ImportKoboForm
2121
from ...contrib.kobo.sync import import_data as import_from_kobo
22-
from ...datasources.rdi import import_from_rdi
22+
from ...datasources.rdi import import_from_rdi, Config as RDIConfig
2323
from ...models import AsyncJob
2424
from ...utils.flex_fields import get_checker_fields
2525
from ..models import CountryProgram
@@ -266,19 +266,21 @@ def import_data(self, request: HttpRequest, pk: str) -> "HttpResponse":
266266
def import_rdi(self, request: HttpRequest, program: CountryProgram) -> "ImportFileForm | None":
267267
form = ImportFileForm(request.POST, request.FILES, prefix="rdi")
268268
if form.is_valid():
269+
config: RDIConfig = {
270+
"batch_name": form.cleaned_data["batch_name"] or batch_name_default(),
271+
"household_pk_col": form.cleaned_data["pk_column_name"],
272+
"master_column_label": form.cleaned_data["master_column_label"],
273+
"detail_column_label": form.cleaned_data["detail_column_label"],
274+
"check_before": form.cleaned_data["check_before"],
275+
}
269276
job: AsyncJob = AsyncJob.objects.create(
270277
description="RDI importing",
271278
type=AsyncJob.JobType.TASK,
272279
action=fqn(import_from_rdi),
273280
file=request.FILES["rdi-file"],
274281
program=program,
275282
owner=request.user,
276-
config={
277-
"batch_name": form.cleaned_data["batch_name"] or batch_name_default(),
278-
"household_pk_col": form.cleaned_data["pk_column_name"],
279-
"master_column_label": form.cleaned_data["master_column_label"],
280-
"detail_column_label": form.cleaned_data["detail_column_label"],
281-
},
283+
config=config,
282284
)
283285
job.queue()
284286
self.message_user(request, _("Import scheduled"), messages.SUCCESS)

0 commit comments

Comments
 (0)