Skip to content

Commit 66f0180

Browse files
✨ add ScheduledTaskTool with global scheduler and frontend polling
Add a scheduled task tool that enables agents to create, list, and cancel cron-based or one-shot scheduled tasks. The architecture uses a global scheduler singleton (independent of agent lifecycle), DB persistence, and frontend polling for real-time message delivery. Changes: - SDK: ScheduledTaskTool as thin CRUD wrapper with cron parser - Backend: global ScheduledTaskScheduler (10s DB poll), scheduled_task_db - Backend: new_messages polling API + batch endpoint - Backend: conversation_id passthrough for task-to-session binding - Backend: scheduler strips ScheduledTaskTool from triggered agent to prevent recursive task creation - Frontend: two-layer polling (5s active, 10s background conversations) - DB: scheduled_tasks_t table with multi-tenant fields No existing Nexent logic is modified — all changes are additive.
1 parent b6b6027 commit 66f0180

15 files changed

Lines changed: 843 additions & 2 deletions

File tree

backend/agents/create_agent_info.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ async def create_agent_config(
310310
allow_memory_search: bool = True,
311311
version_no: int = 0,
312312
override_model_id: int | None = None,
313+
conversation_id: int = None,
313314
):
314315
agent_info = search_agent_info_by_agent_id(
315316
agent_id=agent_id, tenant_id=tenant_id, version_no=version_no)
@@ -331,13 +332,14 @@ async def create_agent_config(
331332
allow_memory_search=allow_memory_search,
332333
version_no=sub_agent_version_no,
333334
override_model_id=None,
335+
conversation_id=None,
334336
)
335337
managed_agents.append(sub_agent_config)
336338

337339
# create external A2A agents (synchronous function, no await needed)
338340
external_a2a_agents = _get_external_a2a_agents(agent_id, tenant_id, version_no)
339341

340-
tool_list = await create_tool_config_list(agent_id, tenant_id, user_id, version_no=version_no)
342+
tool_list = await create_tool_config_list(agent_id, tenant_id, user_id, version_no=version_no, conversation_id=conversation_id)
341343

342344
# Build system prompt: prioritize segmented fields, fallback to original prompt field if not available
343345
duty_prompt = agent_info.get("duty_prompt", "")
@@ -562,7 +564,7 @@ async def create_agent_config(
562564
return agent_config
563565

564566

565-
async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int = 0):
567+
async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int = 0, conversation_id: int = None):
566568
# create tool
567569
tool_config_list = []
568570
langchain_tools = await discover_langchain_tools()
@@ -665,6 +667,17 @@ async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int
665667
"storage_client": minio_client,
666668
"validate_url_access": lambda urls: validate_urls_access(urls, user_id)
667669
}
670+
elif tool_config.class_name == "ScheduledTaskTool":
671+
from database.scheduled_task_db import create_scheduled_task, query_tasks_by_agent, cancel_task
672+
tool_config.metadata = {
673+
"db_create": create_scheduled_task,
674+
"db_list": query_tasks_by_agent,
675+
"db_cancel": cancel_task,
676+
"agent_id": agent_id,
677+
"tenant_id": tenant_id,
678+
"user_id": user_id,
679+
"conversation_id": conversation_id,
680+
}
668681

669682
tool_config_list.append(tool_config)
670683

@@ -929,6 +942,7 @@ async def create_agent_run_info(
929942
is_debug: bool = False,
930943
override_version_no: int | None = None,
931944
override_model_id: int | None = None,
945+
conversation_id: int = None,
932946
):
933947
# Determine which version_no to use based on is_debug flag
934948
# If is_debug=false, use the current published version (current_version_no)
@@ -957,6 +971,7 @@ async def create_agent_run_info(
957971
"last_user_query": final_query,
958972
"allow_memory_search": allow_memory_search,
959973
"version_no": version_no,
974+
"conversation_id": conversation_id,
960975
}
961976
if override_model_id is not None:
962977
create_config_kwargs["override_model_id"] = override_model_id

backend/apps/conversation_management_app.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any, Dict, Optional
44

55
from fastapi import APIRouter, Header, HTTPException, Request
6+
from starlette.responses import JSONResponse
67

