Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/src/import_data/aurora.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ menu, then press the **[Import Data]** button and select the **[Aurora]** tab. H
By default, the following name is used: *<"Batch " + the current datetime>*

- **Registration** – Select the specific Aurora registration to import. If needed, [synchronize](../interfaces.md#synchronize-unified-classifiers) unified classifiers before proceeding.
- **Household Name Column** – Specify which Individual's column contains the Household's name.

- **Household column prefix** – The string at the beginning of a column name that marks the column as household-related data. It can take various forms (e.g., "household_" or "household-info") and is used to group these columns.

- **Individuals column prefix** – The string at the beginning of a column name that marks the column as individual-related data. It can take various forms (e.g., "individual-details_" or "personas_") and is used to group these columns.

- **Household label column** – Specify which Individual's column should be used as the label for the household.

By default, this is set to *family_name*.

Expand Down
15 changes: 13 additions & 2 deletions src/country_workspace/contrib/aurora/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@ class ImportAuroraForm(forms.Form):
help_text="What type of registrations are being imported.",
)

household_name_column = forms.CharField(
household_column_prefix = forms.CharField(
initial="household_",
help_text="Household's column group prefix",
)

individuals_column_prefix = forms.CharField(
initial="individuals_",
help_text="Individuals' column group prefix",
)

household_label_column = forms.CharField(
required=False,
initial="family_name",
help_text="Which Individual's column contains the Household's name.",
help_text="Which Individual's column should be used as label for the household.",
)

check_before = forms.BooleanField(
required=False, help_text="Prevent import if errors if data is not valid against data checker."
)

fail_if_alien = forms.BooleanField(
required=False, help_text="Fails if it finds fields which do not exists in data checker."
)
Expand Down
121 changes: 44 additions & 77 deletions src/country_workspace/contrib/aurora/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ def import_from_aurora(job: AsyncJob) -> dict[str, int]:
"""Import data from the Aurora system into the database within an atomic transaction.
Args:
job (AsyncJob): The job instance containing the configuration and context for data synchronization.
job (AsyncJob): The job instance containing the configuration and context for data import.
Expected keys in `job.config`:
- "batch_name" (str): The name for the newly created batch.
- "registration_reference_pk" (int): The unique identifier of the registration to import.
- "household_name_column" (str, optional): The column name used to determine the household head.
- "household_column_prefix" (str, optional): The prefix for household-related columns.
- "individuals_column_prefix" (str, optional): The prefix for individual-related columns.
- "household_label_column" (str, optional): The column name used to determine the household label.
Returns:
dict[str, int]: A dictionary with the counts of successfully created records:
Expand All @@ -24,40 +26,38 @@ def import_from_aurora(job: AsyncJob) -> dict[str, int]:
"""
total_hh = total_ind = 0
batch_name = job.config["batch_name"]
batch = Batch.objects.create(
name=batch_name,
name=job.config["batch_name"],
program=job.program,
country_office=job.program.country_office,
imported_by=job.owner,
source=Batch.BatchSource.AURORA,
)

registration = job.config["registration_reference_pk"]
client = AuroraClient()
with atomic():
for record in client.get(f"registration/{registration}/records/"):
hh = create_household(batch, record["flatten"])
total_hh += 1
total_ind += len(
create_individuals(
household=hh,
data=record["flatten"],
household_name_column=job.config.get("household_name_column"),
for record in client.get(f"registration/{job.config['registration_reference_pk']}/records/"):
inds_data = _collect_by_prefix(record["flatten"], job.config.get("individuals_column_prefix"))
if inds_data:
hh = create_household(batch, record["flatten"], job.config.get("household_column_prefix"))
total_hh += 1
total_ind += len(
create_individuals(
household=hh,
data=inds_data,
household_label_column=job.config.get("household_label_column"),
)
)
)

return {"households": total_hh, "individuals": total_ind}


def create_household(batch: Batch, data: dict[str, Any]) -> Household:
def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Household:
"""
Create a Household object from the provided data and associate it with a batch.
Args:
batch (Batch): The batch to which the household will be linked.
data (dict[str, Any]): A dictionary containing household-related information,
typically prefixed with "household_".
data (dict[str, Any]): A dictionary containing household-related information.
prefix (str): The prefix used to filter and group household-related information.
Returns:
Household: The newly created household instance.
Expand All @@ -66,55 +66,38 @@ def create_household(batch: Batch, data: dict[str, Any]) -> Household:
ValueError: If multiple household entries are found in the provided data.
"""
flex_fields = _collect_by_prefix(data, prefix="household_")

if len(flex_fields) == 1:
flex_fields = next(iter(flex_fields.values()))
else:
flex_fields = _collect_by_prefix(data, prefix)
if len(flex_fields) > 1:
raise ValueError("Multiple households found")

return batch.program.households.create(batch=batch, flex_fields=flex_fields)


def create_individuals(
household: Household,
data: dict[str, Any],
household_name_column: str,
) -> list[Individual]:
def create_individuals(household: Household, data: dict[str, Any], household_label_column: str) -> list[Individual]:
"""Create and associate Individual objects with a given Household.
Args:
household (Household): The household to which the individuals will be linked.
data (dict[str, Any]): A dictionary containing individual details, typically
structured with a prefix for multiple individuals.
household_name_column (str): The key in the individual data used to determine
the household head's name.
data (dict[str, Any]): A dictionary mapping indices to individual details.
household_label_column (str): The key in the individual data used to determine the household label.
Returns:
list[Individual]: A list of successfully created Individual instances.
Raises:
ValueError: If no household head is identified in the provided data.
"""
individuals = []
head_found = False

individuals_data = _collect_by_prefix(data, prefix="individuals_")
for individual in individuals_data.values():
for individual in data.values():
if not head_found:
head_found = _update_household_name_from_individual(household, individual, household_name_column)
fullname_field = next((k for k in individual if k.startswith("given_name")), None)
head_found = _update_household_label_from_individual(household, individual, household_label_column)
individuals.append(
Individual(
batch=household.batch,
household_id=household.pk,
name=individual.get(fullname_field, ""),
name=individual.get("given_name", ""),
flex_fields=individual,
),
)
if not head_found:
raise ValueError(f"No head of household {household.flex_fields} found")
return household.program.individuals.bulk_create(individuals)


Expand All @@ -127,28 +110,16 @@ def _collect_by_prefix(data: dict[str, Any], prefix: str) -> dict[str, dict[str,
Returns:
dict[str, dict[str, Any]]: A dictionary where each key is an index extracted from the original keys,
and each value is a dictionary of the corresponding grouped fields (with normalized field names and,
for specific fields, values converted to uppercase).
Raises:
ValueError: If no matching data is found with the specified prefix.
and each value is a dictionary of the corresponding grouped fields with normalized field names
and, for specific fields, values converted to uppercase. Returns an empty dictionary if no
matching keys are found.
Examples:
>>> data = {
... "user_0_relationship_h_c": "head",
... "user_0_gender_i_c": "male",
... "user_0_other_key": "other",
... "user_1_relationship_h_c": "son_daughter",
... "user_1_gender_i_c": "female",
... "user_1_other_key": "moreover",
... }
>>> data = {"user_0_relationship": "head", "user_0_gender": "male", "user_1_gender": "female"}
>>> _collect_by_prefix(data, "user_")
{'0': {'relationship': 'HEAD', 'gender': 'MALE', 'other_key': 'other'},
'1': {'relationship': 'SON_DAUGHTER', 'gender': 'FEMALE', 'other_key': 'moreover'}}
>>> _collect_by_prefix(data, "nonexistent_")
Traceback (most recent call last):
...
ValueError: No data found with prefix 'nonexistent_'
{'0': {'relationship': 'HEAD', 'gender': 'MALE'}, '1': {'gender': 'FEMALE'}}
>>> _collect_by_prefix(data, "other_")
{}
"""
result = {}
Expand All @@ -157,33 +128,29 @@ def _collect_by_prefix(data: dict[str, Any], prefix: str) -> dict[str, dict[str,
index, field = stripped.split("_", 1)
field_clean = clean_field_name(field)
result.setdefault(index, {})[field_clean] = uppercase_field_value(field_clean, v)
if not result:
raise ValueError(f"No data found with prefix '{prefix}'")
return result


def _update_household_name_from_individual(
household: Household,
individual: dict[str, Any],
household_name_column: str,
def _update_household_label_from_individual(
household: Household, individual: dict[str, Any], household_label_column: str
) -> bool:
"""Update the household's name based on an individual's role and specified name field.
Args:
household (Household): The household instance to update.
individual (dict[str, Any]): A dictionary containing the individual's data,
including relationship status and potential household name.
household_name_column (str): The key in the individual's data that stores
household_label_column (str): The key in the individual's data that stores
the name to assign to the household.
Returns:
bool: True if the household name was updated, False otherwise.
bool: True if the household name was updated (individual is head and name provided), False otherwise.
"""
if any(individual.get(k) == "HEAD" for k in individual if k.startswith("relationship")):
name = individual.get(household_name_column)
if name:
household.name = name
household.save()
return True
is_head = any(individual.get(k) == "HEAD" for k in individual if k.startswith("relationship"))
name = individual.get(household_label_column)
if is_head and name:
household.name = name
household.save(update_fields=["name"])
return True
return False
4 changes: 3 additions & 1 deletion src/country_workspace/workspaces/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ def import_aurora(self, request: HttpRequest, program: "CountryProgram") -> "Imp
config={
"batch_name": form.cleaned_data["batch_name"] or BATCH_NAME_DEFAULT,
"registration_reference_pk": registration_reference_pk,
"household_name_column": form.cleaned_data["household_name_column"],
"household_column_prefix": form.cleaned_data["household_column_prefix"],
"individuals_column_prefix": form.cleaned_data["individuals_column_prefix"],
"household_label_column": form.cleaned_data["household_label_column"],
},
)
job.queue()
Expand Down
Loading
Loading