diff --git a/backend/python/app/models/admin.py b/backend/python/app/models/admin.py
index fbe5ce21..8f771fd5 100644
--- a/backend/python/app/models/admin.py
+++ b/backend/python/app/models/admin.py
@@ -14,6 +14,7 @@ class AdminBase(SQLModel):
     receive_email_notifications: bool = Field(default=True, nullable=False)
     admin_phone: str = Field(min_length=1, max_length=100, nullable=False)
+    route_archive_after: int = Field(default=30, nullable=False)
 
     @field_validator("admin_phone")
     @classmethod
@@ -62,3 +63,4 @@ class AdminUpdate(SQLModel):
     # user fields
     name: str | None = Field(default=None, min_length=1, max_length=255)
     email: EmailStr | None = Field(default=None)
+    route_archive_after: int | None = Field(default=None)
\ No newline at end of file
diff --git a/backend/python/app/models/location.py b/backend/python/app/models/location.py
index 94e59c39..ae731bc2 100644
--- a/backend/python/app/models/location.py
+++ b/backend/python/app/models/location.py
@@ -1,3 +1,4 @@
+import datetime
 from typing import TYPE_CHECKING
 from uuid import UUID, uuid4
 
@@ -27,6 +28,7 @@ class LocationBase(SQLModel):
     num_children: int | None = None
     num_boxes: int
     notes: str = Field(default="")
+    geocoded_at: datetime.datetime | None = None
 
 
 class Location(LocationBase, BaseModel, table=True):
@@ -68,3 +70,4 @@ class LocationUpdate(SQLModel):
     num_children: int | None = None
     num_boxes: int | None = None
     notes: str | None = None
+    geocoded_at: datetime.datetime | None = None
diff --git a/backend/python/app/services/implementations/location_service.py b/backend/python/app/services/implementations/location_service.py
index 30bf0367..94b9d68e 100644
--- a/backend/python/app/services/implementations/location_service.py
+++ b/backend/python/app/services/implementations/location_service.py
@@ -1,4 +1,5 @@
 import logging
+from datetime import datetime
 from uuid import UUID
 
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -73,6 +74,7 @@ async def create_location(
             num_children=location_data.num_children,
             num_boxes=location_data.num_boxes,
             notes=location_data.notes,
+            geocoded_at=datetime.now() if (location_data.latitude is not None and location_data.longitude is not None) else None,
         )
 
         session.add(location)
diff --git a/backend/python/app/services/implementations/scheduler_service.py b/backend/python/app/services/implementations/scheduler_service.py
index 4167c273..de8c5fbb 100644
--- a/backend/python/app/services/implementations/scheduler_service.py
+++ b/backend/python/app/services/implementations/scheduler_service.py
@@ -75,8 +75,28 @@ def async_wrapper() -> None:
             loop = asyncio.new_event_loop()
             asyncio.set_event_loop(loop)
             try:
+                # Reinitialize the database connection for this event loop.
+                # This is necessary because asyncpg connections are tied to a specific event loop.
+                import app.models as models_module
+                models_module.init_database()
+
+                # Update the session maker reference in job modules that import it.
+                # This is needed because 'from module import name' creates a reference
+                # that doesn't update when the original module's global is reassigned.
+                import sys
+                for module_name, module in list(sys.modules.items()):
+                    if module_name.startswith("app.services.jobs") and hasattr(module, "async_session_maker_instance"):
+                        module.async_session_maker_instance = models_module.async_session_maker_instance
+
+                # Run the async function
                 loop.run_until_complete(func())
             finally:
+                # Clean up: cancel any pending tasks, then close the loop
+                pending = asyncio.all_tasks(loop)
+                for task in pending:
+                    task.cancel()
+                if pending:
+                    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                 loop.close()
 
         wrapped_func = async_wrapper
diff --git a/backend/python/app/services/jobs/__init__.py b/backend/python/app/services/jobs/__init__.py
index 248648c6..fb8084f7 100644
--- a/backend/python/app/services/jobs/__init__.py
+++ b/backend/python/app/services/jobs/__init__.py
@@ -14,6 +14,7 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
     """
     from .driver_history_jobs import process_daily_driver_history
     from .email_reminder_jobs import process_daily_reminder_emails
+    from .geocoding_refresh_jobs import refresh_geocoding
 
     # Driver history mark daily routes as complete
     scheduler_service.add_cron_job(
@@ -23,9 +24,18 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
         minute=59,
     )
 
+    # Email reminders - runs daily at 12 PM
     scheduler_service.add_cron_job(
         process_daily_reminder_emails,
         job_id="daily_reminder_emails",
         hour=12,
         minute=0,
     )
+
+    # Geocoding refresh - runs daily at 2 AM
+    scheduler_service.add_cron_job(
+        refresh_geocoding,
+        job_id="daily_geocoding_refresh",
+        hour=2,
+        minute=0,
+    )
diff --git a/backend/python/app/services/jobs/geocoding_refresh_jobs.py b/backend/python/app/services/jobs/geocoding_refresh_jobs.py
new file mode 100644
index 00000000..bc546101
--- /dev/null
+++ b/backend/python/app/services/jobs/geocoding_refresh_jobs.py
@@ -0,0 +1,108 @@
+"""Geocoding refresh scheduled jobs"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from sqlalchemy import or_
+from sqlmodel import select
+
+from app.dependencies.services import get_google_maps_client, get_logger
+from app.models import async_session_maker_instance
+from app.models.admin import Admin
+from app.models.location import Location
+
+
+async def _get_archive_threshold(session) -> int:
+    """Get the route archive threshold from admin settings"""
+    logger = get_logger()
+    statement = select(Admin)
+    result = await session.execute(statement)
+    admin_record = result.scalars().first()
+
+    if not admin_record:
+        logger.warning("No admin record found, using default threshold of 30 days")
+        return 30
+
+    threshold = admin_record.route_archive_after or 30
+    logger.info(f"Using route_archive_after threshold: {threshold} days")
+    return threshold
+
+
+async def _get_locations_to_refresh(session, cutoff_date) -> list[Location]:
+    """Query locations that need geocoding refresh"""
+    logger = get_logger()
+    statement = select(Location).where(
+        or_(
+            Location.geocoded_at.is_(None),
+            Location.geocoded_at < cutoff_date
+        )
+    )
+    result = await session.execute(statement)
+    locations = result.scalars().all()
+    logger.info(f"Found {len(locations)} locations to refresh")
+    return locations
+
+
+async def _refresh_locations(session, locations: list[Location]) -> int:
+    """Batch geocode and update locations, returns count of successfully refreshed"""
+    logger = get_logger()
+
+    if not locations:
+        logger.info("No locations need geocoding refresh")
+        return 0
+
+    google_maps_client = get_google_maps_client()
+    addresses = [loc.address for loc in locations]
+    geocoding_results = await google_maps_client.geocode_addresses(addresses)
+
+    refreshed_count = 0
+    for location, geocode_result in zip(locations, geocoding_results, strict=True):
+        if geocode_result:
+            location.latitude = geocode_result.latitude
+            location.longitude = geocode_result.longitude
+            location.geocoded_at = datetime.now()
+            refreshed_count += 1
+            logger.info(
+                f"Refreshed geocoding for location {location.location_id} "
+                f"({location.address}): lat={geocode_result.latitude}, lng={geocode_result.longitude}"
+            )
+        else:
+            logger.warning(
+                f"Failed to geocode location {location.location_id} "
+                f"({location.address})"
+            )
+
+    await session.commit()
+    logger.info(f"Successfully refreshed {refreshed_count}/{len(locations)} locations")
+    return refreshed_count
+
+
+async def refresh_geocoding() -> None:
+    """Refresh geocoding for locations - runs daily
+
+    This job:
+    1. Gets the route_archive_after setting from admin_info (default 30 days)
+    2. Finds all locations that need refreshing (geocoded_at is null or older than threshold)
+    3. Refreshes lat/lon for those locations using the Google Geocoding API
+    4. Updates the geocoded_at timestamp for refreshed locations
+    """
+    logger = get_logger()
+
+    if async_session_maker_instance is None:
+        logger.error("Database session maker not initialized")
+        return
+
+    try:
+        async with async_session_maker_instance() as session:
+            archive_threshold_days = await _get_archive_threshold(session)
+            cutoff_date = datetime.now() - timedelta(days=archive_threshold_days)
+
+            locations = await _get_locations_to_refresh(session, cutoff_date)
+            await _refresh_locations(session, locations)
+
+    except Exception as error:
+        logger.error(
+            f"Failed to refresh geocoding: {error!s}", exc_info=True
+        )
+        raise error
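
The two new columns (`route_archive_after` on the admin table, `geocoded_at` on the location table) imply a schema migration that is not part of this diff. A minimal Alembic sketch is shown below; it assumes the project uses Alembic and that the SQLModel tables are named `admins` and `locations`, and the revision identifiers are placeholders.

```python
"""Hypothetical companion migration (not in this diff); table names are assumptions."""

import sqlalchemy as sa
from alembic import op

# Placeholder revision identifiers; real values come from `alembic revision`.
revision = "add_geocoding_refresh_fields"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Assumed table names; adjust to the actual __tablename__ values.
    op.add_column(
        "admins",
        sa.Column("route_archive_after", sa.Integer(), nullable=False, server_default="30"),
    )
    op.add_column("locations", sa.Column("geocoded_at", sa.DateTime(), nullable=True))


def downgrade() -> None:
    op.drop_column("locations", "geocoded_at")
    op.drop_column("admins", "route_archive_after")
```

The `server_default="30"` keeps existing admin rows valid when the NOT NULL column is added; it can be dropped afterwards if only the application-level default is wanted.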
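To exercise the new job by hand outside APScheduler, a small driver can mirror the wrapper's init-then-import ordering, since `geocoding_refresh_jobs` binds `async_session_maker_instance` at import time. This is only a sketch, assuming `init_database()` can be called from a standalone process the same way the scheduler wrapper calls it.

```python
# Sketch only: run the geocoding refresh once, outside the scheduler.
import asyncio

import app.models as models_module


async def main() -> None:
    # Mirror the scheduler wrapper: initialize the engine for this event loop first.
    models_module.init_database()

    # Import after init_database() so the job module's
    # `from app.models import async_session_maker_instance` binds the live session maker.
    from app.services.jobs.geocoding_refresh_jobs import refresh_geocoding

    await refresh_geocoding()


if __name__ == "__main__":
    asyncio.run(main())
```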