Skip to content

Commit e4abac5

Browse files
Pranav/cron job (#59)
* initial structure of mark as complete service * added infrastructure for cron jobs + implemented trial job * configure timezone appropriately * linting + formatting * fix lint issues * copilot review fixes * linter fixes
1 parent 26f0975 commit e4abac5

File tree

8 files changed

+425
-3
lines changed

8 files changed

+425
-3
lines changed

backend/python/app/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
from fastapi import FastAPI
77
from fastapi.middleware.cors import CORSMiddleware
88

9+
from app.dependencies.services import get_scheduler_service
10+
from app.services.jobs import init_jobs
11+
912
from .config import settings
1013
from .models import init_app as init_models
1114
from .routers import init_app as init_routers
@@ -119,11 +122,15 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
119122
initialize_firebase()
120123
init_models()
121124

125+
# Initialize scheduler
126+
scheduler_service = get_scheduler_service()
127+
scheduler_service.start()
128+
init_jobs(scheduler_service)
129+
122130
yield
123131

124-
# Shutdown
125-
# Add any cleanup code here if needed
126-
pass
132+
# Cleanup: stop the scheduler service during application shutdown
133+
scheduler_service.stop()
127134

128135

129136
def create_app() -> FastAPI:

backend/python/app/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ class Settings(BaseSettings):
5757
port: int = Field(default=8080)
5858
host: str = Field(default="0.0.0.0")
5959

60+
# Scheduler
61+
scheduler_timezone: str = Field(default="America/New_York")
62+
6063
# Preview deploy
6164
preview_deploy: bool = Field(default=False)
6265

backend/python/app/dependencies/services.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
MockRoutingAlgorithm,
2121
)
2222
from app.services.implementations.route_group_service import RouteGroupService
23+
from app.services.implementations.scheduler_service import SchedulerService
2324
from app.services.implementations.simple_entity_service import SimpleEntityService
2425
from app.services.protocols.routing_algorithm import RoutingAlgorithmProtocol
2526

