Skip to content

Commit 433b76b

Browse files
authored
fix(backend/scheduler): Unbreak Scheduler.get_execution_schedules (#9919)
- Resolves #9918 - Follow-up fix for #9914 ### Changes 🏗️ - In `get_graph_execution_schedules`, skip jobs when their kwargs can't be parsed as `GraphExecutionJobArgs` - Rename methods of `Scheduler` to clarify their scope (scheduled *graph* executions) ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: - Go to `/library/agents/[id]` (which calls `GET /api/schedules`) - [x] -> `GET /api/schedules` request returns HTTP 200
1 parent 1ad6c76 commit 433b76b

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

autogpt_platform/backend/backend/executor/scheduler.py

+30-22
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from autogpt_libs.utils.cache import thread_cached
1414
from dotenv import load_dotenv
1515
from prisma.enums import NotificationType
16-
from pydantic import BaseModel
16+
from pydantic import BaseModel, ValidationError
1717
from sqlalchemy import MetaData, create_engine
1818

1919
from backend.data.block import BlockInput
@@ -72,7 +72,7 @@ def get_notification_client():
7272

7373

7474
def execute_graph(**kwargs):
75-
args = ExecutionJobArgs(**kwargs)
75+
args = GraphExecutionJobArgs(**kwargs)
7676
try:
7777
log(f"Executing recurring job for graph #{args.graph_id}")
7878
execution_utils.add_graph_execution(
@@ -140,22 +140,24 @@ class Jobstores(Enum):
140140
WEEKLY_NOTIFICATIONS = "weekly_notifications"
141141

142142

143-
class ExecutionJobArgs(BaseModel):
143+
class GraphExecutionJobArgs(BaseModel):
144144
graph_id: str
145145
input_data: BlockInput
146146
user_id: str
147147
graph_version: int
148148
cron: str
149149

150150

151-
class ExecutionJobInfo(ExecutionJobArgs):
151+
class GraphExecutionJobInfo(GraphExecutionJobArgs):
152152
id: str
153153
name: str
154154
next_run_time: str
155155

156156
@staticmethod
157-
def from_db(job_args: ExecutionJobArgs, job_obj: JobObj) -> "ExecutionJobInfo":
158-
return ExecutionJobInfo(
157+
def from_db(
158+
job_args: GraphExecutionJobArgs, job_obj: JobObj
159+
) -> "GraphExecutionJobInfo":
160+
return GraphExecutionJobInfo(
159161
id=job_obj.id,
160162
name=job_obj.name,
161163
next_run_time=job_obj.next_run_time.isoformat(),
@@ -269,15 +271,15 @@ def cleanup(self):
269271
self.scheduler.shutdown(wait=False)
270272

271273
@expose
272-
def add_execution_schedule(
274+
def add_graph_execution_schedule(
273275
self,
274276
graph_id: str,
275277
graph_version: int,
276278
cron: str,
277279
input_data: BlockInput,
278280
user_id: str,
279-
) -> ExecutionJobInfo:
280-
job_args = ExecutionJobArgs(
281+
) -> GraphExecutionJobInfo:
282+
job_args = GraphExecutionJobArgs(
281283
graph_id=graph_id,
282284
input_data=input_data,
283285
user_id=user_id,
@@ -292,40 +294,46 @@ def add_execution_schedule(
292294
jobstore=Jobstores.EXECUTION.value,
293295
)
294296
log(f"Added job {job.id} with cron schedule '{cron}' input data: {input_data}")
295-
return ExecutionJobInfo.from_db(job_args, job)
297+
return GraphExecutionJobInfo.from_db(job_args, job)
296298

297299
@expose
298-
def delete_schedule(self, schedule_id: str, user_id: str) -> ExecutionJobInfo:
300+
def delete_graph_execution_schedule(
301+
self, schedule_id: str, user_id: str
302+
) -> GraphExecutionJobInfo:
299303
job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
300304
if not job:
301305
log(f"Job {schedule_id} not found.")
302306
raise ValueError(f"Job #{schedule_id} not found.")
303307

304-
job_args = ExecutionJobArgs(**job.kwargs)
308+
job_args = GraphExecutionJobArgs(**job.kwargs)
305309
if job_args.user_id != user_id:
306310
raise ValueError("User ID does not match the job's user ID.")
307311

308312
log(f"Deleting job {schedule_id}")
309313
job.remove()
310314

311-
return ExecutionJobInfo.from_db(job_args, job)
315+
return GraphExecutionJobInfo.from_db(job_args, job)
312316

313317
@expose
314-
def get_execution_schedules(
318+
def get_graph_execution_schedules(
315319
self, graph_id: str | None = None, user_id: str | None = None
316-
) -> list[ExecutionJobInfo]:
320+
) -> list[GraphExecutionJobInfo]:
321+
jobs: list[JobObj] = self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value)
317322
schedules = []
318-
for job in self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value):
319-
logger.info(
323+
for job in jobs:
324+
logger.debug(
320325
f"Found job {job.id} with cron schedule {job.trigger} and args {job.kwargs}"
321326
)
322-
job_args = ExecutionJobArgs(**job.kwargs)
327+
try:
328+
job_args = GraphExecutionJobArgs.model_validate(job.kwargs)
329+
except ValidationError:
330+
continue
323331
if (
324332
job.next_run_time is not None
325333
and (graph_id is None or job_args.graph_id == graph_id)
326334
and (user_id is None or job_args.user_id == user_id)
327335
):
328-
schedules.append(ExecutionJobInfo.from_db(job_args, job))
336+
schedules.append(GraphExecutionJobInfo.from_db(job_args, job))
329337
return schedules
330338

331339
@expose
@@ -346,6 +354,6 @@ class SchedulerClient(AppServiceClient):
346354
def get_service_type(cls):
347355
return Scheduler
348356

349-
add_execution_schedule = endpoint_to_async(Scheduler.add_execution_schedule)
350-
delete_schedule = endpoint_to_async(Scheduler.delete_schedule)
351-
get_execution_schedules = endpoint_to_async(Scheduler.get_execution_schedules)
357+
add_execution_schedule = endpoint_to_async(Scheduler.add_graph_execution_schedule)
358+
delete_schedule = endpoint_to_async(Scheduler.delete_graph_execution_schedule)
359+
get_execution_schedules = endpoint_to_async(Scheduler.get_graph_execution_schedules)

autogpt_platform/backend/backend/server/routers/v1.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ class ScheduleCreationRequest(pydantic.BaseModel):
773773
async def create_schedule(
774774
user_id: Annotated[str, Depends(get_user_id)],
775775
schedule: ScheduleCreationRequest,
776-
) -> scheduler.ExecutionJobInfo:
776+
) -> scheduler.GraphExecutionJobInfo:
777777
graph = await graph_db.get_graph(
778778
schedule.graph_id, schedule.graph_version, user_id=user_id
779779
)
@@ -813,7 +813,7 @@ async def delete_schedule(
813813
async def get_execution_schedules(
814814
user_id: Annotated[str, Depends(get_user_id)],
815815
graph_id: str | None = None,
816-
) -> list[scheduler.ExecutionJobInfo]:
816+
) -> list[scheduler.GraphExecutionJobInfo]:
817817
return await execution_scheduler_client().get_execution_schedules(
818818
user_id=user_id,
819819
graph_id=graph_id,

0 commit comments

Comments
 (0)