Skip to content

Commit 643f295

Browse files
authored
Validate records in chunks (#271)
1 parent 3469a05 commit 643f295

File tree

5 files changed

+30
-57
lines changed

5 files changed

+30
-57
lines changed

src/country_workspace/config/fragments/constance.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
),
6565
"MAILJET_API_KEY": (MAILJET_API_KEY, "Mailjet API key", str),
6666
"MAILJET_SECRET_KEY": (MAILJET_SECRET_KEY, "Mailjet secret key", "write_only_input"),
67+
"CHUNK_SIZE_FOR_VALIDATION_TASK": (500, "Number of records to process per chunk in validation tasks", int),
6768
}
6869

6970
CONSTANCE_CONFIG_FIELDSETS = {
@@ -83,6 +84,7 @@
8384
"MAILJET_SECRET_KEY",
8485
),
8586
"Data consistency": ("CONCURRENCY_GUARD",),
87+
"System": ("CHUNK_SIZE_FOR_VALIDATION_TASK",),
8688
}
8789

8890
# Mapping of config keys to masked default display values in the Constance admin UI.

src/country_workspace/workspaces/admin/cleaners/actions.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from .calculate_checksum import calculate_checksum_impl
1818
from .mass_update import MassUpdateForm, mass_update_impl
1919
from .regex import RegexUpdateForm, regex_update_impl
20-
from .validate import validate_queryset
20+
from .validate import create_validation_jobs
2121

