|
| 1 | +from typing import Any, NamedTuple |
| 2 | +from itertools import chain |
| 3 | +from collections.abc import Mapping |
| 4 | +from functools import partial |
| 5 | + |
| 6 | +from django.contrib.contenttypes.models import ContentType |
| 7 | +from django.db import transaction |
| 8 | +from django.utils import timezone |
| 9 | + |
| 10 | +from country_workspace.contrib.aurora.client import AuroraClient |
| 11 | +from country_workspace.models import AsyncJob, Batch, Individual, SyncLog, Program |
| 12 | +from country_workspace.utils.config import BatchNameConfig, ValidateModeConfig |
| 13 | +from country_workspace.utils.fields import clean_field_names |
| 14 | +from country_workspace.utils.sync_log import get_aurora_sync_log_name |
| 15 | +from country_workspace.utils.functional import compose |
| 16 | + |
| 17 | + |
class Config(BatchNameConfig, ValidateModeConfig):
    """Job configuration for an Aurora import.

    Combines the batch-naming and validation-mode settings with the
    Aurora-specific keys below. Accessed mapping-style (``config.get(...)``,
    ``config[...]``) by the importer — presumably a TypedDict-style base;
    verify against BatchNameConfig/ValidateModeConfig.
    """

    # Primary key of the Aurora registration to pull records from; required.
    registration_reference_pk: str | None
    # Master/detail import mode is not implemented — import_data raises if set.
    master_detail: bool
| 22 | + |
# Result of an import run: the number of Individual rows created.
ImportResult = NamedTuple("ImportResult", [("people", int)])
| 26 | + |
def import_data(job: AsyncJob) -> ImportResult:
    """Run an Aurora import for *job*: create a Batch and pull all records.

    Reads the job's config for the registration to import, creates a new
    AURORA-sourced Batch under the job's program, then imports every record
    returned by the Aurora API via ``import_result``.

    Returns an ImportResult with the total number of people created.
    Raises NotImplementedError for master/detail mode and ImportError when
    ``registration_reference_pk`` is missing.
    """
    cfg: Config = job.config

    # Guard clauses: unsupported mode first, then the mandatory key.
    if cfg.get("master_detail"):
        raise NotImplementedError
    if not cfg.get("registration_reference_pk"):
        raise ImportError("registration_reference_pk is required for Aurora import")

    program = job.program
    batch = Batch.objects.create(
        name=cfg["batch_name"],
        program=program,
        country_office=program.country_office,
        imported_by=job.owner,
        source=Batch.BatchSource.AURORA,
    )

    endpoint = f"registration/{cfg['registration_reference_pk']}/records/"
    total = sum(
        import_result(batch, record, cfg).people
        for record in AuroraClient().get(endpoint)
    )
    return ImportResult(people=total)
| 48 | + |
| 49 | + |
def import_result(batch: Batch, result: Mapping[str, Any], config: Config) -> ImportResult:
    """Import one Aurora record into *batch*, checkpointing progress in SyncLog.

    A program-scoped SyncLog row remembers the highest record id already
    imported; records with ``pk`` at or below that watermark are skipped, so
    re-running the import is idempotent. Returns how many people were created
    (0 or 1 per call). Raises ImportError (chained to the original exception)
    if the record cannot be imported.
    """
    people_counter = 0
    # NOTE(review): "registration" and the pk are concatenated with no separator
    # (unlike the "registration/{pk}/records/" URL) — presumably intentional as a
    # log key; confirm against get_aurora_sync_log_name's other callers.
    sync_log_name = get_aurora_sync_log_name(f"registration{config['registration_reference_pk']}")
    program_ct = ContentType.objects.get_for_model(Program)
    # Watermark lookup: last_id is stored as a string; 0 means "nothing imported yet".
    sync_log = SyncLog.objects.filter(name=sync_log_name, content_type=program_ct, object_id=batch.program.id).first()
    last_id = int(sync_log.last_id) if sync_log and sync_log.last_id else 0
    last_successful_id = last_id

    try:
        current_id = int(result["pk"])
        # Already imported on a previous run — skip without touching the DB.
        if current_id <= last_id:
            return ImportResult(people=0)
        # Atomic so a partial person row is never left behind on failure.
        with transaction.atomic():
            create_people(batch, result, config)
        people_counter += 1
        last_successful_id = current_id
    except Exception as e:
        # result["pk"] itself may be the thing that failed (missing/non-int).
        failed_id = result.get("pk", "unknown (before first record)")
        error_msg = (
            f"Successfully imported {people_counter} people, before stopping at record {failed_id} due to:\n"
            f"Error: {e}\n"
            f"Last successful record ID: {last_successful_id}."
        )
        raise ImportError(error_msg) from e
    finally:
        # Persist the watermark even on failure, so a re-run resumes after the
        # last record that actually succeeded.
        if last_successful_id > last_id:
            SyncLog.objects.update_or_create(
                name=sync_log_name,
                content_type=program_ct,
                object_id=batch.program.id,
                defaults={"last_id": str(last_successful_id), "last_update_date": timezone.now()},
            )
    return ImportResult(people=people_counter)