@@ -105,3 +106,10 @@ def get_routing_algorithm() -> RoutingAlgorithmProtocol:
105106
Swap this to use a different algorithm implementation.
106107
"""
107108
return MockRoutingAlgorithm()
109+
110+
111+
@lru_cache
112+
def get_scheduler_service() -> SchedulerService:
113+
"""Get scheduler service instance"""
114+
logger = get_logger()
115+
return SchedulerService(logger)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import asyncio
2+
import logging
3+
from collections.abc import Callable
4+
from typing import Any
5+
from zoneinfo import ZoneInfo
6+
7+
from apscheduler.schedulers.background import ( # type: ignore[import-untyped]
8+
BackgroundScheduler,
9+
)
10+
from apscheduler.triggers.cron import CronTrigger # type: ignore[import-untyped]
11+
12+
from app.config import settings
13+
14+
15+
class SchedulerService:
16+
"""Centralized service for managing scheduled tasks (cron jobs)"""
17+
18+
def __init__(self, logger: logging.Logger):
19+
self.logger = logger
20+
self.scheduler: BackgroundScheduler | None = None
21+
self._is_running = False
22+
# Get timezone from settings
23+
self.timezone = ZoneInfo(settings.scheduler_timezone)
24+
25+
def start(self) -> None:
26+
"""Start the scheduler"""
27+
if self._is_running:
28+
self.logger.warning("Scheduler is already running")
29+
return
30+
31+
# Pass timezone to scheduler (applies to all jobs)
32+
self.scheduler = BackgroundScheduler(timezone=self.timezone)
33+
self.scheduler.start()
34+
self._is_running = True
35+
self.logger.info(f"Scheduler started with timezone: {self.timezone}")
36+
37+
def stop(self) -> None:
38+
"""Stop the scheduler"""
39+
if not self._is_running or self.scheduler is None:
40+
return
41+
42+
self.scheduler.shutdown(wait=True)
43+
self._is_running = False
44+
self.logger.info("Scheduler stopped")
45+
46+
def add_cron_job(
47+
self,
48+
func: Callable,
49+
job_id: str,
50+
hour: int | str = "*",
51+
minute: int | str = "*",
52+
day_of_week: int | str = "*",
53+
day: int | str = "*",
54+
month: int | str = "*",
55+
) -> None:
56+
"""Add a cron job to the scheduler
57+
58+
Args:
59+
func: The function to execute (can be sync or async)
60+
job_id: Unique identifier for the job
61+
hour: Hour (0-23) or '*' for every hour (in configured timezone)
62+
minute: Minute (0-59) or '*' for every minute
63+
day_of_week: Day of week (0-6, 0=Monday) or '*' for every day
64+
day: Day of month (1-31) or '*' for every day
65+
month: Month (1-12) or '*' for every month
66+
"""
67+
if not self._is_running or self.scheduler is None:
68+
raise RuntimeError("Scheduler must be started before adding jobs")
69+
70+
# Wrap async functions to run in event loop
71+
if asyncio.iscoroutinefunction(func):
72+
73+
def async_wrapper() -> None:
74+
# Create new event loop for the background thread
75+
loop = asyncio.new_event_loop()
76+
asyncio.set_event_loop(loop)
77+
try:
78+
loop.run_until_complete(func())
79+
finally:
80+
loop.close()
81+
82+
wrapped_func = async_wrapper
83+
else:
84+
wrapped_func = func
85+
86+
# Explicitly pass timezone to CronTrigger to ensure the trigger uses the intended timezone,
87+
# even though the scheduler also has a timezone setting.
88+
trigger = CronTrigger(
89+
hour=hour,
90+
minute=minute,
91+
day_of_week=day_of_week,
92+
day=day,
93+
month=month,
94+
timezone=self.timezone,
95+
)
96+
97+
self.scheduler.add_job(
98+
wrapped_func,
99+
trigger=trigger,
100+
id=job_id,
101+
replace_existing=True,
102+
)
103+
self.logger.info(
104+
f"Registered job '{job_id}' - schedule: {month}/{day} {hour}:{minute} (day_of_week={day_of_week}) in {self.timezone}"
105+
)
106+
107+
def remove_job(self, job_id: str) -> None:
108+
"""Remove a job from the scheduler"""
109+
if self.scheduler is None:
110+
return
111+
try:
112+
self.scheduler.remove_job(job_id)
113+
self.logger.info(f"Removed cron job '{job_id}'")
114+
except Exception as e:
115+
self.logger.warning(f"Failed to remove job '{job_id}': {e}")
116+
117+
def list_jobs(self) -> list[dict[str, Any]]:
118+
"""List all scheduled jobs"""
119+
if self.scheduler is None:
120+
return []
121+
return [
122+
{
123+
"id": job.id,
124+
"next_run": str(job.next_run_time),
125+
"trigger": str(job.trigger),
126+
}
127+
for job in self.scheduler.get_jobs()
128+
]
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Scheduled Jobs
2+
3+
This directory contains all scheduled tasks (cron jobs) for the Food4Kids platform.
4+
5+
### Architecture
6+
7+
```
8+
app/
9+
services/
10+
implementations/
11+
scheduler_service.py # Central scheduler management
12+
jobs/
13+
__init__.py # init_jobs() - registers all jobs
14+
driver_history_jobs.py # Example job implementation
15+
README.md # This file
16+
```
17+
18+
### How It Works
19+
20+
1. On application startup, `SchedulerService` is initialized and started
21+
2. `init_jobs()` is called, which imports all job modules and registers them with the scheduler
22+
3. APScheduler runs jobs in background threads according to their cron schedules
23+
4. Async job functions are automatically wrapped to run in their own event loops
24+
5. On application shutdown, the scheduler is gracefully stopped
25+
26+
## Creating Your Own Cron Job
27+
28+
Follow these steps to add a new scheduled job:
29+
30+
### Step 1: Create a Job File
31+
32+
Create a new file in this directory (e.g., `email_jobs.py`, `cleanup_jobs.py`):
33+
34+
```python
35+
"""Your job description"""
36+
import logging
37+
from app.dependencies.services import get_logger
38+
from app.models import async_session_maker_instance
39+
40+
41+
async def your_job_function() -> None:
42+
"""Description of what this job does
43+
"""
44+
logger = get_logger()
45+
46+
if async_session_maker_instance is None:
47+
logger.error("Database session maker not initialized")
48+
return
49+
50+
try:
51+
async with async_session_maker_instance() as session:
52+
# Your job logic here
53+
await session.commit()
54+
logger.info("Job completed successfully")
55+
except Exception as e:
56+
logger.error(f"Error in job: {e}", exc_info=True)
57+
if async_session_maker_instance:
58+
try:
59+
async with async_session_maker_instance() as session:
60+
await session.rollback()
61+
except Exception:
62+
pass
63+
raise
64+
```
65+
66+
### Step 2: Register the Job
67+
68+
Add your job to `__init__.py`:
69+
70+
```python
71+
def init_jobs(scheduler_service) -> None:
72+
from .driver_history_jobs import process_daily_driver_history
73+
from .email_jobs import your_job_function # Import your new job
74+
75+
# Existing jobs...
76+
scheduler_service.add_cron_job(
77+
process_daily_driver_history,
78+
job_id="daily_driver_history",
79+
hour=23,
80+
minute=59,
81+
)
82+
83+
# Register your new job
84+
scheduler_service.add_cron_job(
85+
your_job_function,
86+
job_id="your_job_id",
87+
hour=9,
88+
minute=0,
89+
day_of_week=0,
90+
)
91+
```
92+
93+
### Step 3: Cron Schedule Parameters
94+
95+
The `add_cron_job()` method accepts these parameters:
96+
97+
- `job_id` (required): Unique identifier for the job
98+
- `hour` (default: "*"): Hour (0-23) or "*" for every hour
99+
- `minute` (default: "*"): Minute (0-59) or "*" for every minute
100+
- `day_of_week` (default: "*"): Day of week (0-6, 0=Monday) or "*" for every day
101+
- `day` (default: "*"): Day of month (1-31) or "*" for every day
102+
- `month` (default: "*"): Month (1-12) or "*" for every month
103+
104+
### Testing
105+
106+
To test your job manually, you can call it directly:
107+
108+
```python
109+
from app.services.jobs.your_job_file import your_job_function
110+
import asyncio
111+
112+
asyncio.run(your_job_function())
113+
```
114+
115+
You can also create a test endpoint in development to trigger jobs manually.
116+
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Scheduled jobs - follows same pattern as routers"""
2+
3+
from app.services.implementations.scheduler_service import SchedulerService
4+
5+
6+
def init_jobs(scheduler_service: SchedulerService) -> None:
7+
"""Initialize all scheduled jobs - add new jobs here
8+
9+
This function follows the same pattern as app.routers.init_app().
10+
To add a new scheduled job:
11+
1. Create a new file in this directory (e.g., email_jobs.py)
12+
2. Define your job function
13+
3. Import and register it here
14+
"""
15+
from .driver_history_jobs import process_daily_driver_history
16+
17+
# Driver history mark daily routes as complete
18+
scheduler_service.add_cron_job(
19+
process_daily_driver_history,
20+
job_id="daily_driver_history",
21+
hour=23,
22+
minute=59,
23+
)

0 commit comments

Comments
 (0)