Skip to content

Commit a808f8d

Browse files
Implement check if alien feature
1 parent 69a13ba commit a808f8d

File tree

7 files changed

+145
-63
lines changed

7 files changed

+145
-63
lines changed

src/country_workspace/contrib/aurora/pipeline.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,15 @@
44

55
from country_workspace.contrib.aurora.client import AuroraClient
66
from country_workspace.models import AsyncJob, Batch, Household, Individual
7-
from country_workspace.utils.fields import clean_field_name, uppercase_field_value
7+
from country_workspace.utils.config import BatchNameConfig, FailIfAlienConfig
8+
from country_workspace.utils.fields import uppercase_field_value, RecordPreprocessor, create_json_record_preprocessor
9+
10+
11+
class Config(BatchNameConfig, FailIfAlienConfig):
12+
registration_reference_pk: str | None
13+
household_column_prefix: str
14+
individuals_column_prefix: str
15+
household_label_column: str
816

917

1018
def import_from_aurora(job: AsyncJob) -> dict[str, int]:
@@ -25,39 +33,48 @@ def import_from_aurora(job: AsyncJob) -> dict[str, int]:
2533
- "individuals": The total number of individuals imported.
2634
2735
"""
36+
config: Config = job.config
2837
total_hh = total_ind = 0
2938
batch = Batch.objects.create(
30-
name=job.config["batch_name"],
39+
name=config["batch_name"],
3140
program=job.program,
3241
country_office=job.program.country_office,
3342
imported_by=job.owner,
3443
source=Batch.BatchSource.AURORA,
3544
)
3645
client = AuroraClient()
46+
individual_preprocessor = create_json_record_preprocessor(config, job.program.individual_checker)
47+
household_preprocessor = create_json_record_preprocessor(config, job.program.household_checker)
3748
with atomic():
38-
for record in client.get(f"registration/{job.config['registration_reference_pk']}/records/"):
39-
inds_data = _collect_by_prefix(record["flatten"], job.config.get("individuals_column_prefix"))
49+
for record in client.get(f"registration/{config['registration_reference_pk']}/records/"):
50+
inds_data = _collect_by_prefix(record["flatten"], config.get("individuals_column_prefix"))
4051
if inds_data:
41-
hh = create_household(batch, record["flatten"], job.config.get("household_column_prefix"))
52+
hh = create_household(
53+
batch, record["flatten"], config.get("household_column_prefix"), household_preprocessor
54+
)
4255
total_hh += 1
4356
total_ind += len(
4457
create_individuals(
4558
household=hh,
4659
data=inds_data,
47-
household_label_column=job.config.get("household_label_column"),
60+
household_label_column=config.get("household_label_column"),
61+
preprocess_record=individual_preprocessor,
4862
)
4963
)
5064
return {"households": total_hh, "individuals": total_ind}
5165

5266

53-
def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Household:
67+
def create_household(
68+
batch: Batch, data: dict[str, Any], prefix: str, preprocess_record: RecordPreprocessor
69+
) -> Household:
5470
"""
5571
Create a Household object from the provided data and associate it with a batch.
5672
5773
Args:
5874
batch (Batch): The batch to which the household will be linked.
5975
data (dict[str, Any]): A dictionary containing household-related information.
6076
prefix (str): The prefix used to filter and group household-related information.
77+
preprocess_record (RecordPreprocessor): The function normalizing field names and checking if they are valid.
6178
6279
Returns:
6380
Household: The newly created household instance.
@@ -69,16 +86,19 @@ def create_household(batch: Batch, data: dict[str, Any], prefix: str) -> Househo
6986
flex_fields = _collect_by_prefix(data, prefix)
7087
if len(flex_fields) > 1:
7188
raise ValueError("Multiple households found")
72-
return batch.program.households.create(batch=batch, flex_fields=flex_fields)
89+
return batch.program.households.create(batch=batch, flex_fields=preprocess_record(flex_fields))
7390

7491