78
from consts.model import (
89
ConversationRequest,
@@ -18,6 +19,7 @@
1819
generate_conversation_title_service,
1920
get_conversation_history_service,
2021
get_conversation_list_service,
22+
get_new_messages_service,
2123
get_sources_service,
2224
rename_conversation_service,
2325
update_message_opinion_service, get_message_id_by_index_impl,
@@ -240,3 +242,52 @@ async def get_message_id_endpoint(request: MessageIdRequest):
240242
except Exception as e:
241243
logging.error(f"Failed to get message ID: {str(e)}")
242244
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
245+
246+
247+
@router.get("/{conversation_id}/new_messages", response_model=Dict[str, Any])
248+
async def check_new_messages_endpoint(conversation_id: int, since_index: int = 0, authorization: Optional[str] = Header(None)):
249+
"""
250+
Lightweight polling: check if new messages exist for a single conversation.
251+
252+
Args:
253+
conversation_id: Conversation ID
254+
since_index: Last known message index on the client side
255+
authorization: Authorization header
256+
257+
Returns:
258+
Dict with has_new, max_index, since_index
259+
"""
260+
try:
261+
user_id, tenant_id = get_current_user_id(authorization)
262+
result = get_new_messages_service(conversation_id, user_id, since_index)
263+
return JSONResponse(status_code=HTTPStatus.OK, content=result)
264+
except Exception as e:
265+
logging.error(f"Failed to check new messages: {str(e)}")
266+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
267+
268+
269+
@router.post("/batch_new_messages", response_model=Dict[str, Any])
270+
async def batch_check_new_messages_endpoint(request: Dict[str, Any], authorization: Optional[str] = Header(None)):
271+
"""
272+
Batch check for new messages across multiple conversations.
273+
274+
Args:
275+
request: Body with "checks" list of {"conversation_id": int, "since_index": int}
276+
authorization: Authorization header
277+
278+
Returns:
279+
Dict mapping conversation_id to {has_new, max_index, since_index}
280+
"""
281+
try:
282+
user_id, tenant_id = get_current_user_id(authorization)
283+
checks = request.get("checks", [])
284+
results = {}
285+
for check in checks:
286+
cid = check.get("conversation_id")
287+
since = check.get("since_index", 0)
288+
if cid is not None:
289+
results[str(cid)] = get_new_messages_service(cid, user_id, since)
290+
return JSONResponse(status_code=HTTPStatus.OK, content={"results": results})
291+
except Exception as e:
292+
logging.error(f"Failed to batch check new messages: {str(e)}")
293+
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))

backend/apps/runtime_app.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from apps.file_management_app import file_management_runtime_router as file_management_router
99
from apps.skill_app import skill_creator_router
1010
from middleware.exception_handler import ExceptionHandlerMiddleware
11+
from services.scheduled_task_scheduler import scheduled_task_scheduler
1112

1213
# Create logger instance
1314
logger = logging.getLogger("runtime_app")
@@ -24,3 +25,13 @@
2425
app.include_router(file_management_router)
2526
app.include_router(voice_router)
2627
app.include_router(skill_creator_router)
28+
29+
30+
@app.on_event("startup")
31+
async def start_scheduled_task_scheduler():
32+
scheduled_task_scheduler.start()
33+
34+
35+
@app.on_event("shutdown")
36+
async def stop_scheduled_task_scheduler():
37+
scheduled_task_scheduler.stop()

