Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/country_workspace/admin/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,31 @@
from typing import TypedDict
from admin_extra_buttons.mixins import ExtraButtonsMixin
from adminfilters.mixin import AdminAutoCompleteSearchMixin, AdminFiltersMixin
from django.contrib import admin
from django.contrib import admin, messages
from django.db.models import Model
from django.http import HttpRequest
from admin_extra_buttons.api import button

from country_workspace.contrib.hope.sync.context_programs import SyncStep, sync_context_programs


class BaseModelAdmin(ExtraButtonsMixin, AdminAutoCompleteSearchMixin, AdminFiltersMixin, admin.ModelAdmin):
pass


class SyncConfig(TypedDict):
model: type[Model]
step: SyncStep


class SyncAdminMixin:
sync_config: SyncConfig

@button()
def sync(self, request: HttpRequest) -> None:
totals = sync_context_programs(step=self.sync_config["step"])
if errors := totals.get("errors"):
self.message_user(request, "; ".join(errors), level=messages.ERROR)
else:
info = totals[self.sync_config["model"]._meta.model_name]
self.message_user(request, f"{info['add']} created - {info['upd']} updated", level=messages.SUCCESS)
16 changes: 5 additions & 11 deletions src/country_workspace/admin/office.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
from admin_extra_buttons.buttons import LinkButton
from admin_extra_buttons.decorators import button, link
from admin_extra_buttons.decorators import link
from django.contrib import admin
from django.http import HttpRequest
from django.urls import reverse
from country_workspace.contrib.hope.sync.context_programs import SyncStep

from ..models import Office
from .base import BaseModelAdmin
from .base import BaseModelAdmin, SyncAdminMixin, SyncConfig


@admin.register(Office)
class OfficeAdmin(BaseModelAdmin):
class OfficeAdmin(SyncAdminMixin, BaseModelAdmin):
list_display = ("name", "long_name", "slug", "code", "active", "kobo_country_code")
search_fields = ("name", "slug", "code")
list_filter = ("active",)
readonly_fields = ("hope_id", "slug")
ordering = ("name",)
sync_config = SyncConfig(model=Office, step=SyncStep.OFFICES)

@link(change_list=False)
def programmes(self, btn: LinkButton) -> None:
url = reverse("admin:country_workspace_program_changelist")
pk = btn.context.get("original").pk
btn.href = f"{url}?country_office__exact={pk}"

@button()
def sync(self, request: HttpRequest) -> None:
from country_workspace.contrib.hope.sync.office import sync_offices

totals = sync_offices()
self.message_user(request, f"{totals['add']} created - {totals['upd']} updated")
13 changes: 4 additions & 9 deletions src/country_workspace/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from ..cache.manager import cache_manager
from ..compat.admin_extra_buttons import confirm_action
from ..models import Program
from .base import BaseModelAdmin
from .base import BaseModelAdmin, SyncAdminMixin, SyncConfig
from country_workspace.contrib.hope.sync.context_programs import SyncStep

if TYPE_CHECKING:
from admin_extra_buttons.buttons import LinkButton


@admin.register(Program)
class ProgramAdmin(BaseModelAdmin):
class ProgramAdmin(SyncAdminMixin, BaseModelAdmin):
list_display = (
"name",
"sector",
Expand All @@ -40,6 +41,7 @@ class ProgramAdmin(BaseModelAdmin):
)
ordering = ("name",)
autocomplete_fields = ("country_office",)
sync_config = SyncConfig(model=Program, step=SyncStep.PROGRAMS)

@button()
def invalidate_cache(self, request: HttpRequest, pk: str) -> None:
Expand Down Expand Up @@ -73,10 +75,3 @@ def _action(request: HttpRequest) -> HttpResponse:
description="Continuing will erase all the beneficiaries from this program",
success_message="Successfully executed",
)

@button()
def sync(self, request: HttpRequest) -> None:
from country_workspace.contrib.hope.sync.office import sync_programs

totals = sync_programs()
self.message_user(request, f"{totals['add']} created - {totals['upd']} updated - {totals['skip']} skipped")
4 changes: 2 additions & 2 deletions src/country_workspace/contrib/aurora/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from country_workspace.contrib.aurora.client import AuroraClient
from country_workspace.contrib.aurora.models import Project, Registration
from country_workspace.contrib.hope.sync.office import sync_programs
from country_workspace.contrib.hope.sync.context_programs import SyncStep, sync_context_programs
from country_workspace.models import AsyncJob, SyncLog


