Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/python/app/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class AdminBase(SQLModel):
admin_email: EmailStr = Field(nullable=False)
route_start_time: datetime.time | None = Field(default=None)
warehouse_location: str | None = Field(default=None, min_length=1)
route_archive_after: int = Field(default=30, nullable=False)

@field_validator("admin_phone")
@classmethod
Expand Down Expand Up @@ -55,3 +56,4 @@ class AdminUpdate(SQLModel):
admin_email: EmailStr | None = Field(default=None)
route_start_time: datetime.time | None = Field(default=None)
warehouse_location: str | None = Field(default=None, min_length=1)
route_archive_after: int | None = Field(default=None)
3 changes: 3 additions & 0 deletions backend/python/app/models/location.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from typing import TYPE_CHECKING
from uuid import UUID, uuid4

Expand Down Expand Up @@ -27,6 +28,7 @@ class LocationBase(SQLModel):
num_children: int | None = None
num_boxes: int
notes: str = Field(default="")
geocoded_at: datetime.datetime | None = None


class Location(LocationBase, BaseModel, table=True):
Expand Down Expand Up @@ -68,3 +70,4 @@ class LocationUpdate(SQLModel):
num_children: int | None = None
num_boxes: int | None = None
notes: str | None = None
geocoded_at: datetime.datetime | None = None
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from uuid import UUID
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
Expand Down Expand Up @@ -73,6 +74,7 @@ async def create_location(
num_children=location_data.num_children,
num_boxes=location_data.num_boxes,
notes=location_data.notes,
geocoded_at=datetime.now() if (location_data.latitude and location_data.longitude) else None,
)

session.add(location)
Expand Down
20 changes: 20 additions & 0 deletions backend/python/app/services/implementations/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,28 @@ def async_wrapper() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Reinitialize database connection for this event loop
# This is necessary because asyncpg connections are tied to a specific event loop
import app.models as models_module
models_module.init_database()

# Update the session maker reference in job modules that import it
# This is needed because 'from module import name' creates a reference
# that doesn't update when the original module's global is reassigned
import sys
for module_name, module in sys.modules.items():
if module_name.startswith('app.services.jobs') and hasattr(module, 'async_session_maker_instance'):
module.async_session_maker_instance = models_module.async_session_maker_instance

# Run the async function
loop.run_until_complete(func())
finally:
# Clean up: close all pending tasks and the loop
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

wrapped_func = async_wrapper
Expand Down
10 changes: 10 additions & 0 deletions backend/python/app/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
"""
from .driver_history_jobs import process_daily_driver_history
from .email_reminder_jobs import process_daily_reminder_emails
from .geocoding_refresh_jobs import refresh_geocoding

# Driver history mark daily routes as complete
scheduler_service.add_cron_job(
Expand All @@ -23,9 +24,18 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
minute=59,
)

# Email reminders - runs daily at 12 PM
scheduler_service.add_cron_job(
process_daily_reminder_emails,
job_id="daily_reminder_emails",
hour=12,
minute=0,
)

# Geocoding refresh - runs daily at 2 AM
scheduler_service.add_cron_job(
refresh_geocoding,
job_id="daily_geocoding_refresh",
hour=2,
minute=0,
)
116 changes: 116 additions & 0 deletions backend/python/app/services/jobs/geocoding_refresh_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Geocoding refresh scheduled jobs"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from uuid import UUID

from sqlalchemy import and_, or_
from sqlmodel import select

from app.dependencies.services import get_logger
from app.models import async_session_maker_instance
from app.models.admin import Admin
from app.models.location import Location
from app.utilities.geocoding import geocode_addresses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the app.utilities.google_maps_client instead of the raw functions here :)



async def _get_archive_threshold(session) -> int:
    """Read the route archive threshold (in days) from admin settings.

    Falls back to 30 days when no admin row exists or the stored value is
    falsy (None/0).
    """
    logger = get_logger()

    # There is at most one meaningful admin settings row; take the first.
    admin_row = (await session.execute(select(Admin))).scalars().first()

    if admin_row is None:
        logger.warning("No admin record found, using default threshold of 30 days")
        return 30

    days = admin_row.route_archive_after or 30
    logger.info(f"Using route_archive_after threshold: {days} days")
    return days


async def _get_locations_to_refresh(session, cutoff_date) -> list[Location]:
    """Return locations whose geocoding is missing or older than cutoff_date."""
    logger = get_logger()

    # A location is stale when it was never geocoded or its timestamp
    # predates the archive cutoff.
    stale_filter = or_(
        Location.geocoded_at.is_(None),
        Location.geocoded_at < cutoff_date,
    )
    rows = await session.execute(select(Location).where(stale_filter))
    stale = rows.scalars().all()

    logger.info(f"Found {len(stale)} locations to refresh")
    return stale


async def _refresh_locations(session, locations: list[Location]) -> int:
    """Batch-geocode the given locations and persist updated coordinates.

    Returns the number of locations successfully refreshed. Locations whose
    geocoding fails are logged and left untouched.
    """
    logger = get_logger()

    if not locations:
        logger.info("No locations need geocoding refresh")
        return 0

    # One batched geocoding call for all addresses; results are positional,
    # matching the order of `locations`.
    results = await geocode_addresses([loc.address for loc in locations])

    updated = 0
    for loc, result in zip(locations, results):
        if not result:
            logger.warning(
                f"Failed to geocode location {loc.location_id} "
                f"({loc.address})"
            )
            continue
        loc.latitude = result["lat"]
        loc.longitude = result["lng"]
        # NOTE(review): naive local time — matches how geocoded_at is set
        # at location creation, so comparisons stay consistent.
        loc.geocoded_at = datetime.now()
        updated += 1
        logger.info(
            f"Refreshed geocoding for location {loc.location_id} "
            f"({loc.address}): {result}"
        )

    await session.commit()
    logger.info(f"Successfully refreshed {updated}/{len(locations)} locations")
    return updated


async def refresh_geocoding() -> None:
    """Refresh geocoding for locations - runs daily

    This job:
    1. Gets the route_archive_after setting from admin_info (default 30 days)
    2. Finds all locations that need refreshing (geocoded_at is null or older than threshold)
    3. Refreshes lat/lon for those locations using Google Geocoding API
    4. Updates geocoded_at timestamp for refreshed locations
    """
    logger = get_logger()

    # The session maker is initialized lazily elsewhere; bail out cleanly if
    # the scheduler fired before the database was set up.
    if async_session_maker_instance is None:
        logger.error("Database session maker not initialized")
        return

    try:
        async with async_session_maker_instance() as session:
            # FIX: removed a redundant Admin query whose result was never
            # used — _get_archive_threshold performs its own lookup.
            archive_threshold_days = await _get_archive_threshold(session)
            # NOTE(review): naive local time, consistent with how
            # geocoded_at is written; revisit if the schema moves to UTC.
            cutoff_date = datetime.now() - timedelta(days=archive_threshold_days)

            locations = await _get_locations_to_refresh(session, cutoff_date)
            await _refresh_locations(session, locations)

    except Exception as error:
        logger.error(
            f"Failed to refresh geocoding: {error!s}", exc_info=True
        )
        # Bare raise preserves the original exception and traceback.
        raise
Loading