backend/database/conversation_db.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,17 @@ def get_source_searches_by_conversation(conversation_id: int, user_id: Optional[
956956
return [as_dict(record) for record in search_records]
957957

958958

959+
def get_max_message_index(conversation_id: int) -> int:
960+
"""Return the maximum message_index for a conversation, or -1 if empty."""
961+
with get_db_session() as session:
962+
conversation_id = int(conversation_id)
963+
stmt = select(func.coalesce(func.max(ConversationMessage.message_index), -1)).where(
964+
ConversationMessage.conversation_id == conversation_id,
965+
ConversationMessage.delete_flag == 'N',
966+
)
967+
return session.execute(stmt).scalar()
968+
969+
959970
def get_message(message_id: int, user_id: Optional[str] = None) -> Dict[str, Any]:
960971
"""
961972
Get message details by message ID

backend/database/db_models.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,35 @@ class A2AMessage(SimpleTableBase):
11641164
timezone=False), server_default=func.now(), doc="Message creation timestamp")
11651165

11661166

1167+
class ScheduledTaskRecord(TableBase):
1168+
"""
1169+
Scheduled task records for deferred / recurring agent execution.
1170+
"""
1171+
__tablename__ = "scheduled_tasks_t"
1172+
__table_args__ = (
1173+
Index("ix_scheduled_task_status_next_fire", "status", "next_fire_time"),
1174+
Index("ix_scheduled_task_agent_delete", "agent_id", "delete_flag"),
1175+
{"schema": SCHEMA},
1176+
)
1177+
1178+
task_id = Column(Integer, Sequence("scheduled_tasks_t_task_id_seq", schema=SCHEMA),
1179+
primary_key=True, nullable=False, autoincrement=True, doc="Primary key")
1180+
task_uuid = Column(String(36), unique=True, nullable=False, doc="Unique task identifier (UUID)")
1181+
task_name = Column(String(200), doc="Human-readable task name")
1182+
task_prompt = Column(Text, nullable=False, doc="The prompt to execute when the task fires")
1183+
task_type = Column(String(10), nullable=False, doc="Task type: oneshot or cron")
1184+
cron_expression = Column(String(100), doc="Cron expression for recurring tasks")
1185+
delay_seconds = Column(Integer, doc="Delay in seconds for oneshot tasks")
1186+
status = Column(String(20), default="pending", doc="Task status: pending, fired, cancelled, error")
1187+
next_fire_time = Column(TIMESTAMP(timezone=False), doc="Next scheduled execution time")
1188+
fire_count = Column(Integer, default=0, doc="Number of times this task has fired")
1189+
max_fires = Column(Integer, nullable=True, doc="Maximum number of fires (NULL = unlimited)")
1190+
agent_id = Column(Integer, nullable=False, doc="Agent ID that owns this task")
1191+
conversation_id = Column(Integer, nullable=True, doc="Conversation ID associated with this task")
1192+
tenant_id = Column(String(100), nullable=False, doc="Tenant ID for multi-tenancy isolation")
1193+
user_id = Column(String(100), nullable=False, doc="User ID who created this task")
1194+
1195+
11671196
class A2AArtifact(SimpleTableBase):
11681197
"""
11691198
A2A artifacts. Stores the output/artifacts produced by a task.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import logging
2+
from datetime import datetime
3+
from typing import Optional
4+
5+
from sqlalchemy import select, update
6+
7+
from .client import as_dict, get_db_session
8+
from .db_models import ScheduledTaskRecord
9+
10+
logger = logging.getLogger("scheduled_task_db")
11+
12+
13+
def create_scheduled_task(data: dict) -> dict:
14+
"""Insert a new scheduled task record and return it as a dict."""
15+
with get_db_session() as session:
16+
record = ScheduledTaskRecord(**data)
17+
session.add(record)
18+
session.flush()
19+
return as_dict(record)
20+
21+
22+
def query_tasks_by_agent(agent_id: int, tenant_id: str, user_id: str = None) -> list[dict]:
23+
"""Return pending tasks for a given agent and tenant, optionally filtered by user."""
24+
with get_db_session() as session:
25+
stmt = select(ScheduledTaskRecord).where(
26+
ScheduledTaskRecord.agent_id == agent_id,
27+
ScheduledTaskRecord.tenant_id == tenant_id,
28+
ScheduledTaskRecord.status == "pending",
29+
ScheduledTaskRecord.delete_flag == "N",
30+
)
31+
if user_id:
32+
stmt = stmt.where(ScheduledTaskRecord.user_id == user_id)
33+
stmt = stmt.order_by(ScheduledTaskRecord.task_id.desc())
34+
records = session.scalars(stmt).all()
35+
return [as_dict(r) for r in records]
36+
37+
38+
def query_pending_tasks_due(now: datetime) -> list[dict]:
39+
"""Return all pending tasks whose next_fire_time <= now (global, no tenant filter)."""
40+
with get_db_session() as session:
41+
stmt = select(ScheduledTaskRecord).where(
42+
ScheduledTaskRecord.status == "pending",
43+
ScheduledTaskRecord.next_fire_time <= now,
44+
ScheduledTaskRecord.delete_flag == "N",
45+
)
46+
records = session.scalars(stmt).all()
47+
return [as_dict(r) for r in records]
48+
49+
50+
def cancel_task(task_uuid: str, agent_id: int, tenant_id: str, user_id: str = None) -> bool:
51+
"""Soft-cancel a task. Optionally restrict to a specific user for isolation."""
52+
with get_db_session() as session:
53+
conditions = [
54+
ScheduledTaskRecord.task_uuid == task_uuid,
55+
ScheduledTaskRecord.agent_id == agent_id,
56+
ScheduledTaskRecord.tenant_id == tenant_id,
57+
ScheduledTaskRecord.delete_flag == "N",
58+
ScheduledTaskRecord.status == "pending",
59+
]
60+
if user_id:
61+
conditions.append(ScheduledTaskRecord.user_id == user_id)
62+
stmt = (
63+
update(ScheduledTaskRecord)
64+
.where(*conditions)
65+
.values(status="cancelled")
66+
)
67+
result = session.execute(stmt)
68+
return result.rowcount > 0
69+
70+
71+
def update_task_status(task_uuid: str, updates: dict) -> None:
72+
"""Update arbitrary columns on a task record identified by task_uuid."""
73+
with get_db_session() as session:
74+
stmt = (
75+
update(ScheduledTaskRecord)
76+
.where(
77+
ScheduledTaskRecord.task_uuid == task_uuid,
78+
ScheduledTaskRecord.delete_flag == "N",
79+
)
80+
.values(**updates)
81+
)
82+
session.execute(stmt)

backend/services/conversation_management_service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,17 @@ def delete_conversation_service(conversation_id: int, user_id: str) -> bool:
384384
raise Exception(str(e))
385385

386386

387+
def get_new_messages_service(conversation_id: int, user_id: str, since_index: int) -> Dict[str, Any]:
388+
"""Lightweight polling: check for new messages after since_index."""
389+
from database.conversation_db import get_conversation, get_max_message_index
390+
conv = get_conversation(conversation_id, user_id)
391+
if conv is None:
392+
return {"has_new": False, "max_index": since_index, "since_index": since_index}
393+
max_idx = get_max_message_index(conversation_id)
394+
has_new = max_idx > since_index
395+
return {"has_new": has_new, "max_index": max_idx, "since_index": since_index}
396+
397+
387398
def get_conversation_history_service(conversation_id: int, user_id: str) -> List[Dict[str, Any]]:
388399
"""
389400
Get complete history of specified conversation

0 commit comments

Comments
 (0)