1212from country_workspace .contrib .hope .constants import HOUSEHOLD_PUSH_BATCH_SIZE
1313from country_workspace .exceptions import RemoteError
1414from country_workspace .models import AsyncJob
15- from country_workspace .workspaces .models import CountryHousehold
15+ from country_workspace .workspaces .models import CountryHousehold , CountryIndividual
1616
1717
1818@dataclass
1919class PushProcessor :
20- """Handles pushing household data to an external system through the HopeClient API."""
20+ """Handles pushing beneficiaries data to an external system through the HopeClient API."""
2121
2222 co_slug : str
2323 batch_name : str
2424 program_id : str
25+ master_detail : bool
2526 queryset : QuerySet [CountryHousehold ] = field (default_factory = lambda : CountryHousehold .objects .none ())
2627 client : HopeClient = field (default_factory = HopeClient )
27- total : dict [str , Any ] = field (default_factory = lambda : {"households" : 0 , " errors" : []})
28+ total : dict [str , Any ] = field (default_factory = lambda : {"errors" : []})
2829 rdi_id : str | None = field (default = None , init = False )
30+ model : type = field (init = False )
31+ push_endpoint : str = field (init = False )
32+ has_members : bool = field (init = False )
2933
3034 def __post_init__ (self ) -> None :
31- """Initialize the base path for API requests."""
3235 self .base_path = f"{ self .co_slug } /rdi/"
36+ self .model , self .push_endpoint , self .has_members = (
37+ (CountryHousehold , "push/lax/" , True ) if self .master_detail else (CountryIndividual , "push/people/" , False )
38+ )
3339
34- def check_households_validity (self ) -> None :
35- """Check the validity of each household and its members in the queryset.
40+ def set_queryset (self , pks : list [int ]) -> None :
41+ """Set the queryset based on master_detail and provided pks."""
42+ qs = self .model .objects .filter (pk__in = pks )
43+ self .queryset = qs .prefetch_related ("members" ) if self .master_detail else qs
3644
37- Adds errors to `self.total["errors"]` if any household or member is invalid.
45+ def check_beneficiaries_validity (self ) -> None :
46+ """Check the validity of each beneficiaries in the queryset.
47+
48+ Adds errors to `self.total["errors"]` if any beneficiaries is invalid.
3849 """
39- for hh in self .queryset :
40- if not hh .is_valid ():
41- self .total ["errors" ].append (f"HH #{ hh .pk } invalid." )
42- for ind in hh .members .all ():
43- if not ind .is_valid ():
44- self .total ["errors" ].append (f"Ind #{ ind .pk } invalid." )
50+ for item in self .queryset :
51+ if not item .is_valid ():
52+ self .total ["errors" ].append (f"{ self .model .__name__ } #{ item .pk } invalid" )
53+ if self .has_members :
54+ for ind in item .members .all ():
55+ if not ind .is_valid ():
56+ self .total ["errors" ].append (f"Ind #{ ind .pk } invalid" )
4557
4658 def rdi_create (self ) -> None :
4759 """Create a new RDI record in the external system.
@@ -53,9 +65,9 @@ def rdi_create(self) -> None:
5365 if response := self .safe_post (path , data , "Error creating RDI" ):
5466 self .rdi_id = response .get ("id" )
5567
56- def rdi_push_lax (self ) -> None :
68+ def rdi_push (self ) -> None :
5769 """
58- Pushes a batch of household data to the external RDI system.
70+ Pushes a batch of beneficiaries data to the external RDI system.
5971
6072 Adds errors to `self.total["errors"]` if `rdi_id` is not set.
6173 Successfully pushed records are marked as removed.
@@ -64,7 +76,7 @@ def rdi_push_lax(self) -> None:
6476 self .total ["errors" ].append ("Cannot push data: rdi_id is not set" )
6577 return
6678 batch_ids , batch_data = self .prepare_batch ()
67- path = f"{ self .base_path } { self .rdi_id } /push/lax/ "
79+ path = f"{ self .base_path } { self .rdi_id } /{ self . push_endpoint } "
6880 if successful_ids := self .process_batch_response (
6981 self .safe_post (path , batch_data , "Error pushing data" ),
7082 batch_ids ,
@@ -103,17 +115,19 @@ def safe_post(self, path: str, data: Any, error_msg: str) -> dict[str, Any] | No
103115
104116 def prepare_batch (self ) -> tuple [list [int ], list [dict ]]:
105117 """
106- Prepare a batch of household data for API submission.
118+ Prepare a batch of household/individual data for API submission.
107119
108120 Returns:
109- tuple[list[int], list[dict]]: A tuple of household IDs and transformed data.
121+ tuple[list[int], list[dict]]: A tuple of household/individual IDs and transformed data.
110122
111123 """
112124 ids , data = [], []
113125 for item in self .queryset :
114126 ids .append (item .id )
115127 data .append (
116128 {** map_fields (item .flex_fields ), "members" : [map_fields (m .flex_fields ) for m in item .members .all ()]}
129+ if self .has_members
130+ else map_fields (item .flex_fields )
117131 )
118132 return ids , data
119133
@@ -133,6 +147,11 @@ def process_batch_response(self, response: dict | None, batch_ids: list[int]) ->
133147 case {"processed" : int (p ), "accepted" : int (a )} if p == a == len (batch_ids ):
134148 self .total ["households" ] = self .total .get ("households" , 0 ) + a
135149 return batch_ids
150+ case {"id" : str (_rdi_id ), "people" : list (_batch_ids )} if _rdi_id == self .rdi_id and len (_batch_ids ) == len (
151+ batch_ids
152+ ):
153+ self .total ["people" ] = self .total .get ("people" , 0 ) + len (_batch_ids )
154+ return batch_ids
136155 case {"errors" : int (e )} if e > 0 :
137156 self .total ["errors" ].append (f"Error pushing data for IDs: { batch_ids } - { response } " )
138157 case None :
@@ -143,27 +162,30 @@ def process_batch_response(self, response: dict | None, batch_ids: list[int]) ->
143162
144163 def mark_batch_removed (self , successful_ids : list [int ]) -> None :
145164 """
146- Mark successfully pushed households and members as removed in the database.
165+ Mark successfully pushed beneficiaries as removed in the database.
147166
148167 Args:
149- successful_ids (list[int]): List of successfully pushed household IDs.
168+ successful_ids (list[int]): List of successfully pushed beneficiaries IDs.
150169
151170 """
152171 try :
153172 with transaction .atomic ():
154- households = CountryHousehold .objects .filter (id__in = successful_ids ).prefetch_related ("members" )
155- for hh in households :
156- if hh .removed :
157- self .total ["errors" ].append (f"Household #{ hh .id } already marked as removed" )
173+ items = self .model .objects .filter (id__in = successful_ids )
174+ if self .has_members :
175+ items = items .prefetch_related ("members" )
176+ for item in items :
177+ if item .removed :
178+ self .total ["errors" ].append (f"{ self .model .__name__ } #{ item .id } already marked as removed" )
158179 else :
159- hh .removed = True
160- hh .save (update_fields = ["removed" ])
161- for ind in hh .members .all ():
162- if ind .removed :
163- self .total ["errors" ].append (f"Individual #{ ind .id } already marked as removed" )
164- else :
165- ind .removed = True
166- ind .save (update_fields = ["removed" ])
180+ item .removed = True
181+ item .save (update_fields = ["removed" ])
182+ if self .has_members :
183+ for ind in item .members .all ():
184+ if ind .removed :
185+ self .total ["errors" ].append (f"Individual #{ ind .id } already marked as removed" )
186+ else :
187+ ind .removed = True
188+ ind .save (update_fields = ["removed" ])
167189 except (DatabaseError , Exception ) as e :
168190 self .total ["errors" ].append (f"Failed to mark IDs { successful_ids } as removed: { e } " )
169191
@@ -176,23 +198,28 @@ def push_to_hope_core(job: AsyncJob) -> dict[str, Any]:
176198 job (AsyncJob): The job configuration containing relevant identifiers and parameters.
177199
178200 Returns:
179- dict[str, Any]: Summary of the operation including processed households and errors.
201+ dict[str, Any]: Summary of the operation including processed beneficiaries and errors.
180202
181203 """
182204
183205 def steps () -> Iterator [Callable [[], None ]]:
184- """Yield steps for pushing household data in batches."""
206+ """Yield steps for pushing beneficiaries data in batches."""
185207 yield processor .rdi_create
186208 for batch_pks in batched (job .config ["pks" ], HOUSEHOLD_PUSH_BATCH_SIZE ):
187- processor .queryset = CountryHousehold . objects . filter ( pk__in = batch_pks ). prefetch_related ( "members" )
188- yield from (processor .check_households_validity , processor .rdi_push_lax )
209+ processor .set_queryset ( batch_pks )
210+ yield from (processor .check_beneficiaries_validity , processor .rdi_push )
189211 yield processor .rdi_complete
190212
213+ if job .program .beneficiary_group is None :
214+ return {"errors" : ["Cannot proceed: beneficiary_group is not set" ]}
215+
191216 processor = PushProcessor (
192217 co_slug = job .program .country_office .slug ,
193218 batch_name = job .config .get ("batch_name" ),
194219 program_id = job .program .hope_id ,
220+ master_detail = job .program .beneficiary_group .master_detail ,
195221 )
222+
196223 for step in steps ():
197224 step ()
198225 if processor .total ["errors" ]:
0 commit comments