2222
if TYPE_CHECKING:
2323
from django.db.models import QuerySet
@@ -32,19 +32,15 @@ def validate_records(
3232
queryset: "QuerySet[Beneficiary]",
3333
) -> None:
3434
if model_admin._check_empty_queryset(request, queryset):
35-
return None
36-
opts = queryset.model._meta
37-
job = AsyncJob.objects.create(
35+
return
36+
create_validation_jobs(
3837
description=validate_records.short_description,
39-
type=AsyncJob.JobType.ACTION,
4038
owner=request.user,
41-
action=fqn(validate_queryset),
4239
program=state.program,
43-
config={"pks": list(queryset.values_list("pk", flat=True)), "model_name": opts.label},
40+
queryset=queryset,
4441
)
45-
job.queue()
4642
model_admin.message_user(request, "Task scheduled", messages.SUCCESS)
47-
return job
43+
return
4844

4945

5046
@admin.action(description="Mass update record fields", permissions=["mass_update"])

src/country_workspace/workspaces/admin/cleaners/validate.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22
from typing import Any
33
from itertools import batched
44
from collections.abc import Iterable
5+
6+
from concurrency.utils import fqn
7+
from constance import config
58
from django.db.models import Model, QuerySet, Prefetch
69
from django.db.models.query import prefetch_related_objects
710

811
from country_workspace.context import batch_ctx
9-
from country_workspace.models import AsyncJob, Household, Individual
12+
from country_workspace.models import AsyncJob, Household, Individual, Program
1013
from country_workspace.state import state
1114

1215

@@ -50,17 +53,6 @@ def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwarg
5053
return {"valid": valid, "invalid": invalid}
5154

5255

53-
def validate_program(job: AsyncJob) -> dict[str, int]:
54-
try:
55-
program = job.program
56-
qs = program.households.all() if program.beneficiary_group.master_detail else program.individuals.all()
57-
return validate_queryset(qs)
58-
59-
except Exception as e: # pragma: no cover
60-
logger.error("Error during program validation: %s", e)
61-
raise
62-
63-
6456
def _validate_and_count(objs: Iterable[Model]) -> tuple[int, int]:
6557
valid = invalid = 0
6658
for obj in objs:
@@ -70,3 +62,18 @@ def _validate_and_count(objs: Iterable[Model]) -> tuple[int, int]:
7062
else:
7163
invalid += 1
7264
return valid, invalid
65+
66+
67+
def create_validation_jobs(description: str, owner: str, program: Program, queryset: QuerySet) -> AsyncJob:
68+
opts = queryset.model._meta
69+
queryset = queryset.values_list("pk", flat=True).order_by("pk")
70+
for chunk in batched(queryset, config.CHUNK_SIZE_FOR_VALIDATION_TASK):
71+
job = AsyncJob.objects.create(
72+
description=f"{description} (PKs {chunk[0]} - {chunk[-1]})",
73+
type=AsyncJob.JobType.ACTION,
74+
owner=owner,
75+
action=fqn(validate_queryset),
76+
program=program,
77+
config={"pks": chunk, "model_name": opts.label},
78+
)
79+
job.queue()

src/country_workspace/workspaces/admin/hh_ind.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from admin_extra_buttons.decorators import button
44
from adminfilters.mixin import AdminAutoCompleteSearchMixin
5-
from concurrency.utils import fqn
65
from django.contrib import messages
76
from django.contrib.admin.utils import unquote
87
from django.core.exceptions import PermissionDenied
@@ -16,12 +15,11 @@
1615
from django.utils.translation import gettext_lazy as _
1716

1817
from ...cache.manager import cache_manager
19-
from ...models import AsyncJob
2018
from ...state import state
2119
from ..options import WorkspaceModelAdmin
2220
from ..models import CountryHousehold, CountryIndividual
2321
from .cleaners import actions
24-
from .cleaners.validate import validate_program
22+
from .cleaners.validate import create_validation_jobs
2523
from ...utils.flex_fields import Base64ImageField, get_checker_fields
2624

2725
if TYPE_CHECKING:
@@ -168,15 +166,13 @@ def validate_single(self, request: HttpRequest, pk: str) -> "HttpResponse":
168166
)
169167
def validate_program(self, request: HttpRequest) -> "HttpResponse":
170168
program = state.program
171-
job = AsyncJob.objects.create(
169+
queryset = program.households.all() if program.beneficiary_group.master_detail else program.individuals.all()
170+
create_validation_jobs(
172171
description="Validate Entire Programme",
173-
type=AsyncJob.JobType.TASK,
174172
owner=request.user,
175-
action=fqn(validate_program),
176173
program=program,
177-
config={},
174+
queryset=queryset,
178175
)
179-
job.queue()
180176
self.message_user(request, _("Task scheduled"), messages.SUCCESS)
181177

182178
@button(html_attrs={"title": "Shows raw data as stored, ready to be sent to HOPE"})

tests/admin/test_admin_cleaners.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
import pytest
2-
from pytest_mock import MockerFixture
3-
from unittest.mock import Mock
4-
5-
from country_workspace.workspaces.admin.cleaners.validate import validate_program
62
from country_workspace.workspaces.models import CountryHousehold, CountryIndividual, CountryProgram
73

84
type Beneficiary = CountryHousehold | CountryIndividual
@@ -31,27 +27,3 @@ def beneficiary(program: CountryProgram, master_detail: bool) -> Beneficiary:
3127
batch__country_office=program.country_office,
3228
household=None,
3329
)
34-
35-
36-
@pytest.mark.parametrize(
37-
("return_value", "expected"),
38-
[(True, {"valid": 1, "invalid": 0}), (False, {"valid": 0, "invalid": 1})],
39-
ids=["valid", "invalid"],
40-
)
41-
def test_validate_program_success(
42-
mocker: MockerFixture, beneficiary: Beneficiary, master_detail: bool, return_value: bool, expected: dict[str, int]
43-
) -> None:
44-
model_path = "country_workspace.models.Household" if master_detail else "country_workspace.models.Individual"
45-
mocker.patch(f"{model_path}.validate_with_checker", return_value=return_value)
46-
assert validate_program(Mock(program=beneficiary.program)) == expected
47-
48-
49-
def test_validate_program_exception(mocker: MockerFixture, beneficiary: Beneficiary, master_detail: bool) -> None:
50-
model_path = "country_workspace.models.Household" if master_detail else "country_workspace.models.Individual"
51-
mocker.patch(f"{model_path}.validate_with_checker", side_effect=Exception("Test error"))
52-
with pytest.raises(Exception, match="Test error"):
53-
validate_program(Mock(program=beneficiary.program))
54-
55-
56-
def test_validate_program_empty_queryset(program: CountryProgram) -> None:
57-
assert validate_program(Mock(program=program)) == {"valid": 0, "invalid": 0}

0 commit comments

Comments
 (0)