Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"pytest-asyncio~=1.3.0",
"aiosqlite~=0.22.0",
"googlemaps~=4.10.0",
"openpyxl~=3.1.0",
]

[tool.pytest.ini_options]
Expand Down
8 changes: 4 additions & 4 deletions backend/src/modules/party/party_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ async def get_parties_csv(
raise BadRequestException("Start date must be less than or equal to end date")

parties = await party_service.get_parties_by_date_range(start_datetime, end_datetime)
csv_content = await party_service.export_parties_to_csv(parties)
excel_content = await party_service.export_parties_to_excel(parties)

return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=parties.csv"},
content=excel_content,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": "attachment; filename=parties.xlsx"},
)


Expand Down
228 changes: 117 additions & 111 deletions backend/src/modules/party/party_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import csv
import io
import math
from datetime import UTC, datetime, timedelta

from fastapi import Depends
from openpyxl import Workbook
from openpyxl.styles import Font
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -490,130 +491,135 @@ def _calculate_haversine_distance(
r = 3959
return c * r

async def export_parties_to_csv(self, parties: list[PartyDto]) -> str:
async def export_parties_to_excel(self, parties: list[PartyDto]) -> bytes:
"""
Export a list of parties to CSV format.
Export a list of parties to Excel format with formatting.

Args:
parties: List of Party models to export

Returns:
CSV content as a string
Excel file content as bytes
"""
if not parties:
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(
[
"Fully formatted address",
"Date of Party",
"Time of Party",
"Contact One Full Name",
"Contact One Email",
"Contact One Phone Number",
"Contact One Contact Preference",
"Contact Two Full Name",
"Contact Two Email",
"Contact Two Phone Number",
"Contact Two Contact Preference",
]
wb = Workbook()
ws = wb.active
ws.title = "Parties"

# Define headers
headers = [
"Fully formatted address",
"Date of Party",
"Time of Party",
"Contact One Full Name",
"Contact One Email",
"Contact One Phone Number",
"Contact One Contact Preference",
"Contact Two Full Name",
"Contact Two Email",
"Contact Two Phone Number",
"Contact Two Contact Preference",
]

ws.append(headers)
for cell in ws[1]:
cell.font = Font(bold=True)

if parties:
party_ids = [party.id for party in parties]

result = await self.session.execute(
select(PartyEntity)
.options(
selectinload(PartyEntity.location),
selectinload(PartyEntity.contact_one).selectinload(StudentEntity.account),
)
.where(PartyEntity.id.in_(party_ids))
)
return output.getvalue()
party_entities = result.scalars().all()

party_ids = [party.id for party in parties]
party_entity_map = {party.id: party for party in party_entities}

result = await self.session.execute(
select(PartyEntity)
.options(
selectinload(PartyEntity.location),
selectinload(PartyEntity.contact_one).selectinload(StudentEntity.account),
)
.where(PartyEntity.id.in_(party_ids))
)
party_entities = result.scalars().all()

party_entity_map = {party.id: party for party in party_entities}

output = io.StringIO()
writer = csv.writer(output)

writer.writerow(
[
"Fully formatted address",
"Date of Party",
"Time of Party",
"Contact One Full Name",
"Contact One Email",
"Contact One Phone Number",
"Contact One Contact Preference",
"Contact Two Full Name",
"Contact Two Email",
"Contact Two Phone Number",
"Contact Two Contact Preference",
]
)
for party in parties:
party_entity = party_entity_map.get(party.id)
if party_entity is None:
continue

for party in parties:
party_entity = party_entity_map.get(party.id)
if party_entity is None:
continue
# Format address
formatted_address = ""
if party_entity.location:
formatted_address = party_entity.location.formatted_address or ""

# Format date and time
party_date = (
party.party_datetime.strftime("%Y-%m-%d") if party.party_datetime else ""
)
party_time = (
party.party_datetime.strftime("%H:%M:%S") if party.party_datetime else ""
)

# Format address
formatted_address = ""
if party_entity.location:
formatted_address = party_entity.location.formatted_address or ""

# Format date and time
party_date = party.party_datetime.strftime("%Y-%m-%d") if party.party_datetime else ""
party_time = party.party_datetime.strftime("%H:%M:%S") if party.party_datetime else ""

contact_one_full_name = ""
contact_one_email = ""
contact_one_phone = ""
contact_one_preference = ""
if party_entity.contact_one:
contact_one_full_name = (
f"{party_entity.contact_one.account.first_name} "
f"{party_entity.contact_one.account.last_name}"
contact_one_full_name = ""
contact_one_email = ""
contact_one_phone = ""
contact_one_preference = ""
if party_entity.contact_one:
contact_one_full_name = (
f"{party_entity.contact_one.account.first_name} "
f"{party_entity.contact_one.account.last_name}"
)
contact_one_phone = party_entity.contact_one.phone_number or ""
contact_one_preference = (
party_entity.contact_one.contact_preference.value
if party_entity.contact_one.contact_preference
else ""
)
if party_entity.contact_one.account:
contact_one_email = party_entity.contact_one.account.email or ""

contact_two_full_name = ""
contact_two_email = ""
contact_two_phone = ""
contact_two_preference = ""
contact_two_full_name = (
f"{party_entity.contact_two_first_name} {party_entity.contact_two_last_name}"
)
contact_one_phone = party_entity.contact_one.phone_number or ""
contact_one_preference = (
party_entity.contact_one.contact_preference.value
if party_entity.contact_one.contact_preference
contact_two_phone = party_entity.contact_two_phone_number or ""
contact_two_preference = (
party_entity.contact_two_contact_preference.value
if party_entity.contact_two_contact_preference
else ""
)
if party_entity.contact_one.account:
contact_one_email = party_entity.contact_one.account.email or ""

contact_two_full_name = ""
contact_two_email = ""
contact_two_phone = ""
contact_two_preference = ""
contact_two_full_name = (
f"{party_entity.contact_two_first_name} {party_entity.contact_two_last_name}"
)
contact_two_phone = party_entity.contact_two_phone_number or ""
contact_two_preference = (
party_entity.contact_two_contact_preference.value
if party_entity.contact_two_contact_preference
else ""
)
contact_two_email = party_entity.contact_two_email or ""

writer.writerow(
[
formatted_address,
party_date,
party_time,
contact_one_full_name,
contact_one_email,
contact_one_phone,
contact_one_preference,
contact_two_full_name,
contact_two_email,
contact_two_phone,
contact_two_preference,
]
)
contact_two_email = party_entity.contact_two_email or ""

ws.append(
[
formatted_address,
party_date,
party_time,
contact_one_full_name,
contact_one_email,
contact_one_phone,
contact_one_preference,
contact_two_full_name,
contact_two_email,
contact_two_phone,
contact_two_preference,
]
)

# Auto-fit column widths based on content
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if cell.value:
max_length = max(max_length, len(str(cell.value)))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width

output = io.BytesIO()
wb.save(output)
output.seek(0)
return output.getvalue()
54 changes: 40 additions & 14 deletions backend/test/modules/party/party_router_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from datetime import UTC, datetime, timedelta
from io import BytesIO

import pytest
import pytest_asyncio
from httpx import AsyncClient
from openpyxl import load_workbook
from src.modules.account.account_entity import AccountRole
from src.modules.location.location_service import LocationHoldActiveException
from src.modules.party.party_model import PartyDto
Expand Down Expand Up @@ -448,25 +450,33 @@ def _setup(self, party_utils: PartyTestUtils, admin_client: AsyncClient):

@pytest.mark.asyncio
async def test_get_parties_csv_empty(self):
"""Test CSV export with no parties."""
"""Test Excel export with no parties."""
now = datetime.now(UTC)
params = {
"start_date": now.strftime("%Y-%m-%d"),
"end_date": (now + timedelta(days=30)).strftime("%Y-%m-%d"),
}
response = await self.admin_client.get("/api/parties/csv", params=params)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv; charset=utf-8"
assert (
response.headers["content-type"]
== "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)

# Parse Excel content
wb = load_workbook(BytesIO(response.content))
ws = wb.active

assert ws.max_row == 1

csv_content = response.text
lines = csv_content.strip().split("\n")
assert len(lines) == 1 # Only header row
# Check for expected headers in the CSV
assert "Contact One Email" in lines[0] or "Contact Two Email" in lines[0]
assert ws["A1"].value == "Fully formatted address"
assert ws["A1"].font.bold is True
assert "Contact One Email" in [cell.value for cell in ws[1]]
assert "Contact Two Email" in [cell.value for cell in ws[1]]

@pytest.mark.asyncio
async def test_get_parties_csv_with_data(self):
"""Test CSV export with parties."""
"""Test Excel export with parties."""
parties = await self.party_utils.create_many(i=3)

# Get date range that covers all parties
Expand All @@ -477,15 +487,31 @@ async def test_get_parties_csv_with_data(self):
}
response = await self.admin_client.get("/api/parties/csv", params=params)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv; charset=utf-8"
assert (
response.headers["content-type"]
== "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)

wb = load_workbook(BytesIO(response.content))
ws = wb.active

assert ws.max_row == 4
assert ws["A1"].font.bold is True

csv_content = response.text
lines = csv_content.strip().split("\n")
assert len(lines) == 4 # Header + 3 data rows
all_cell_values = []
for row in ws.iter_rows(values_only=True):
cell_strs = [str(cell) if cell is not None else "" for cell in row]
all_cell_values.extend(cell_strs)

# Verify party IDs are in CSV
excel_content = " ".join(all_cell_values)
for party in parties:
assert str(party.id) in csv_content
has_account = party.contact_one and party.contact_one.account
contact_one_email = party.contact_one.account.email if has_account else None
contact_two_email = party.contact_two_email
email_found = (contact_one_email and contact_one_email in excel_content) or (
contact_two_email and contact_two_email in excel_content
)
assert email_found

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down
Loading