| 83 | + |
| 84 | + |
def create_people(batch: Batch, record: dict[str, Any], config: Config) -> Individual:
    """Create a single Individual in *batch* from a raw Aurora *record*.

    The record's ``"fields"`` payload is pushed through a pipeline — top-two-level
    flattening, field-name cleaning, the program's importer mapping, and
    full-name synthesis — and stored in ``flex_fields``; the untouched record is
    preserved in ``raw_data``. *config* is part of the importer signature but is
    not consulted here.
    """
    pipeline = compose(
        flatten_top2_prefixed,
        clean_field_names,
        partial(batch.program.apply_mapping_importer, Individual),
        make_full_name,
    )
    flex = pipeline(record["fields"])

    return Individual.objects.create(
        batch_id=batch.pk,
        name="",
        household=None,
        flex_fields=flex,
        raw_data=record,
    )
| 99 | + |
| 100 | + |
def flatten_top2_prefixed(
    data: Mapping[str, Any],
    sep: str = "_",
) -> dict[str, Any]:
    """Flatten top level; prefix-expand second-level dicts by parent key; ignore deeper nesting.

    Per top-level key ``k``:
      * mapping value -> each inner key ``kk`` becomes ``f"{k}{sep}{kk}"``
        (values deeper than the second level are kept as-is, not expanded);
      * non-empty list of mappings -> the dicts are merged (later entries win on
        key clashes) and flattened as if top-level, i.e. WITHOUT the parent prefix;
      * anything else (scalars, mixed lists, empty lists) -> copied through unchanged.

    Fix: an empty list was previously swallowed entirely — ``all()`` is vacuously
    True on ``[]``, so ``{"tags": []}`` dropped the key. Empty lists are now
    preserved as plain values.
    """
    out: dict[str, Any] = {}

    def ld2d(items: list[Mapping[str, Any]]) -> dict[str, Any]:
        # Merge a list of dicts into one; later dicts override earlier keys.
        return dict(chain.from_iterable(d.items() for d in items))

    def merge(d: Mapping[str, Any]) -> None:
        for k, v in d.items():
            if isinstance(v, Mapping):  # 2nd level dict
                out.update({f"{k}{sep}{kk}": vv for kk, vv in v.items()})
            elif isinstance(v, list) and v and all(isinstance(it, Mapping) for it in v):  # list[dict]
                merge(ld2d(v))
            else:
                out[k] = v

    merge(data)
    return out
| 122 | + |
| 123 | + |
def make_full_name(row: dict[str, Any]) -> dict[str, Any]:  # pragma: no cover
    """Ensure *row* carries a non-blank ``full_name``.

    If ``full_name`` is missing or whitespace-only, build one by joining the
    stripped, non-empty ``given_name``/``middle_name``/``family_name`` values.
    When all parts are blank, the row is left untouched. Mutates and returns *row*.
    """
    existing = row.get("full_name") or ""
    if existing.strip():
        return row

    pieces = []
    for field in ("given_name", "middle_name", "family_name"):
        value = (row.get(field) or "").strip()
        if value:
            pieces.append(value)

    combined = " ".join(pieces)
    if combined:
        row["full_name"] = combined

    return row
0 commit comments