Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 145 additions & 39 deletions src/country_workspace/datasources/rdi.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import io
from collections.abc import Iterable
from typing import Mapping, Any, TypedDict, cast

from django.db.transaction import atomic
from hope_smart_import.readers import open_xls_multi
Expand All @@ -7,53 +9,157 @@
from country_workspace.utils.fields import clean_field_name

# An RDI source: either a filesystem path or an in-memory workbook buffer.
RDI = str | io.BytesIO
# One spreadsheet row, keyed by column header.
Row = Mapping[str, Any]
# An iterable of rows coming from a single worksheet.
Sheet = Iterable[Row]

# Sheet tags used in SheetProcessingError messages
# (sheet 0 holds households, sheet 1 holds individuals).
INDIVIDUAL = "individual"
HOUSEHOLD = "household"


class Config(TypedDict):
    """Shape of ``AsyncJob.config`` expected by an RDI import job."""

    batch_name: str  # name given to the created Batch
    household_pk_col: str  # column holding the household key on both sheets
    master_column_label: str  # column holding the household name (sheet 0)
    detail_column_label: str  # column holding the individual name (sheet 1)
    check_before: bool  # when True, validate every imported household with its checker


class ColumnConfigurationError(Exception):
    """Raised when a configured column name is absent from a sheet row."""

    def __init__(self, column_name: str) -> None:
        self.column_name = column_name
        super().__init__(column_name)

    def __str__(self) -> str:
        return "Column {} not found.".format(self.column_name)


class SheetProcessingError(Exception):
    """Raised when creating a record from a worksheet row fails."""

    def __init__(self, sheet_name: str, row_index: int) -> None:
        self.sheet_name = sheet_name
        self.row_index = row_index
        super().__init__(sheet_name, row_index)

    def __str__(self) -> str:
        return "Failed to process {0} sheet at row {1}".format(self.sheet_name, self.row_index)


class MissingHouseholdError(Exception):
    """Raised when an individual row references a household key that was not imported."""

    def __init__(self, row_index: int, household_key: str) -> None:
        self.row_index = row_index
        self.household_key = household_key
        super().__init__(row_index, household_key)

    def __str__(self) -> str:
        template = "Missing household {0} for individual at row {1}"
        return template.format(self.household_key, self.row_index)


class HouseholdValidationError(Exception):
    """Raised when an imported household fails its checker validation."""

    # NOTE(review): annotated as int, but the key originates from a sheet cell
    # (see process_households), so it may be any cell value -- confirm.
    def __init__(self, household_key: int) -> None:
        super().__init__(household_key)
        self.household_key = household_key

    def __str__(self) -> str:
        return f"Failed to validate household {self.household_key}."


def normalize_row(row: Row) -> Mapping[str, Any]:
    """Return *row* with every column header normalised via ``clean_field_name``.

    Values are untouched; only keys change.  Distinct raw headers that clean
    to the same name collapse to a single entry (the last one wins).
    """
    return {clean_field_name(k): v for k, v in row.items()}


def get_value(row: Row, column_name: str) -> Any:
    """Return the cell stored under *column_name*, or raise ColumnConfigurationError."""
    if column_name not in row:
        raise ColumnConfigurationError(column_name)
    return row[column_name]


def filter_rows_with_household_pk(config: Config, *sheets: Sheet) -> Iterable[Sheet]:
    """For every sheet, yield a filtered view keeping only rows with a truthy household key."""
    key_column = config["household_pk_col"]

    def keep(candidate: Row) -> bool:
        # A row takes part in the import only if its household key cell is truthy.
        return bool(get_value(candidate, key_column))

    return (filter(keep, sheet) for sheet in sheets)


def process_households(sheet: Sheet, job: AsyncJob, batch: Batch, config: Config) -> Mapping[int, Household]:
    """Create one ``Household`` per row of *sheet*, keyed by the row's household key.

    Returns:
        Mapping from each row's household key to the created ``Household``.
        NOTE(review): keys come straight from sheet cells, so duplicate keys
        silently overwrite earlier entries and may not be ``int`` as the
        return annotation suggests -- confirm.

    Raises:
        SheetProcessingError: wrapping the original error with the 1-based
            row number, if creating any household fails.
    """
    mapping = {}

    # enumerate from 1 so error messages match spreadsheet row numbers
    for i, row in enumerate(sheet, 1):
        # NOTE(review): name/key are looked up on the RAW row headers, while
        # flex_fields stores the cleaned ones (normalize_row); the previous
        # implementation matched the configured labels against CLEANED
        # headers -- confirm the labels are meant to be raw here.
        name = get_value(row, config["master_column_label"])
        household_key = get_value(row, config["household_pk_col"])

        try:
            mapping[household_key] = cast(
                Household,
                job.program.households.create(
                    batch=batch,
                    name=name,
                    flex_fields=normalize_row(row),
                ),
            )
        except Exception as e:
            raise SheetProcessingError(HOUSEHOLD, i) from e

    return mapping