Expand All @@ -23,7 +23,7 @@ def sync_all(job: AsyncJob) -> dict[str, Any]:
client = AuroraClient()
with cache.lock("sync-aurora"):
return {
"programs": sync_programs(),
"programs": sync_context_programs(step=SyncStep.PROGRAMS),
"projects": sync_projects(client),
"registrations": sync_registrations(client),
}
Expand Down
228 changes: 228 additions & 0 deletions src/country_workspace/contrib/hope/sync/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
from typing import Any, Final, TypedDict, NotRequired, TypeVar
from collections.abc import Generator, Callable
from io import TextIOBase

from dataclasses import dataclass, field
from enum import Enum, auto

from django.core.cache import cache
from django.db import DatabaseError
from django.db.models import Model, Q
from country_workspace.exceptions import RemoteError

from ....models import SyncLog
from ..client import HopeClient


T = TypeVar("T", bound="BaseSync")


MESSAGES: Final[dict[str, str]] = {
"DEACTIVATION_FAILURE": "Failed during deactivation for '{entity}' : {error}",
"RECORDS_DEACTIVATED": "Deactivated '{count}' records '{entity}' (obsolete or marked inactive in source).",
"RECORD_MISSING_ID": "Skipping record due to missing 'id': {record}",
"RECORD_SKIPPED": "Skipped record '{hope_id}': {error}",
"RECORD_SYNC_FAILURE": "Failed to sync DB record '{hope_id}': {error}",
"REMOTE_API_FAILURE": "API Error fetching '{path}': {error}",
"SYNC_COMPLETE": "Sync complete for '{entity}' with result {result} with '{errors_count}' erors.",
"SYNC_START": "Start fetching '{entity}' data from HOPE core...",
}


class SkipRecordError(Exception):
"""Exception raised when a record should be skipped during synchronization."""


class LogLevel(Enum):
"""Log levels for synchronization messages."""

INFO = auto()
ERROR = auto()


class SyncConfig(TypedDict):
"""Configuration for synchronizing an entity.

Attributes:
model: The Django model class to synchronize.
path: The API path to fetch data from.
prepare_defaults: Function to prepare default values for the model.
should_process: Optional function to filter records before processing.
post_process: Optional function to process the model instance after creation/update.
should_deactivate: Optional function to determine if a record should be deactivated.

"""

model: type[Model]
path: str
should_process: NotRequired[Callable[[dict[str, Any]], bool] | None]
prepare_defaults: Callable[[dict[str, Any]], dict[str, Any] | None]
post_process: NotRequired[Callable[[Model, bool], None] | None]
should_deactivate: NotRequired[Callable[["dict[str, Any]"], bool] | None]


class BaseSyncStep(Enum):
"""Base class for synchronization steps.

Attributes:
value: The enumeration value.
sync_method: The method to execute for this step.

"""

def __init__(self, value: Any, sync_method: Callable[["BaseSync"], None]) -> None:
self._value_ = value
self._sync_method = sync_method

@property
def func(self) -> Callable[["BaseSync"], None]:
return self._sync_method


@dataclass
class BaseSync:
"""Base class for synchronization operations.

Attributes:
client: The client for fetching data from the remote API.
stdout: Optional output stream for logging messages.
total: Accumulated results of synchronization (e.g., counts of added/updated records).

"""

client: HopeClient = field(default_factory=HopeClient)
stdout: TextIOBase | None = None
total: dict[str, Any] = field(default_factory=dict)

def safe_get(self, path: str) -> Generator[dict[str, Any], None, None]:
"""Fetch data from the remote API safely, handling errors."""
try:
yield from self.client.get(path)
except RemoteError as e:
self.emit_log("REMOTE_API_FAILURE", LogLevel.ERROR, path=path, error=e)

