Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions src/country_workspace/contrib/aurora/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from typing import Any
from typing import Any, Mapping

from django.db.transaction import atomic

from country_workspace.contrib.aurora.client import AuroraClient
from country_workspace.models import AsyncJob, Batch, Household, Individual
from country_workspace.utils.fields import clean_field_name, uppercase_field_value
from country_workspace.utils.config import BatchNameConfig, FailIfAlienConfig
from country_workspace.utils.fields import uppercase_field_value, RecordPreprocessor, create_json_record_preprocessor


class Config(BatchNameConfig, FailIfAlienConfig):
    """Job configuration for the Aurora import pipeline.

    Inherits ``batch_name`` and ``fail_if_alien`` from the base config
    TypedDicts.
    """

    # Aurora registration whose records are fetched
    # ("registration/<pk>/records/"); None presumably means "not configured"
    # -- TODO confirm against the job scheduler.
    registration_reference_pk: str | None
    # Column-name prefix identifying household fields in a flattened record.
    household_column_prefix: str
    # Column-name prefix identifying individual fields in a flattened record.
    individuals_column_prefix: str
    # Key in an individual's data whose value becomes the household label.
    household_label_column: str


def import_from_aurora(job: AsyncJob) -> dict[str, int]:
Expand All @@ -25,39 +33,48 @@ def import_from_aurora(job: AsyncJob) -> dict[str, int]:
- "individuals": The total number of individuals imported.

"""
config: Config = job.config
total_hh = total_ind = 0
batch = Batch.objects.create(
name=job.config["batch_name"],
name=config["batch_name"],
program=job.program,
country_office=job.program.country_office,
imported_by=job.owner,
source=Batch.BatchSource.AURORA,
)
client = AuroraClient()
individual_preprocessor = create_json_record_preprocessor(config, job.program.individual_checker)
household_preprocessor = create_json_record_preprocessor(config, job.program.household_checker)
with atomic():
for record in client.get(f"registration/{job.config['registration_reference_pk']}/records/"):
inds_data = _collect_by_prefix(record["flatten"], job.config.get("individuals_column_prefix"))
for record in client.get(f"registration/{config['registration_reference_pk']}/records/"):
inds_data = _collect_by_prefix(record["flatten"], config.get("individuals_column_prefix"))
if inds_data:
hh = create_household(batch, record["flatten"], job.config.get("household_column_prefix"))
hh = create_household(
batch, record["flatten"], config.get("household_column_prefix"), household_preprocessor
)
total_hh += 1
total_ind += len(
create_individuals(
household=hh,
data=inds_data,
household_label_column=job.config.get("household_label_column"),
household_label_column=config.get("household_label_column"),
preprocess_record=individual_preprocessor,
)
)
return {"households": total_hh, "individuals": total_ind}


def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Household:
def create_household(
batch: Batch, data: dict[str, Any], prefix: str, preprocess_record: RecordPreprocessor
) -> Household:
"""
Create a Household object from the provided data and associate it with a batch.

Args:
batch (Batch): The batch to which the household will be linked.
data (dict[str, Any]): A dictionary containing household-related information.
prefix (str): The prefix used to filter and group household-related information.
preprocess_record (RecordPreprocessor): The function normalizing field names and checking if they are valid.

Returns:
Household: The newly created household instance.
Expand All @@ -69,16 +86,20 @@ def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Househo
flex_fields = _collect_by_prefix(data, prefix)
if len(flex_fields) > 1:
raise ValueError("Multiple households found")
return batch.program.households.create(batch=batch, flex_fields=flex_fields)
flex_fields = next(iter(flex_fields.values()), {})
return batch.program.households.create(batch=batch, flex_fields=preprocess_record(flex_fields))