def process_individuals(
    sheet: Sheet, household_mapping: Mapping[int, Household], job: AsyncJob, batch: Batch, config: Config
) -> int:
    """Create one individual per row of *sheet*, linked to its imported household.

    Args:
        sheet: rows of the individuals worksheet.
        household_mapping: households previously created by ``process_households``,
            keyed by household key.
        job: the import job; individuals are created through ``job.program``.
        batch: the ``Batch`` every created individual is attached to.
        config: import configuration (column labels / key column).

    Returns:
        The number of individuals created.

    Raises:
        MissingHouseholdError: if a row's household key is absent from
            *household_mapping* (or maps to a falsy entry).
        SheetProcessingError: wrapping the original error with the 1-based
            row number, if creating an individual fails.
    """
    processed = 0

    # enumerate from 1 so error messages match spreadsheet row numbers
    for i, row in enumerate(sheet, 1):
        name = get_value(row, config["detail_column_label"])
        household_key = get_value(row, config["household_pk_col"])
        household = household_mapping.get(household_key)

        if not household:
            raise MissingHouseholdError(i, household_key)

        try:
            job.program.individuals.create(
                batch=batch,
                name=name,
                household_id=household.pk,
                flex_fields=normalize_row(row),
            )
        except Exception as e:
            raise SheetProcessingError(INDIVIDUAL, i) from e

        processed += 1

    return processed


def validate_households(config: Config, household_mapping: Mapping[int, Household]) -> None:
    """Run the checker over every imported household when ``check_before`` is set.

    Raises HouseholdValidationError for the first household whose checker fails.
    """
    if not config["check_before"]:
        return
    for key, imported in household_mapping.items():
        if not imported.validate_with_checker():
            raise HouseholdValidationError(key)


def import_from_rdi(job: AsyncJob) -> dict[str, int]:
    """Import households and individuals from the RDI workbook attached to *job*.

    The workbook's first sheet holds households, the second individuals.
    Everything runs inside a single transaction, so any failure rolls the
    whole import back, including the created ``Batch``.

    Returns:
        ``{"household": <count>, "individual": <count>}`` with the number of
        records created from each sheet.
    """
    with atomic():
        config: Config = job.config
        rdi = job.file
        batch = Batch.objects.create(
            name=config["batch_name"],
            program=job.program,
            country_office=job.program.country_office,
            imported_by=job.owner,
            source=Batch.BatchSource.RDI,
        )

        # Sheet 0 -> households, sheet 1 -> individuals.
        (_, household_sheet), (_, individual_sheet) = open_xls_multi(rdi, sheets=[0, 1])

        # Drop rows lacking a household key before any record is created.
        household_sheet, individual_sheet = filter_rows_with_household_pk(config, household_sheet, individual_sheet)

        household_mapping = process_households(household_sheet, job, batch, config)
        individuals_number = process_individuals(individual_sheet, household_mapping, job, batch, config)

        # NOTE(review): despite the "check_before" name, validation runs AFTER
        # both sheets are imported; inside the transaction a failure still
        # aborts the whole import -- confirm this ordering is intended.
        validate_households(config, household_mapping)

        return {
            "household": len(household_mapping),
            "individual": individuals_number,
        }
16 changes: 9 additions & 7 deletions src/country_workspace/workspaces/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ...contrib.aurora.forms import ImportAuroraForm
from ...contrib.kobo.forms import ImportKoboForm
from ...contrib.kobo.sync import import_data as import_from_kobo
from ...datasources.rdi import import_from_rdi
from ...datasources.rdi import import_from_rdi, Config as RDIConfig
from ...models import AsyncJob
from ...utils.flex_fields import get_checker_fields
from ..models import CountryProgram
Expand Down Expand Up @@ -266,19 +266,21 @@ def import_data(self, request: HttpRequest, pk: str) -> "HttpResponse":
def import_rdi(self, request: HttpRequest, program: CountryProgram) -> "ImportFileForm | None":
form = ImportFileForm(request.POST, request.FILES, prefix="rdi")
if form.is_valid():
config: RDIConfig = {
"batch_name": form.cleaned_data["batch_name"] or batch_name_default(),
"household_pk_col": form.cleaned_data["pk_column_name"],
"master_column_label": form.cleaned_data["master_column_label"],
"detail_column_label": form.cleaned_data["detail_column_label"],
"check_before": form.cleaned_data["check_before"],
}
job: AsyncJob = AsyncJob.objects.create(
description="RDI importing",
type=AsyncJob.JobType.TASK,
action=fqn(import_from_rdi),
file=request.FILES["rdi-file"],
program=program,
owner=request.user,
config={
"batch_name": form.cleaned_data["batch_name"] or batch_name_default(),
"household_pk_col": form.cleaned_data["pk_column_name"],
"master_column_label": form.cleaned_data["master_column_label"],
"detail_column_label": form.cleaned_data["detail_column_label"],
},
config=config,
)
job.queue()
self.message_user(request, _("Import scheduled"), messages.SUCCESS)
Expand Down
Loading
Loading