def emit_log(self, key: str, level: LogLevel = LogLevel.INFO, **kwargs: Any) -> None:
"""Emit a log message with the specified key and level."""
if template := MESSAGES.get(key):
try:
msg = template.format(**kwargs).rstrip("\n")
except KeyError as e:
raise ValueError(
f"Log format error for key '{key}': missing placeholder '{e}'. Provided args: {kwargs}"
)
if self.stdout:
self.stdout.write(f"[{level.name}] {msg}\n")
self.stdout.flush()
if level == LogLevel.ERROR:
self.total.setdefault("errors", []).append(msg)
else:
raise KeyError(f"Log key '{key}' not found in MESSAGES configuration.")

def validated_hope_id(self, record: dict[str, Any]) -> str | None:
"""Validate and retrieve the HOPE core ID from the record."""
hope_id = record.get("id")
if not hope_id:
self.emit_log("RECORD_MISSING_ID", record=record)
return hope_id

def sync_entity(self, config: SyncConfig) -> None:
"""Synchronize an entity with the remote API.

Args:
config (SyncConfig): Configuration for the entity synchronization.

Notes:
- Fetches records from the API, processes them, and updates/creates model instances.
- Logs synchronization start, errors, and completion.
- Deactivates records based on the should_deactivate function, if specified.

"""
model, model_name = config["model"], config["model"]._meta.model_name
self.total.setdefault(model_name, {"add": 0, "upd": 0})
should_process, prepare_defaults, post_process, should_deactivate = (
config.get(k, v)
for k, v in (
("should_process", None),
("prepare_defaults", lambda _: {}),
("post_process", None),
("should_deactivate", None),
)
)

with cache.lock(f"sync-{model_name.lower()}"):
self.emit_log("SYNC_START", entity=model_name)
all_processed, inactive = set(), set()
for record in self.safe_get(config["path"]):
if not (hope_id := self.validated_hope_id(record)):
continue
all_processed.add(hope_id)
if should_deactivate and should_deactivate(record):
inactive.add(hope_id)
continue
if should_process and not should_process(record):
continue
try:
defaults = prepare_defaults(record)
if defaults is None or not defaults:
continue
instance, created = model.objects.update_or_create(hope_id=hope_id, defaults=defaults)
if post_process:
post_process(instance, created)
self.total[model_name]["add" if created else "upd"] += 1
except SkipRecordError as e:
self.emit_log("RECORD_SKIPPED", hope_id=hope_id, error=str(e))
except (DatabaseError, KeyError, AttributeError) as e:
self.emit_log("RECORD_SYNC_FAILURE", LogLevel.ERROR, hope_id=hope_id, error=str(e))
if should_deactivate:
self._deactivate_records(model, model_name, all_processed, inactive)
SyncLog.objects.register_sync(model)
self.emit_log(
"SYNC_COMPLETE",
entity=model_name,
result=self.total[model_name],
errors_count=len(self.total.get("errors", [])),
)

def _deactivate_records(self, model: type[Model], model_name: str, processed: set[str], inactive: set[str]) -> None:
"""Deactivate existed records in the database that are inactive or not present in the source."""
self.total.setdefault(model_name, {})
try:
deactivated_count = model.objects.filter(
Q(active=True) & (~Q(hope_id__in=processed) | Q(hope_id__in=inactive))
).update(active=False)
if deactivated_count:
self.total[model_name]["deactivated"] = deactivated_count
self.emit_log("RECORDS_DEACTIVATED", count=deactivated_count, entity=model_name)
except DatabaseError as e:
self.emit_log("DEACTIVATION_FAILURE", LogLevel.ERROR, entity=model_name, error=str(e))


def sync_context(
context_class: type[T],
step: BaseSyncStep | None,
stdout: TextIOBase | None = None,
**context_kwargs: Any,
) -> dict[str, Any]:
"""Run synchronization steps for a given context.

Args:
context_class (type[T]): The synchronization context class (inheriting from BaseSync).
step (BaseSyncStep | None): Specific step to execute. If None, all steps are run.
stdout (TextIOBase | None): Optional output stream for logging.
**context_kwargs (Any): Additional keyword arguments to pass to the context class.

Returns:
dict[str, Any]: Synchronization results, including counts and errors.

Notes:
Executes the specified step or all steps defined in the context's SyncStep and refreshes SyncLog.
Stops on errors.

"""
sync = context_class(stdout=stdout, **context_kwargs)
steps = (step,) if step else tuple(sync.__class__.SyncStep)
for current_step in steps:
current_step.func(sync)()
if sync.total.get("errors"):
return sync.total
return sync.total
Loading
Loading