def create_individuals(household: Household, data: dict[str, Any], household_label_column: str) -> list[Individual]:
def create_individuals(
household: Household, data: dict[str, Any], household_label_column: str, preprocess_record: RecordPreprocessor
) -> list[Individual]:
"""Create and associate Individual objects with a given Household.

Args:
household (Household): The household to which the individuals will be linked.
data (dict[str, Any]): A dictionary mapping indices to individual details.
household_label_column (str): The key in the individual data used to determine the household label.
preprocess_record (RecordPreprocessor): The function normalizing field names and checking if they are valid.

Returns:
list[Individual]: A list of successfully created Individual instances.
Expand All @@ -87,7 +108,8 @@ def create_individuals(household: Household, data: dict[str, Any], household_lab
individuals = []
head_found = False

for individual in data.values():
for raw_individual in data.values():
individual = preprocess_record(raw_individual)
if not head_found:
head_found = _update_household_label_from_individual(household, individual, household_label_column)
individuals.append(
Expand Down Expand Up @@ -126,13 +148,12 @@ def _collect_by_prefix(data: dict[str, Any], prefix: str) -> dict[str, dict[str,
for k, v in data.items():
if (stripped := k.removeprefix(prefix)) != k:
index, field = stripped.split("_", 1)
field_clean = clean_field_name(field)
result.setdefault(index, {})[field_clean] = uppercase_field_value(field_clean, v)
result.setdefault(index, {})[field] = uppercase_field_value(field, v)
return result


def _update_household_label_from_individual(
household: Household, individual: dict[str, Any], household_label_column: str
household: Household, individual: Mapping[str, Any], household_label_column: str
) -> bool:
"""Update the household's name based on an individual's role and specified name field.