75-
def create_individuals(household: Household, data: dict[str, Any], household_label_column: str) -> list[Individual]:
92+
def create_individuals(
93+
household: Household, data: dict[str, Any], household_label_column: str, preprocess_record: RecordPreprocessor
94+
) -> list[Individual]:
7695
"""Create and associate Individual objects with a given Household.
7796
7897
Args:
7998
household (Household): The household to which the individuals will be linked.
8099
data (dict[str, Any]): A dictionary mapping indices to individual details.
81100
household_label_column (str): The key in the individual data used to determine the household label.
101+
preprocess_record (RecordPreprocessor): The function normalizing field names and checking if they are valid.
82102
83103
Returns:
84104
list[Individual]: A list of successfully created Individual instances.
@@ -95,7 +115,7 @@ def create_individuals(household: Household, data: dict[str, Any], household_lab
95115
batch=household.batch,
96116
household_id=household.pk,
97117
name=individual.get("given_name", ""),
98-
flex_fields=individual,
118+
flex_fields=preprocess_record(individual),
99119
),
100120
)
101121
return household.program.individuals.bulk_create(individuals)
@@ -126,8 +146,7 @@ def _collect_by_prefix(data: dict[str, Any], prefix: str) -> dict[str, dict[str,
126146
for k, v in data.items():
127147
if (stripped := k.removeprefix(prefix)) != k:
128148
index, field = stripped.split("_", 1)
129-
field_clean = clean_field_name(field)
130-
result.setdefault(index, {})[field_clean] = uppercase_field_value(field_clean, v)
149+
result.setdefault(index, {})[field] = uppercase_field_value(field, v)
131150
return result
132151

133152

src/country_workspace/contrib/kobo/sync.py

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
from typing import Any, TypedDict, cast
22

3-
from constance import config
3+
from constance import config as constance_config
44
from django.core.cache import cache
55

66
from country_workspace.contrib.kobo.api.client.main import Client
77
from country_workspace.contrib.kobo.api.data.asset import Asset
88
from country_workspace.contrib.kobo.api.data.submission import Submission
99
from country_workspace.contrib.kobo.models import KoboSubmission
10-
from country_workspace.models import AsyncJob, Batch, Household, Individual
11-
from country_workspace.utils.fields import clean_field_name
10+
from country_workspace.models import AsyncJob, Batch, Household, Individual, Program
11+
from country_workspace.utils.config import FailIfAlienConfig, BatchNameConfig
12+
from country_workspace.utils.fields import create_json_record_preprocessor, RecordPreprocessor
13+
14+
15+
class Config(BatchNameConfig, FailIfAlienConfig):
16+
project_id: str
17+
individual_records_field: str
1218

1319

1420
def make_client(country_code: str | None) -> Client:
15-
token = config.KOBO_MASTER_API_TOKEN or config.KOBO_API_TOKEN
16-
project_view_id = config.KOBO_PROJECT_VIEW_ID if config.KOBO_MASTER_API_TOKEN else None
21+
token = constance_config.KOBO_MASTER_API_TOKEN or constance_config.KOBO_API_TOKEN
22+
project_view_id = constance_config.KOBO_PROJECT_VIEW_ID if constance_config.KOBO_MASTER_API_TOKEN else None
1723
return Client(
18-
base_url=config.KOBO_KF_URL,
24+
base_url=constance_config.KOBO_KF_URL,
1925
token=token,
2026
country_code=country_code,
2127
project_view_id=project_view_id,
@@ -27,30 +33,35 @@ def extract_household_data(submission: Submission, individual_records_field: str
2733

2834

2935
def create_individuals(
30-
batch: Batch, household: Household, submission: Submission, individual_records_field: str
36+
batch: Batch, household: Household, submission: Submission, config: Config, preprocess_record: RecordPreprocessor
3137
) -> int:
3238
individuals = []
33-
for raw_individual in submission.get(individual_records_field, []):
34-
individual = {key.lstrip(f"{individual_records_field}/"): value for key, value in raw_individual.items()}
39+
for raw_individual in submission.get(config["individual_records_field"], []):
40+
individual = {
41+
key.lstrip(f"{config['individual_records_field']}/"): value for key, value in raw_individual.items()
42+
}
3543
fullname = next((key for key in individual if key.startswith("full_name")), None)
3644
individuals.append(
3745
Individual(
3846
batch=batch,
3947
household=household,
4048
name=individual.get(fullname, ""),
41-
flex_fields={clean_field_name(key): value for key, value in individual.items()},
49+
flex_fields=preprocess_record(individual),
4250
),
4351
)
4452
household.program.individuals.bulk_create(individuals)
4553
return len(individuals)
4654

4755

48-
def create_household(batch: Batch, submission: Submission, individual_records_field: str) -> Household:
49-
household_fields = extract_household_data(submission, individual_records_field)
56+
def create_household(
57+
batch: Batch, submission: Submission, config: Config, preprocess_record: RecordPreprocessor
58+
) -> Household:
59+
household_fields = extract_household_data(submission, config["individual_records_field"])
5060
return cast(
5161
Household,
5262
batch.program.households.create(
53-
batch=batch, flex_fields={clean_field_name(key): value for key, value in household_fields.items()}
63+
batch=batch,
64+
flex_fields=preprocess_record(household_fields),
5465
),
5566
)
5667

@@ -63,40 +74,44 @@ class ImportResult(TypedDict):
6374
individuals: int
6475

6576

66-
def import_asset(batch: Batch, asset: Asset, individual_records_field: str) -> ImportResult:
77+
def import_asset(batch: Batch, asset: Asset, config: Config, program: Program) -> ImportResult:
6778
household_counter = 0
6879
individual_counter = 0
6980

81+
individual_preprocessor = create_json_record_preprocessor(config, program.individual_checker)
82+
household_preprocessor = create_json_record_preprocessor(config, program.household_checker)
83+
7084
with cache.lock(ASSET_CACHE_KEY.format(asset_id=asset.uid)):
7185
submission_ids = set(KoboSubmission.objects.filter(asset_uid=asset.uid).values_list("submission_id", flat=True))
7286
for submission in asset.submissions:
7387
if submission.id in submission_ids:
7488
continue
75-
household = create_household(batch, submission, individual_records_field)
89+
household = create_household(batch, submission, config, household_preprocessor)
7690
household_counter += 1
77-
individual_counter += create_individuals(batch, household, submission, individual_records_field)
91+
individual_counter += create_individuals(batch, household, submission, config, individual_preprocessor)
7892

7993
return ImportResult(households=household_counter, individuals=individual_counter)
8094

8195

8296
def import_data(job: AsyncJob) -> ImportResult:
97+
config: Config = job.config
98+
8399
batch = Batch.objects.create(
84-
name=job.config["batch_name"],
100+
name=config["batch_name"],
85101
program=job.program,
86102
country_office=job.program.country_office,
87103
imported_by=job.owner,
88104
source=Batch.BatchSource.KOBO,
89105
)
90-
individual_records_field = job.config["individual_records_field"]
91106
client = make_client(job.program.country_office.kobo_country_code)
92107

93108
household_counter = 0
94109
individual_counter = 0
95110

96111
for asset in client.assets:
97112
# TODO: fetch specific asset
98-
if job.config["project_id"] == asset.uid:
99-
import_result = import_asset(batch, asset, individual_records_field)
113+
if config["project_id"] == asset.uid:
114+
import_result = import_asset(batch, asset, config, job.program)
100115
household_counter += import_result["households"]
101116
individual_counter += import_result["individuals"]
102117

src/country_workspace/datasources/rdi.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
import io
22
from collections.abc import Iterable
3-
from typing import Mapping, Any, TypedDict, cast
3+
from typing import Mapping, Any, cast
44

55
from django.db.transaction import atomic
66
from hope_smart_import.readers import open_xls_multi
77

88
from country_workspace.models import AsyncJob, Batch, Household
9-
from country_workspace.utils.fields import clean_field_name
9+
from country_workspace.utils.config import FailIfAlienConfig, BatchNameConfig
10+
from country_workspace.utils.fields import create_json_record_preprocessor, Record
1011

1112
RDI = str | io.BytesIO
12-
Row = Mapping[str, Any]
13-
Sheet = Iterable[Row]
13+
Sheet = Iterable[Record]
1414

1515
INDIVIDUAL = "individual"
1616
HOUSEHOLD = "household"
1717

1818

19-
class Config(TypedDict):
20-
batch_name: str
19+
class Config(BatchNameConfig, FailIfAlienConfig):
2120
household_pk_col: str
2221
master_column_label: str
2322
detail_column_label: str
@@ -62,11 +61,7 @@ def __str__(self) -> str:
6261
return f"Failed to validate household {self.household_key}."
6362

6463

65-
def normalize_row(row: Row) -> Mapping[str, Any]:
66-
return {clean_field_name(k): v for k, v in row.items()}
67-
68-
69-
def get_value(row: Row, column_name: str) -> Any:
64+
def get_value(row: Record, column_name: str) -> Any:
7065
if column_name in row:
7166
return row[column_name]
7267

@@ -76,7 +71,7 @@ def get_value(row: Row, column_name: str) -> Any:
7671
def filter_rows_with_household_pk(config: Config, *sheets: Sheet) -> Iterable[Sheet]:
7772
household_pk_col = config["household_pk_col"]
7873

79-
def has_household_pk(row: Row) -> bool:
74+
def has_household_pk(row: Record) -> bool:
8075
return bool(get_value(row, household_pk_col))
8176

8277
return (filter(has_household_pk, sheet) for sheet in sheets)
@@ -85,17 +80,21 @@ def has_household_pk(row: Row) -> bool:
8580
def process_households(sheet: Sheet, job: AsyncJob, batch: Batch, config: Config) -> Mapping[int, Household]:
8681
mapping = {}
8782

83+
preprocess_json_record = create_json_record_preprocessor(config, job.program.household_checker)
84+
8885
for i, row in enumerate(sheet, 1):
8986
name = get_value(row, config["master_column_label"])
9087
household_key = get_value(row, config["household_pk_col"])
9188

89+
preprocessed_row = preprocess_json_record(row)
90+
9291
try:
9392
mapping[household_key] = cast(
9493
Household,
9594
job.program.households.create(
9695
batch=batch,
9796
name=name,
98-
flex_fields=normalize_row(row),
97+
flex_fields=preprocessed_row,
9998
),
10099
)
101100
except Exception as e:
@@ -109,6 +108,8 @@ def process_individuals(
109108
) -> int:
110109
processed = 0
111110

111+
preprocess_json_record = create_json_record_preprocessor(config, job.program.individual_checker)
112+
112113
for i, row in enumerate(sheet, 1):
113114
name = get_value(row, config["detail_column_label"])
114115
household_key = get_value(row, config["household_pk_col"])
@@ -117,12 +118,14 @@ def process_individuals(
117118
if not household:
118119
raise MissingHouseholdError(i, household_key)
119120

121+
preprocessed_row = preprocess_json_record(row)
122+
120123
try:
121124
job.program.individuals.create(
122125
batch=batch,
123126
name=name,
124127
household_id=household.pk,
125-
flex_fields=normalize_row(row),
128+
flex_fields=preprocessed_row,
126129
)
127130
except Exception as e:
128131
raise SheetProcessingError(INDIVIDUAL, i) from e
src/country_workspace/utils/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from typing import TypedDict
2+
3+
4+
class FailIfAlienConfig(TypedDict):
5+
fail_if_alien: bool
6+
7+
8+
class BatchNameConfig(TypedDict):
9+
batch_name: str

0 commit comments

Comments
 (0)