2 changes: 2 additions & 0 deletions backend/python/app/models/admin.py
@@ -14,6 +14,7 @@ class AdminBase(SQLModel):

receive_email_notifications: bool = Field(default=True, nullable=False)
admin_phone: str = Field(min_length=1, max_length=100, nullable=False)
route_archive_after: int = Field(default=30, nullable=False)

@field_validator("admin_phone")
@classmethod
@@ -62,3 +63,4 @@ class AdminUpdate(SQLModel):
# user fields
name: str | None = Field(default=None, min_length=1, max_length=255)
email: EmailStr | None = Field(default=None)
route_archive_after: int | None = Field(default=None)
3 changes: 3 additions & 0 deletions backend/python/app/models/location.py
@@ -1,3 +1,4 @@
import datetime
from typing import TYPE_CHECKING
from uuid import UUID, uuid4

@@ -27,6 +28,7 @@ class LocationBase(SQLModel):
num_children: int | None = None
num_boxes: int
notes: str = Field(default="")
geocoded_at: datetime.datetime | None = None


class Location(LocationBase, BaseModel, table=True):
@@ -68,3 +70,4 @@ class LocationUpdate(SQLModel):
num_children: int | None = None
num_boxes: int | None = None
notes: str | None = None
geocoded_at: datetime.datetime | None = None
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession
@@ -73,6 +74,7 @@ async def create_location(
num_children=location_data.num_children,
num_boxes=location_data.num_boxes,
notes=location_data.notes,
geocoded_at=datetime.now() if (location_data.latitude is not None and location_data.longitude is not None) else None,
)

session.add(location)
20 changes: 20 additions & 0 deletions backend/python/app/services/implementations/scheduler_service.py
@@ -75,8 +75,28 @@ def async_wrapper() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Reinitialize database connection for this event loop
# This is necessary because asyncpg connections are tied to a specific event loop
import app.models as models_module
models_module.init_database()

# Update the session maker reference in job modules that import it
# This is needed because 'from module import name' creates a reference
# that doesn't update when the original module's global is reassigned
import sys
for module_name, module in list(sys.modules.items()):
if module_name.startswith("app.services.jobs") and hasattr(module, "async_session_maker_instance"):
module.async_session_maker_instance = models_module.async_session_maker_instance

# Run the async function
loop.run_until_complete(func())
finally:
# Clean up: close all pending tasks and the loop
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

wrapped_func = async_wrapper
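The wrapper above reinitializes the database for the new event loop and then patches `async_session_maker_instance` back into every `app.services.jobs` module. That patch loop is needed because `from app.models import async_session_maker_instance` copies the binding at import time, so reassigning the global in `app.models` does not reach modules that already imported it. A standalone sketch of that behavior (the module and attribute names below are made up for illustration, not project code):

```python
# Illustration only: why `from module import name` goes stale after reassignment.
import types

models = types.ModuleType("fake_models")
models.session_maker = "maker-for-old-loop"

# A job module doing `from fake_models import session_maker` effectively takes a snapshot:
job_module = types.ModuleType("fake_job")
job_module.session_maker = models.session_maker

# Reinitializing the database rebinds the global in the models module...
models.session_maker = "maker-for-new-loop"

# ...but the job module still holds the old object until it is patched explicitly,
# which is what the loop over sys.modules in the wrapper does.
assert job_module.session_maker == "maker-for-old-loop"
job_module.session_maker = models.session_maker
assert job_module.session_maker == "maker-for-new-loop"
```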
10 changes: 10 additions & 0 deletions backend/python/app/services/jobs/__init__.py
@@ -14,6 +14,7 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
"""
from .driver_history_jobs import process_daily_driver_history
from .email_reminder_jobs import process_daily_reminder_emails
from .geocoding_refresh_jobs import refresh_geocoding

# Driver history - mark daily routes as complete
scheduler_service.add_cron_job(
@@ -23,9 +24,18 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
minute=59,
)

# Email reminders - runs daily at 12 PM
scheduler_service.add_cron_job(
process_daily_reminder_emails,
job_id="daily_reminder_emails",
hour=12,
minute=0,
)

# Geocoding refresh - runs daily at 2 AM
scheduler_service.add_cron_job(
refresh_geocoding,
job_id="daily_geocoding_refresh",
hour=2,
minute=0,
)
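For context, `add_cron_job` is only called here, not defined; a minimal sketch of the registration pattern it presumably wraps, assuming APScheduler (the actual `SchedulerService` internals are outside this diff):

```python
# Hypothetical sketch, assuming APScheduler; not the project's actual SchedulerService.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = BackgroundScheduler()

def add_cron_job(func, job_id: str, hour: int, minute: int) -> None:
    # Register (or replace) a job that fires once per day at hour:minute.
    scheduler.add_job(
        func,
        trigger=CronTrigger(hour=hour, minute=minute),
        id=job_id,
        replace_existing=True,
    )

scheduler.start()
```

Under that assumption, `daily_geocoding_refresh` fires at 02:00 each day in the scheduler's configured timezone.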
108 changes: 108 additions & 0 deletions backend/python/app/services/jobs/geocoding_refresh_jobs.py
@@ -0,0 +1,108 @@
"""Geocoding refresh scheduled jobs"""

from __future__ import annotations

from datetime import datetime, timedelta

from sqlalchemy import or_
from sqlmodel import select

from app.dependencies.services import get_google_maps_client, get_logger
from app.models import async_session_maker_instance
from app.models.admin import Admin
from app.models.location import Location


async def _get_archive_threshold(session) -> int:
"""Get the route archive threshold from admin settings"""
logger = get_logger()
statement = select(Admin)
result = await session.execute(statement)
admin_record = result.scalars().first()

if not admin_record:
logger.warning("No admin record found, using default threshold of 30 days")
return 30

threshold = admin_record.route_archive_after or 30
logger.info(f"Using route_archive_after threshold: {threshold} days")
return threshold


async def _get_locations_to_refresh(session, cutoff_date) -> list[Location]:
"""Query locations that need geocoding refresh"""
logger = get_logger()
statement = select(Location).where(
or_(
Location.geocoded_at.is_(None),
Location.geocoded_at < cutoff_date
)
)
result = await session.execute(statement)
locations = result.scalars().all()
logger.info(f"Found {len(locations)} locations to refresh")
return locations


async def _refresh_locations(session, locations: list[Location]) -> int:
"""Batch geocode and update locations, returns count of successfully refreshed"""
logger = get_logger()

if not locations:
logger.info("No locations need geocoding refresh")
return 0

google_maps_client = get_google_maps_client()
addresses = [loc.address for loc in locations]
geocoding_results = await google_maps_client.geocode_addresses(addresses)

refreshed_count = 0
for location, geocode_result in zip(locations, geocoding_results, strict=True):
if geocode_result:
location.latitude = geocode_result.latitude
location.longitude = geocode_result.longitude
location.geocoded_at = datetime.now()
refreshed_count += 1
logger.info(
f"Refreshed geocoding for location {location.location_id} "
f"({location.address}): lat={geocode_result.latitude}, lng={geocode_result.longitude}"
)
else:
logger.warning(
f"Failed to geocode location {location.location_id} "
f"({location.address})"
)

await session.commit()
logger.info(f"Successfully refreshed {refreshed_count}/{len(locations)} locations")
return refreshed_count


async def refresh_geocoding() -> None:
"""Refresh geocoding for locations - runs daily

This job:
1. Gets the route_archive_after setting from admin_info (default 30 days)
2. Finds all locations that need refreshing (geocoded_at is null or older than threshold)
3. Refreshes lat/lon for those locations using Google Geocoding API
4. Updates geocoded_at timestamp for refreshed locations
"""
logger = get_logger()

if async_session_maker_instance is None:
logger.error("Database session maker not initialized")
return

try:
async with async_session_maker_instance() as session:
archive_threshold_days = await _get_archive_threshold(session)
cutoff_date = datetime.now() - timedelta(days=archive_threshold_days)

locations = await _get_locations_to_refresh(session, cutoff_date)
await _refresh_locations(session, locations)

except Exception as error:
logger.error(
f"Failed to refresh geocoding: {error!s}", exc_info=True
)
raise
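The batching in `_refresh_locations` leans on an assumed contract for `google_maps_client.geocode_addresses`: one result per input address, in order, with `None` for failed lookups, so that `zip(..., strict=True)` pairs correctly. A hedged sketch of that interface (names and shapes are assumptions; the real `GoogleMapsClient` is not part of this diff):

```python
# Assumed interface only; the real GoogleMapsClient lives outside this diff.
from dataclasses import dataclass
from typing import Protocol


@dataclass
class GeocodeResult:
    latitude: float
    longitude: float


class GeocodingClient(Protocol):
    async def geocode_addresses(self, addresses: list[str]) -> list[GeocodeResult | None]:
        """Return exactly one entry per address, in input order; None for failed lookups."""
        ...
```

If the client drops failed addresses instead of returning `None` placeholders, the `strict=True` zip in `_refresh_locations` raises `ValueError`, which is a safer failure mode than silently mispairing locations with coordinates.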