Expand All @@ -147,7 +168,7 @@ def _update_household_label_from_individual(
bool: True if the household name was updated (individual is head and name provided), False otherwise.

"""
is_head = any(individual.get(k) == "HEAD" for k in individual if k.startswith("relationship"))
is_head = any(individual.get(k, "").upper() == "HEAD" for k in individual if k.startswith("relationship"))
name = individual.get(household_label_column)
if is_head and name:
household.name = name
Expand Down
55 changes: 35 additions & 20 deletions src/country_workspace/contrib/kobo/sync.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
from typing import Any, TypedDict, cast

from constance import config
from constance import config as constance_config
from django.core.cache import cache

from country_workspace.contrib.kobo.api.client.main import Client
from country_workspace.contrib.kobo.api.data.asset import Asset
from country_workspace.contrib.kobo.api.data.submission import Submission
from country_workspace.contrib.kobo.models import KoboSubmission
from country_workspace.models import AsyncJob, Batch, Household, Individual
from country_workspace.utils.fields import clean_field_name
from country_workspace.models import AsyncJob, Batch, Household, Individual, Program
from country_workspace.utils.config import FailIfAlienConfig, BatchNameConfig
from country_workspace.utils.fields import create_json_record_preprocessor, RecordPreprocessor


class Config(BatchNameConfig, FailIfAlienConfig):
    """Job configuration for the Kobo sync.

    Inherits ``batch_name`` and ``fail_if_alien`` from the base config
    TypedDicts.
    """

    # Kobo asset uid to import; compared against ``asset.uid``.
    project_id: str
    # Submission key holding the repeating group of individual records.
    individual_records_field: str


def make_client(country_code: str | None) -> Client:
token = config.KOBO_MASTER_API_TOKEN or config.KOBO_API_TOKEN
project_view_id = config.KOBO_PROJECT_VIEW_ID if config.KOBO_MASTER_API_TOKEN else None
token = constance_config.KOBO_MASTER_API_TOKEN or constance_config.KOBO_API_TOKEN
project_view_id = constance_config.KOBO_PROJECT_VIEW_ID if constance_config.KOBO_MASTER_API_TOKEN else None
return Client(
base_url=config.KOBO_KF_URL,
base_url=constance_config.KOBO_KF_URL,
token=token,
country_code=country_code,
project_view_id=project_view_id,
Expand All @@ -27,30 +33,35 @@ def extract_household_data(submission: Submission, individual_records_field: str


def create_individuals(
    batch: Batch, household: Household, submission: Submission, config: Config, preprocess_record: RecordPreprocessor
) -> int:
    """Create Individual rows for every member record in a Kobo submission.

    Args:
        batch: The import batch the individuals are attributed to.
        household: The household the individuals are attached to.
        submission: Raw Kobo submission holding the repeating member group.
        config: Import configuration; ``individual_records_field`` names the
            repeating-group key that contains the member records.
        preprocess_record: Normalizes field names and validates them against
            the program's individual checker.

    Returns:
        The number of individuals created.
    """
    group_key = config["individual_records_field"]
    prefix = f"{group_key}/"
    individuals = []
    for raw_individual in submission.get(group_key, []):
        # Kobo prefixes repeating-group columns with "<group>/". Strip only
        # the leading prefix: str.replace would also clobber interior
        # occurrences of the group name inside a field name.
        individual = {key.removeprefix(prefix): value for key, value in raw_individual.items()}
        # First field whose name starts with "full_name" supplies the
        # individual's display name; default to "" when absent.
        fullname_key = next((key for key in individual if key.startswith("full_name")), None)
        individuals.append(
            Individual(
                batch=batch,
                household=household,
                name=individual.get(fullname_key, "") if fullname_key is not None else "",
                flex_fields=preprocess_record(individual),
            ),
        )
    household.program.individuals.bulk_create(individuals)
    return len(individuals)


def create_household(
    batch: Batch, submission: Submission, config: Config, preprocess_record: RecordPreprocessor
) -> Household:
    """Create a Household from the non-member fields of a Kobo submission.

    Args:
        batch: The import batch the household belongs to.
        submission: Raw Kobo submission to extract household fields from.
        config: Import configuration; ``individual_records_field`` identifies
            the repeating group excluded from household data.
        preprocess_record: Normalizes field names and validates them against
            the program's household checker.

    Returns:
        The newly created household instance.
    """
    household_fields = extract_household_data(submission, config["individual_records_field"])
    created = batch.program.households.create(
        batch=batch,
        flex_fields=preprocess_record(household_fields),
    )
    # The related manager is untyped; cast to satisfy the declared return type.
    return cast(Household, created)

Expand All @@ -63,40 +74,44 @@ class ImportResult(TypedDict):
individuals: int


def import_asset(batch: Batch, asset: Asset, config: Config, program: Program) -> ImportResult:
    """Import every not-yet-seen submission of a Kobo asset into a batch.

    Args:
        batch: The batch new households/individuals are attached to.
        asset: The Kobo asset whose submissions are imported.
        config: Import configuration (project id, member-group field, flags).
        program: The program providing household/individual checkers and
            the target record sets.

    Returns:
        ImportResult: counts of households and individuals created.
    """
    household_counter = 0
    individual_counter = 0

    # Build the record preprocessors once per asset; they normalize and
    # validate flex fields against the program's checkers.
    individual_preprocessor = create_json_record_preprocessor(config, program.individual_checker)
    household_preprocessor = create_json_record_preprocessor(config, program.household_checker)

    # Lock per asset so concurrent runs cannot import the same submissions.
    with cache.lock(ASSET_CACHE_KEY.format(asset_id=asset.uid)):
        # Submissions imported by a previous run are skipped.
        seen_ids = set(
            KoboSubmission.objects.filter(asset_uid=asset.uid).values_list("submission_id", flat=True)
        )
        for submission in asset.submissions:
            if submission.id in seen_ids:
                continue
            household = create_household(batch, submission, config, household_preprocessor)
            household_counter += 1
            individual_counter += create_individuals(batch, household, submission, config, individual_preprocessor)

    return ImportResult(households=household_counter, individuals=individual_counter)


def import_data(job: AsyncJob) -> ImportResult:
config: Config = job.config

batch = Batch.objects.create(
name=job.config["batch_name"],
name=config["batch_name"],
program=job.program,
country_office=job.program.country_office,
imported_by=job.owner,
source=Batch.BatchSource.KOBO,
)
individual_records_field = job.config["individual_records_field"]
client = make_client(job.program.country_office.kobo_country_code)

household_counter = 0
individual_counter = 0

for asset in client.assets:
# TODO: fetch specific asset
if job.config["project_id"] == asset.uid:
import_result = import_asset(batch, asset, individual_records_field)
if config["project_id"] == asset.uid:
import_result = import_asset(batch, asset, config, job.program)
household_counter += import_result["households"]
individual_counter += import_result["individuals"]

Expand Down
31 changes: 17 additions & 14 deletions src/country_workspace/datasources/rdi.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import io
from collections.abc import Iterable
from typing import Mapping, Any, TypedDict, cast
from typing import Mapping, Any, cast

from django.db.transaction import atomic
from hope_smart_import.readers import open_xls_multi

from country_workspace.models import AsyncJob, Batch, Household
from country_workspace.utils.fields import clean_field_name
from country_workspace.utils.config import FailIfAlienConfig, BatchNameConfig
from country_workspace.utils.fields import create_json_record_preprocessor, Record

RDI = str | io.BytesIO
Row = Mapping[str, Any]
Sheet = Iterable[Row]
Sheet = Iterable[Record]

INDIVIDUAL = "individual"
HOUSEHOLD = "household"


class Config(TypedDict):
batch_name: str
class Config(BatchNameConfig, FailIfAlienConfig):
household_pk_col: str
master_column_label: str
detail_column_label: str
Expand Down Expand Up @@ -62,11 +61,7 @@ def __str__(self) -> str:
return f"Failed to validate household {self.household_key}."


def normalize_row(row: Row) -> Mapping[str, Any]:
return {clean_field_name(k): v for k, v in row.items()}


def get_value(row: Row, column_name: str) -> Any:
def get_value(row: Record, column_name: str) -> Any:
if column_name in row:
return row[column_name]

Expand All @@ -76,7 +71,7 @@ def get_value(row: Row, column_name: str) -> Any:
def filter_rows_with_household_pk(config: Config, *sheets: Sheet) -> Iterable[Sheet]:
    """Lazily drop rows whose household-PK column is empty from each sheet.

    Returns one filtered iterable per input sheet, in the same order; rows
    are only consumed when the caller iterates.
    """
    pk_column = config["household_pk_col"]

    def row_has_pk(candidate: Record) -> bool:
        # Truthiness check: None, "" and 0 all count as "no PK".
        return bool(get_value(candidate, pk_column))

    return ((row for row in sheet if row_has_pk(row)) for sheet in sheets)
Expand All @@ -85,17 +80,21 @@ def has_household_pk(row: Row) -> bool:
def process_households(sheet: Sheet, job: AsyncJob, batch: Batch, config: Config) -> Mapping[int, Household]:
mapping = {}

preprocess_json_record = create_json_record_preprocessor(config, job.program.household_checker)

for i, row in enumerate(sheet, 1):
name = get_value(row, config["master_column_label"])
household_key = get_value(row, config["household_pk_col"])

preprocessed_row = preprocess_json_record(row)

try:
mapping[household_key] = cast(
Household,
job.program.households.create(
batch=batch,
name=name,
flex_fields=normalize_row(row),
flex_fields=preprocessed_row,
),
)
except Exception as e:
Expand All @@ -109,6 +108,8 @@ def process_individuals(
) -> int:
processed = 0

preprocess_json_record = create_json_record_preprocessor(config, job.program.individual_checker)

for i, row in enumerate(sheet, 1):
name = get_value(row, config["detail_column_label"])
household_key = get_value(row, config["household_pk_col"])
Expand All @@ -117,12 +118,14 @@ def process_individuals(
if not household:
raise MissingHouseholdError(i, household_key)

preprocessed_row = preprocess_json_record(row)

try:
job.program.individuals.create(
batch=batch,
name=name,
household_id=household.pk,
flex_fields=normalize_row(row),
flex_fields=preprocessed_row,
)
except Exception as e:
raise SheetProcessingError(INDIVIDUAL, i) from e
Expand Down
9 changes: 9 additions & 0 deletions src/country_workspace/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import TypedDict


class FailIfAlienConfig(TypedDict):
    """Mixin config declaring the ``fail_if_alien`` import flag."""

    # When true, the import presumably rejects fields unknown to the
    # program's checker -- confirm against create_json_record_preprocessor.
    fail_if_alien: bool


class BatchNameConfig(TypedDict):
    """Mixin config declaring the name of the Batch to create."""

    # Consumed as Batch.objects.create(name=config["batch_name"], ...).
    batch_name: str
Loading
Loading