1 change: 1 addition & 0 deletions .circleci.disabled/config.yml
@@ -123,3 +123,4 @@ workflows:
- staging
- master
- ch39409-backend
- LMSM_Improve_export_locations
129 changes: 77 additions & 52 deletions src/etools/applications/last_mile/admin_panel/csv_exporter.py
@@ -10,61 +10,70 @@ class BaseCSVExporter:
def __init__(self, chunk_size: int = DEFAULT_CHUNK_SIZE):
self.chunk_size = chunk_size

def _write_csv_row(self, row_data: List[Any]) -> str:
def _write_csv_rows_bulk(self, rows_data: List[List[Any]]) -> str:
output = StringIO()
writer = csv.writer(output)
writer.writerow(row_data)
writer.writerows(rows_data)
output.seek(0)
return output.read()

def _write_csv_row(self, row_data: List[Any]) -> str:
return self._write_csv_rows_bulk([row_data])

def _serialize_items_batch(self, items: List[Any], serializer_class: Any) -> List[Dict[str, Any]]:
return serializer_class(items, many=True).data

def _serialize_item(self, item: Any, serializer_class: Any) -> Dict[str, Any]:
return serializer_class(item).data

def _extract_values(self, data: Dict[str, Any], headers: List[str]) -> List[Any]:
return [data.get(header, '') for header in headers]

def _get_first_item(self, queryset):
for item in queryset[:1]:
return item
return None
def _extract_values_bulk(self, data_list: List[Dict[str, Any]], headers: List[str]) -> List[List[Any]]:
return [[data.get(header, '') for header in headers] for data in data_list]

def _process_chunk_with_batch_serialization(
self, chunk: List[Any], serializer_class: Any, headers: Dict[str, str]
) -> Iterator[str]:
serialized_data_list = self._serialize_items_batch(chunk, serializer_class)
for serialized_data in serialized_data_list:
row_values = self._extract_values(serialized_data, headers.keys())
yield self._write_csv_row(row_values)
rows_values = self._extract_values_bulk(serialized_data_list, list(headers.keys()))
if rows_values:
yield self._write_csv_rows_bulk(rows_values)

def _process_chunk_with_row_expansion(
self, chunk: List[Any], serializer_class: Any, headers: Dict[str, str]
) -> Iterator[str]:
for obj in chunk:
item_serializer = serializer_class(obj)
rows = item_serializer.generate_rows(obj)
for row_data in rows:
row_values = self._extract_values(row_data, headers.keys())
yield self._write_csv_row(row_values)
all_rows = []

if hasattr(serializer_class, 'bulk_generate_rows'):
all_rows = serializer_class.bulk_generate_rows(chunk)
else:
for obj in chunk:
item_serializer = serializer_class(obj)
rows = item_serializer.generate_rows(obj)
all_rows.extend(rows)

if all_rows:
rows_values = self._extract_values_bulk(all_rows, list(headers.keys()))
yield self._write_csv_rows_bulk(rows_values)

def _paginate_queryset(self, queryset) -> Iterator[list]:
"""Paginate queryset in chunks, preserving prefetch_related.

Unlike .iterator(), this evaluates each chunk as a separate
queryset slice so prefetch_related lookups are properly applied.
"""
offset = 0
while True:
chunk = list(queryset[offset:offset + self.chunk_size])
if not chunk:
break
yield chunk
offset += self.chunk_size

def _iterate_with_chunking(
self, queryset, serializer_class: Any, headers: Dict[str, str], use_row_expansion: bool = False
) -> Iterator[str]:
chunk = []
process_method = (
self._process_chunk_with_row_expansion if use_row_expansion
else self._process_chunk_with_batch_serialization
)

for item in queryset.iterator(chunk_size=self.chunk_size):
chunk.append(item)
if len(chunk) >= self.chunk_size:
yield from process_method(chunk, serializer_class, headers)
chunk = []

if chunk:
for chunk in self._paginate_queryset(queryset):
yield from process_method(chunk, serializer_class, headers)


@@ -126,24 +135,46 @@ def generate_csv_data(self, queryset, serializer_class, only_locations=False) ->
"approval_status": "Approval Status",
}

first_item = self._get_first_item(queryset)
if not first_item:
return
queryset = self._optimize_queryset(queryset)

serializer = serializer_class(first_item)
has_row_expansion = hasattr(serializer, 'generate_rows')

if only_locations and has_row_expansion:
has_row_expansion = False
if not queryset.exists():
return

if has_row_expansion:
yield self._write_csv_row(expanded_headers.values())
yield from self._iterate_with_chunking(
queryset, serializer_class, expanded_headers, use_row_expansion=True
)
else:
if only_locations:
yield self._write_csv_row(standard_headers.values())
yield from self._iterate_with_chunking(queryset, serializer_class, standard_headers)
for chunk in self._chunk_queryset(queryset):
rows_data = []
for location in chunk:
serialized = serializer_class(location).data
row_values = [serialized.get(header, '') for header in standard_headers.keys()]
rows_data.append(row_values)
if rows_data:
yield self._write_csv_rows_bulk(rows_data)
else:
has_row_expansion = hasattr(serializer_class, 'generate_rows')
if has_row_expansion:
yield self._write_csv_row(expanded_headers.values())
self.chunk_size = min(self.chunk_size, 500) # Limit to avoid memory issues
yield from self._iterate_with_chunking(
queryset, serializer_class, expanded_headers, use_row_expansion=True
)
else:
yield self._write_csv_row(standard_headers.values())
yield from self._iterate_with_chunking(queryset, serializer_class, standard_headers)

def _chunk_queryset(self, queryset):
yield from self._paginate_queryset(queryset)

def _optimize_queryset(self, queryset):
queryset = queryset.distinct()

if not hasattr(queryset, '_prefetch_related_lookups') or not queryset._prefetch_related_lookups:
queryset = queryset.prefetch_related(
'partner_organizations__organization',
'poi_type',
'parent'
)
return queryset


class UserLocationsCSVExporter(BaseCSVExporter):
@@ -175,10 +206,7 @@ def generate_csv_data(self, queryset, serializer_class) -> Iterator[str]:

yield self._write_csv_row(headers.values())

for poi_type in queryset.iterator(chunk_size=self.chunk_size):
serialized_data = self._serialize_item(poi_type, serializer_class)
row_values = self._extract_values(serialized_data, headers.keys())
yield self._write_csv_row(row_values)
yield from self._iterate_with_chunking(queryset, serializer_class, headers)


class UserAlertNotificationsCSVExporter(BaseCSVExporter):
@@ -190,7 +218,4 @@ def generate_csv_data(self, queryset, serializer_class) -> Iterator[str]:
}
yield self._write_csv_row(headers.values())

for poi_type in queryset.iterator(chunk_size=self.chunk_size):
serialized_data = self._serialize_item(poi_type, serializer_class)
row_values = self._extract_values(serialized_data, headers.keys())
yield self._write_csv_row(row_values)
yield from self._iterate_with_chunking(queryset, serializer_class, headers)
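Review note: the exporter change above replaces per-row `writerow()` calls and `queryset.iterator()` with slice-based pagination plus bulk `writerows()`. Below is a minimal sketch of that pagination pattern under illustrative names (the queryset and model in the usage comment are not taken from this PR); the key point is that every slice is a fresh queryset evaluation, so any `prefetch_related()` lookups attached to the queryset are honored for each chunk.

```python
from typing import Iterator, List


def paginate_queryset(queryset, chunk_size: int = 1000) -> Iterator[List]:
    """Yield lists of model instances one slice at a time.

    Each slice triggers a separate queryset evaluation, so prefetch_related()
    lookups are applied per chunk. By contrast, .iterator() ignores prefetches
    on older Django versions and, from 4.1 on, only honors them when a
    chunk_size is passed.
    """
    offset = 0
    while True:
        chunk = list(queryset[offset:offset + chunk_size])
        if not chunk:  # empty slice -> we have walked past the last row
            break
        yield chunk
        offset += chunk_size


# Hypothetical usage:
# qs = PointOfInterest.objects.prefetch_related('partner_organizations__organization')
# for chunk in paginate_queryset(qs.order_by('id'), chunk_size=500):
#     handle(chunk)
```

One trade-off of offset-based slicing: without a stable ordering on the queryset, rows can shift between pages, so an explicit `order_by()` is usually worth adding.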
53 changes: 51 additions & 2 deletions src/etools/applications/last_mile/admin_panel/serializers.py
@@ -1,3 +1,5 @@
from collections import defaultdict

from django.contrib.auth import get_user_model
from django.contrib.gis.geos import Point
from django.db import connection, transaction
@@ -565,8 +567,13 @@ def get_status(self, obj):
def get_primary_type(self, obj):
return obj.poi_type.name if obj.poi_type else None

def _get_cached_partners(self, obj):
if not hasattr(obj, '_cached_partners'):
obj._cached_partners = list(obj.partner_organizations.all())
return obj._cached_partners

def get_implementing_partner(self, obj):
partners = obj.partner_organizations.all().prefetch_related('organization')
partners = self._get_cached_partners(obj)
return ",".join([f"{partner.organization.vendor_number} - {partner.organization.name}" if partner.organization else partner.name if partner.name else "-" for partner in partners])

def get_lat(self, obj):
@@ -577,7 +584,7 @@ def get_lng(self, obj):

def to_representation(self, instance):
data = super().to_representation(instance)
partners = instance.partner_organizations.all().prefetch_related('organization')
partners = self._get_cached_partners(instance)
implementing_partner_names = ",".join([f"{partner.organization.name}" if partner.organization else partner.name if partner.name else "-" for partner in partners])
implementing_partner_numbers = ",".join([f"{partner.organization.vendor_number}" if partner.organization else partner.name if partner.name else "-" for partner in partners])
data.update({
@@ -618,6 +625,48 @@ def generate_rows(self, instance):

return rows or [base]

@classmethod
def bulk_generate_rows(cls, instances):
poi_ids = [poi.id for poi in instances]

transfers_qs = (
models.Transfer.all_objects
.filter(destination_point_id__in=poi_ids)
.select_related('destination_point')
.prefetch_related('items')
)

transfers_by_poi = defaultdict(list)
for transfer in transfers_qs:
transfers_by_poi[transfer.destination_point_id].append(transfer)

all_rows = []
serializer = cls()

for instance in instances:
base = serializer.base_representation(instance)
poi_transfers = transfers_by_poi.get(instance.id, [])

if poi_transfers:
for transfer in poi_transfers:
for item in transfer.items.all():
row = dict(base)
row.update({
"transfer_name": transfer.name,
"transfer_ref": getattr(transfer, "unicef_release_order", None),
"item_id": item.id,
"item_name": getattr(item, "description", None),
"item_qty": getattr(item, "quantity", None),
"item_batch_number": getattr(item, "batch_id", None),
"item_expiry_date": getattr(item, "expiry_date", None),
'approval_status': transfer.approval_status,
})
all_rows.append(row)
else:
all_rows.append(base)

return all_rows

class Meta:
model = models.PointOfInterest
fields = (
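Review note: `bulk_generate_rows` trades the per-instance `generate_rows()` loop for a fixed number of queries regardless of how many points are exported. The sketch below isolates that "fetch once, group by foreign key" pattern; the `Transfer` / `items` / `destination_point_id` names mirror the diff, the `models` import path is assumed, and `base_representation()` is stubbed out with a plain dict.

```python
from collections import defaultdict

from etools.applications.last_mile import models  # assumed import path for this app


def bulk_rows_for_points(points):
    """Build export rows for many points with two queries total."""
    point_ids = [p.id for p in points]

    # Query 1: all transfers for the whole batch; query 2: all of their items.
    transfers = (
        models.Transfer.all_objects
        .filter(destination_point_id__in=point_ids)
        .prefetch_related('items')
    )

    by_point = defaultdict(list)
    for transfer in transfers:
        by_point[transfer.destination_point_id].append(transfer)

    rows = []
    for point in points:
        base = {'name': point.name}  # stand-in for base_representation()
        point_transfers = by_point.get(point.id, [])
        if not point_transfers:
            rows.append(base)  # points without transfers still export one row
            continue
        for transfer in point_transfers:
            for item in transfer.items.all():  # served from the prefetch cache, no extra query
                rows.append({**base,
                             'transfer_name': transfer.name,
                             'item_id': item.id})
    return rows
```

The empty-transfer fallback mirrors the `rows or [base]` behavior of the single-instance `generate_rows()` path, so the bulk and per-instance code emit the same set of rows.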
33 changes: 17 additions & 16 deletions src/etools/applications/last_mile/admin_panel/views.py
@@ -308,22 +308,23 @@ def get_queryset(self):
'parent__parent__parent__parent__geom'
)

pending_subquery = models.Item.objects.filter(
transfer__destination_point_id=OuterRef('id'),
transfer__approval_status=models.Transfer.ApprovalStatus.PENDING,
hidden=False
).values('transfer__destination_point_id').annotate(count=Count('id')).values('count')

approved_subquery = models.Item.objects.filter(
transfer__destination_point_id=OuterRef('id'),
transfer__approval_status=models.Transfer.ApprovalStatus.APPROVED,
hidden=False
).values('transfer__destination_point_id').annotate(count=Count('id')).values('count')

self.queryset = self.queryset.annotate(
pending_approval=Coalesce(Subquery(pending_subquery, output_field=IntegerField()), Value(0)),
approved=Coalesce(Subquery(approved_subquery, output_field=IntegerField()), Value(0))
)
if self.action != 'list_export_csv':
pending_subquery = models.Item.objects.filter(
transfer__destination_point_id=OuterRef('id'),
transfer__approval_status=models.Transfer.ApprovalStatus.PENDING,
hidden=False
).values('transfer__destination_point_id').annotate(count=Count('id')).values('count')

approved_subquery = models.Item.objects.filter(
transfer__destination_point_id=OuterRef('id'),
transfer__approval_status=models.Transfer.ApprovalStatus.APPROVED,
hidden=False
).values('transfer__destination_point_id').annotate(count=Count('id')).values('count')

self.queryset = self.queryset.annotate(
pending_approval=Coalesce(Subquery(pending_subquery, output_field=IntegerField()), Value(0)),
approved=Coalesce(Subquery(approved_subquery, output_field=IntegerField()), Value(0))
)

return self.queryset

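Review note: the views change gates the `pending_approval` / `approved` annotations on the view action, so the CSV export path no longer pays for correlated subqueries it never serializes. A condensed, hypothetical sketch of that action-gated annotation pattern (DRF viewset assumed, class name illustrative, only one of the two subqueries shown):

```python
from django.db.models import Count, IntegerField, OuterRef, Subquery, Value
from django.db.models.functions import Coalesce
from rest_framework import viewsets

from etools.applications.last_mile import models  # assumed import path for this app


class PointOfInterestAdminViewSet(viewsets.ModelViewSet):  # hypothetical viewset name
    queryset = models.PointOfInterest.objects.all()

    def get_queryset(self):
        qs = super().get_queryset()
        # Only the JSON list endpoint renders these counters; the CSV export
        # does not, so skipping them here avoids a correlated subquery per row.
        if self.action != 'list_export_csv':
            pending = (
                models.Item.objects
                .filter(
                    transfer__destination_point_id=OuterRef('id'),
                    transfer__approval_status=models.Transfer.ApprovalStatus.PENDING,
                    hidden=False,
                )
                .values('transfer__destination_point_id')
                .annotate(count=Count('id'))
                .values('count')
            )
            qs = qs.annotate(
                pending_approval=Coalesce(
                    Subquery(pending, output_field=IntegerField()), Value(0)
                ),
            )
        return qs
```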