Skip to content

Commit 38f78bb

Browse files
authored
Geocoding Cron Job (#76)
* Add geocoding refresh cron job and related fields - Add route_archive_after field to admin_info table (default: 30 days) - Add geocoded_at field to locations table to track when address was geocoded - Set geocoded_at timestamp when creating locations with coordinates - Create daily cron job to refresh geocoding for locations older than threshold - Fix scheduler service to handle event loop isolation for async jobs - Add database migration for new fields The cron job runs daily at 2 AM and refreshes lat/lon for locations where geocoded_at is NULL or older than route_archive_after days from admin settings. * Update README to include Geocoding Update section and add screenshots - Added a new section detailing the automated geocoding refresh cron job. - Included features of the geocoding update, such as automatic refresh and manual trigger. - Added test data instructions and before/after screenshots to illustrate the geocoding process. * Resolved commits on PR * removed appropriate migrations. * Refactor geocoding refresh logic into helper functions - Introduced `_get_archive_threshold`, `_get_locations_to_refresh`, and `_refresh_locations` to modularize the geocoding refresh process. - Updated `refresh_geocoding` to utilize these new functions for improved readability and maintainability. - Removed redundant code and streamlined the process of fetching and refreshing location data.
1 parent 1a38c8c commit 38f78bb

File tree

6 files changed

+153
-0
lines changed

6 files changed

+153
-0
lines changed

backend/python/app/models/admin.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class AdminBase(SQLModel):
1818
admin_email: EmailStr = Field(nullable=False)
1919
route_start_time: datetime.time | None = Field(default=None)
2020
warehouse_location: str | None = Field(default=None, min_length=1)
21+
route_archive_after: int = Field(default=30, nullable=False)
2122

2223
@field_validator("admin_phone")
2324
@classmethod
@@ -55,3 +56,4 @@ class AdminUpdate(SQLModel):
5556
admin_email: EmailStr | None = Field(default=None)
5657
route_start_time: datetime.time | None = Field(default=None)
5758
warehouse_location: str | None = Field(default=None, min_length=1)
59+
route_archive_after: int | None = Field(default=None)

backend/python/app/models/location.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
from typing import TYPE_CHECKING
23
from uuid import UUID, uuid4
34

@@ -27,6 +28,7 @@ class LocationBase(SQLModel):
2728
num_children: int | None = None
2829
num_boxes: int
2930
notes: str = Field(default="")
31+
geocoded_at: datetime.datetime | None = None
3032

3133

3234
class Location(LocationBase, BaseModel, table=True):
@@ -68,3 +70,4 @@ class LocationUpdate(SQLModel):
6870
num_children: int | None = None
6971
num_boxes: int | None = None
7072
notes: str | None = None
73+
geocoded_at: datetime.datetime | None = None

backend/python/app/services/implementations/location_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
from uuid import UUID
3+
from datetime import datetime
34

45
from sqlalchemy.ext.asyncio import AsyncSession
56
from sqlmodel import select
@@ -73,6 +74,7 @@ async def create_location(
7374
num_children=location_data.num_children,
7475
num_boxes=location_data.num_boxes,
7576
notes=location_data.notes,
77+
geocoded_at=datetime.now() if (location_data.latitude and location_data.longitude) else None,
7678
)
7779

7880
session.add(location)

backend/python/app/services/implementations/scheduler_service.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,28 @@ def async_wrapper() -> None:
7575
loop = asyncio.new_event_loop()
7676
asyncio.set_event_loop(loop)
7777
try:
78+
# Reinitialize database connection for this event loop
79+
# This is necessary because asyncpg connections are tied to a specific event loop
80+
import app.models as models_module
81+
models_module.init_database()
82+
83+
# Update the session maker reference in job modules that import it
84+
# This is needed because 'from module import name' creates a reference
85+
# that doesn't update when the original module's global is reassigned
86+
import sys
87+
for module_name, module in sys.modules.items():
88+
if module_name.startswith('app.services.jobs') and hasattr(module, 'async_session_maker_instance'):
89+
module.async_session_maker_instance = models_module.async_session_maker_instance
90+
91+
# Run the async function
7892
loop.run_until_complete(func())
7993
finally:
94+
# Clean up: close all pending tasks and the loop
95+
pending = asyncio.all_tasks(loop)
96+
for task in pending:
97+
task.cancel()
98+
if pending:
99+
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
80100
loop.close()
81101

82102
wrapped_func = async_wrapper

backend/python/app/services/jobs/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
1414
"""
1515
from .driver_history_jobs import process_daily_driver_history
1616
from .email_reminder_jobs import process_daily_reminder_emails
17+
from .geocoding_refresh_jobs import refresh_geocoding
1718

1819
# Driver history mark daily routes as complete
1920
scheduler_service.add_cron_job(
@@ -23,9 +24,18 @@ def init_jobs(scheduler_service: SchedulerService) -> None:
2324
minute=59,
2425
)
2526

27+
# Email reminders - runs daily at 12 PM
2628
scheduler_service.add_cron_job(
2729
process_daily_reminder_emails,
2830
job_id="daily_reminder_emails",
2931
hour=12,
3032
minute=0,
3133
)
34+
35+
# Geocoding refresh - runs daily at 2 AM
36+
scheduler_service.add_cron_job(
37+
refresh_geocoding,
38+
job_id="daily_geocoding_refresh",
39+
hour=2,
40+
minute=0,
41+
)
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""Geocoding refresh scheduled jobs"""
2+
3+
from __future__ import annotations
4+
5+
from datetime import datetime, timedelta
6+
from typing import TYPE_CHECKING
7+
8+
if TYPE_CHECKING:
9+
from uuid import UUID
10+
11+
from sqlalchemy import and_, or_
12+
from sqlmodel import select
13+
14+
from app.dependencies.services import get_logger
15+
from app.models import async_session_maker_instance
16+
from app.models.admin import Admin
17+
from app.models.location import Location
18+
from app.utilities.geocoding import geocode_addresses
19+
20+
21+
async def _get_archive_threshold(session) -> int:
22+
"""Get the route archive threshold from admin settings"""
23+
logger = get_logger()
24+
statement = select(Admin)
25+
result = await session.execute(statement)
26+
admin_record = result.scalars().first()
27+
28+
if not admin_record:
29+
logger.warning("No admin record found, using default threshold of 30 days")
30+
return 30
31+
32+
threshold = admin_record.route_archive_after or 30
33+
logger.info(f"Using route_archive_after threshold: {threshold} days")
34+
return threshold
35+
36+
37+
async def _get_locations_to_refresh(session, cutoff_date) -> list[Location]:
38+
"""Query locations that need geocoding refresh"""
39+
logger = get_logger()
40+
statement = select(Location).where(
41+
or_(
42+
Location.geocoded_at.is_(None),
43+
Location.geocoded_at < cutoff_date
44+
)
45+
)
46+
result = await session.execute(statement)
47+
locations = result.scalars().all()
48+
logger.info(f"Found {len(locations)} locations to refresh")
49+
return locations
50+
51+
52+
async def _refresh_locations(session, locations: list[Location]) -> int:
53+
"""Batch geocode and update locations, returns count of successfully refreshed"""
54+
logger = get_logger()
55+
56+
if not locations:
57+
logger.info("No locations need geocoding refresh")
58+
return 0
59+
60+
addresses = [loc.address for loc in locations]
61+
geocoding_results = await geocode_addresses(addresses)
62+
63+
refreshed_count = 0
64+
for location, coords in zip(locations, geocoding_results):
65+
if coords:
66+
location.latitude = coords["lat"]
67+
location.longitude = coords["lng"]
68+
location.geocoded_at = datetime.now()
69+
refreshed_count += 1
70+
logger.info(
71+
f"Refreshed geocoding for location {location.location_id} "
72+
f"({location.address}): {coords}"
73+
)
74+
else:
75+
logger.warning(
76+
f"Failed to geocode location {location.location_id} "
77+
f"({location.address})"
78+
)
79+
80+
await session.commit()
81+
logger.info(f"Successfully refreshed {refreshed_count}/{len(locations)} locations")
82+
return refreshed_count
83+
84+
85+
async def refresh_geocoding() -> None:
86+
"""Refresh geocoding for locations - runs daily
87+
88+
This job:
89+
1. Gets the route_archive_after setting from admin_info (default 30 days)
90+
2. Finds all locations that need refreshing (geocoded_at is null or older than threshold)
91+
3. Refreshes lat/lon for those locations using Google Geocoding API
92+
4. Updates geocoded_at timestamp for refreshed locations
93+
"""
94+
logger = get_logger()
95+
96+
if async_session_maker_instance is None:
97+
logger.error("Database session maker not initialized")
98+
return
99+
100+
try:
101+
async with async_session_maker_instance() as session:
102+
admin_statement = select(Admin)
103+
admin_result = await session.execute(admin_statement)
104+
admin_record = admin_result.scalars().first()
105+
106+
archive_threshold_days = await _get_archive_threshold(session)
107+
cutoff_date = datetime.now() - timedelta(days=archive_threshold_days)
108+
109+
locations = await _get_locations_to_refresh(session, cutoff_date)
110+
await _refresh_locations(session, locations)
111+
112+
except Exception as error:
113+
logger.error(
114+
f"Failed to refresh geocoding: {error!s}", exc_info=True
115+
)
116+
raise error

0 commit comments

Comments
 (0)