Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [Application Execution](#application-execution)
- [API Testing with Postman](#api-testing-with-postman)
- [Database Interactions](#database-interactions)
- [Geocoding Update](#geocoding-update)
- [Version Control Guide](#version-control-guide)
- [Branching](#branching)
- [Docker Commands](#docker-commands)
Expand Down Expand Up @@ -157,6 +158,44 @@ SELECT * FROM users; # Run SQL queries
docker-compose exec backend python app/seed_database.py
```

## Geocoding Update

The system includes an automated geocoding refresh cron job that updates location coordinates using the Google Maps Geocoding API.

### Features

- **Automatic refresh** of location coordinates that are NULL or older than 30 days
- **Respects `route_archive_after`** setting in admin_info (default: 30 days)
- **Scheduled job** that runs daily at midnight EST
- **Manual trigger** via API endpoint

### Test Data

Seed the database with test locations for geocoding:

```bash
docker-compose exec backend python -w /app app/seed_geocoding_test_data.py
```

### Before Update

Locations with NULL coordinates or stale geocoding data:

![Before Geocoding Update](assets/screanshots/before_test.png)

### After Update

Locations with refreshed coordinates from Google Geocoding API:

![After Geocoding Update](assets/screanshots/after_test.png)

### Manual Trigger

```bash
# Trigger geocoding refresh job manually
docker-compose exec backend python -c "from app.services.jobs.geocoding_refresh import run_geocoding_refresh_job; import asyncio; asyncio.run(run_geocoding_refresh_job())"
```

## Version Control Guide

### Branching
Expand Down
Binary file added assets/screanshots/after_test.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/screanshots/before_test.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions backend/python/app/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class AdminBase(SQLModel):
admin_email: EmailStr = Field(nullable=False)
route_start_time: datetime.time | None = Field(default=None)
warehouse_location: str | None = Field(default=None, min_length=1)
route_archive_after: int = Field(default=30, nullable=False)

@field_validator("admin_phone")
@classmethod
Expand Down Expand Up @@ -55,3 +56,4 @@ class AdminUpdate(SQLModel):
admin_email: EmailStr | None = Field(default=None)
route_start_time: datetime.time | None = Field(default=None)
warehouse_location: str | None = Field(default=None, min_length=1)
route_archive_after: int | None = Field(default=None)
3 changes: 3 additions & 0 deletions backend/python/app/models/location.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from typing import TYPE_CHECKING
from uuid import UUID, uuid4

Expand Down Expand Up @@ -27,6 +28,7 @@ class LocationBase(SQLModel):
num_children: int | None = None
num_boxes: int
notes: str = Field(default="")
geocoded_at: datetime.datetime | None = None


class Location(LocationBase, BaseModel, table=True):
Expand Down Expand Up @@ -68,3 +70,4 @@ class LocationUpdate(SQLModel):
num_children: int | None = None
num_boxes: int | None = None
notes: str | None = None
geocoded_at: datetime.datetime | None = None
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from uuid import UUID
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
Expand Down Expand Up @@ -73,6 +74,7 @@ async def create_location(
num_children=location_data.num_children,
num_boxes=location_data.num_boxes,
notes=location_data.notes,
geocoded_at=datetime.now() if (location_data.latitude and location_data.longitude) else None,
)

session.add(location)
Expand Down
20 changes: 20 additions & 0 deletions backend/python/app/services/implementations/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,28 @@ def async_wrapper() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Reinitialize database connection for this event loop
# This is necessary because asyncpg connections are tied to a specific event loop
import app.models as models_module
models_module.init_database()

# Update the session maker reference in job modules that import it
# This is needed because 'from module import name' creates a reference
# that doesn't update when the original module's global is reassigned
import sys
for module_name, module in sys.modules.items():
if module_name.startswith('app.services.jobs') and hasattr(module, 'async_session_maker_instance'):
module.async_session_maker_instance = models_module.async_session_maker_instance

# Run the async function
loop.run_until_complete(func())
finally:
# Clean up: close all pending tasks and the loop
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

wrapped_func = async_wrapper
Expand Down
10 changes: 10 additions & 0 deletions backend/python/app/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
"""
from .driver_history_jobs import process_daily_driver_history
from .email_reminder_jobs import process_daily_reminder_emails
from .geocoding_refresh_jobs import refresh_geocoding

# Driver history mark daily routes as complete
scheduler_service.add_cron_job(
Expand All @@ -23,9 +24,18 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
minute=59,
)

# Email reminders - runs daily at 12 PM
scheduler_service.add_cron_job(
process_daily_reminder_emails,
job_id="daily_reminder_emails",
hour=12,
minute=0,
)

# Geocoding refresh - runs daily at 2 AM
scheduler_service.add_cron_job(
refresh_geocoding,
job_id="daily_geocoding_refresh",
hour=2,
minute=0,
)
118 changes: 118 additions & 0 deletions backend/python/app/services/jobs/geocoding_refresh_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Geocoding refresh scheduled jobs"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from uuid import UUID

from sqlalchemy import and_, or_
from sqlmodel import select

from app.dependencies.services import get_logger
from app.models import async_session_maker_instance
from app.models.admin import Admin
from app.models.location import Location
from app.utilities.geocoding import geocode, geocode_addresses


async def refresh_geocoding() -> None:
"""Refresh geocoding for locations and warehouse - runs daily

This job:
1. Gets the route_archive_after setting from admin_info (default 30 days)
2. Finds all locations that need refreshing (geocoded_at is null or older than threshold)
3. Refreshes lat/lon for those locations using Google Geocoding API
4. Updates geocoded_at timestamp for refreshed locations
5. Refreshes the warehouse location in admin_info table
"""
logger = get_logger()

if async_session_maker_instance is None:
logger.error("Database session maker not initialized")
return

try:
async with async_session_maker_instance() as session:
# Get the admin settings to determine archive threshold
admin_statement = select(Admin)
admin_result = await session.execute(admin_statement)
admin_record = admin_result.scalars().first()

if not admin_record:
logger.warning("No admin record found, using default threshold of 30 days")
archive_threshold_days = 30
else:
archive_threshold_days = admin_record.route_archive_after or 30
logger.info(f"Using route_archive_after threshold: {archive_threshold_days} days")

# Calculate the cutoff date
cutoff_date = datetime.now() - timedelta(days=archive_threshold_days)

# Find all locations that need refreshing
# (geocoded_at is null OR geocoded_at < cutoff_date)
location_statement = select(Location).where(
or_(
Location.geocoded_at.is_(None),
Location.geocoded_at < cutoff_date
)
)
location_result = await session.execute(location_statement)
locations_to_refresh = location_result.scalars().all()

if not locations_to_refresh:
logger.info("No locations need geocoding refresh")
else:
logger.info(f"Found {len(locations_to_refresh)} locations to refresh")

# Collect addresses and their corresponding location IDs
addresses = [loc.address for loc in locations_to_refresh]

# Batch geocode all addresses
geocoding_results = await geocode_addresses(addresses)

# Update each location with new coordinates
refreshed_count = 0
for location, coords in zip(locations_to_refresh, geocoding_results):
if coords:
location.latitude = coords["lat"]
location.longitude = coords["lng"]
location.geocoded_at = datetime.now()
refreshed_count += 1
logger.info(
f"Refreshed geocoding for location {location.location_id} "
f"({location.address}): {coords}"
)
else:
logger.warning(
f"Failed to geocode location {location.location_id} "
f"({location.address})"
)

await session.commit()
logger.info(f"Successfully refreshed {refreshed_count}/{len(locations_to_refresh)} locations")

# Refresh warehouse location if it exists
if admin_record and admin_record.warehouse_location:
logger.info("Refreshing warehouse location geocoding")
warehouse_coords = await geocode(admin_record.warehouse_location)

if warehouse_coords:
# Store warehouse coordinates in a separate table or add fields to admin
# For now, we'll just log it since admin table doesn't have lat/lng fields
logger.info(
f"Warehouse location geocoded: {admin_record.warehouse_location} -> {warehouse_coords}"
)
# TODO: Add warehouse_latitude and warehouse_longitude fields to admin table if needed
else:
logger.warning(
f"Failed to geocode warehouse location: {admin_record.warehouse_location}"
)

except Exception as error:
logger.error(
f"Failed to refresh geocoding: {error!s}", exc_info=True
)
raise error
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Add geocoding refresh fields

Revision ID: 6dc0749d4ffd
Revises: 6e14b59510ce4d28a7af0b0f3f4d3385
Create Date: 2025-11-28 21:00:00.000000

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '6dc0749d4ffd'
down_revision = '6e14b59510ce4d28a7af0b0f3f4d3385'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column('locations', sa.Column('geocoded_at', sa.DateTime(), nullable=True))
op.add_column('admin_info', sa.Column('route_archive_after', sa.Integer(), nullable=False, server_default='30'))
op.alter_column('admin_info', 'route_archive_after', nullable=False)


def downgrade() -> None:
op.drop_column('admin_info', 'route_archive_after')
op.drop_column('locations', 'geocoded_at')
Loading