
Commit 94b46aa

ryaneggz and claude authored
FROM feat/720-schedules-task-iq TO development (#721)
* feat: US-001 - Add DISTRIBUTED_WORKERS constant to src/constants

  Consolidate DISTRIBUTED_WORKERS into src/constants/__init__.py and update llm.py to import from there instead of defining it inline.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: ryaneggz <kre8mymedia@gmail.com>

* feat: US-002 - Dispatch scheduled jobs to TaskIQ when distributed mode enabled

  scheduled_llm_invoke() now checks DISTRIBUTED_WORKERS and dispatches to run_agent_stream.kiq() when true, preserving the in-process fallback.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: ryaneggz <kre8mymedia@gmail.com>

* feat: US-003 - Simplify schedule create route

  Remove LLMController indirection from the create_job route. Call schedule_service.create_job() directly, removing unused imports.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: ryaneggz <kre8mymedia@gmail.com>

* feat: US-004 - Add unit tests for TaskIQ dispatch path

  Tests verify dispatch to TaskIQ when DISTRIBUTED_WORKERS=true, the in-process fallback when false, and the thread_id extraction/generation logic.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: ryaneggz <kre8mymedia@gmail.com>

* docs: update prd.json and progress.txt - all stories complete

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
  Signed-off-by: ryaneggz <kre8mymedia@gmail.com>

* Finished ralph loop

* Move the input files

---------

Signed-off-by: ryaneggz <kre8mymedia@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4889958 · commit 94b46aa

30 files changed

Lines changed: 274 additions & 57 deletions

backend/src/constants/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -145,6 +145,9 @@ def values(cls) -> list[str]:
 # GridSite
 MICROSOFT_TEAMS_WEBHOOK_URL = os.getenv("MICROSOFT_TEAMS_WEBHOOK_URL")
 
+# Distributed Workers
+DISTRIBUTED_WORKERS = os.getenv("DISTRIBUTED_WORKERS", "false").lower() == "true"
+
 # Thread Search
 # Number of recent messages to store per thread snapshot for semantic search
 THREAD_SNAPSHOT_MESSAGE_COUNT = 20
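One detail worth keeping in mind when setting this variable: the comparison only recognizes the exact string "true" (case-insensitive) as enabled, so common truthy spellings like "1" or "yes" leave distributed mode off. A quick standalone sketch of the behavior:

import os

for value in ("true", "True", "1", "yes", "false"):
    os.environ["DISTRIBUTED_WORKERS"] = value
    # Same expression as in src/constants/__init__.py above
    enabled = os.getenv("DISTRIBUTED_WORKERS", "false").lower() == "true"
    print(f"{value!r:>7} -> {enabled}")

# 'true' and 'True' print True; '1', 'yes', and 'false' print False.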

backend/src/routes/v0/llm.py

Lines changed: 2 additions & 6 deletions
@@ -1,5 +1,4 @@
-import os
-from typing import Any, List
+from typing import Any
 from uuid import uuid4
 from fastapi.responses import JSONResponse, StreamingResponse
 from fastapi import (
@@ -20,7 +19,7 @@
 from src.controllers.llm import LLMController
 from src.services.llm import llm_service
 from src.services.prompt.optimize import PromptOptimizer, PromptOptimizerRequest
-from src.constants import GROQ_API_KEY
+from src.constants import DISTRIBUTED_WORKERS, GROQ_API_KEY
 from src.schemas.models import ProtectedUser
 from src.utils.auth import get_optional_user, get_optional_user_from_token
 from src.utils.logger import logger
@@ -34,9 +33,6 @@
 from src.constants.llm import DEFAULT_CHAT_MODEL, get_all_models, get_free_models
 from src.repos.user_settings_repo import UserSettingsRepo
 
-# Distributed workers mode - when true, tasks are enqueued to TaskIQ workers
-DISTRIBUTED_WORKERS = os.getenv("DISTRIBUTED_WORKERS", "false").lower() == "true"
-
 llm_router = APIRouter(tags=["LLM"], prefix="/llm")
 
 # TIME_LIMIT = "1/minute"

backend/src/routes/v0/schedule.py

Lines changed: 2 additions & 8 deletions
@@ -1,11 +1,7 @@
 from fastapi import APIRouter, Depends, Response, Body
 from fastapi.responses import JSONResponse
 from fastapi_cache.decorator import cache
-from langgraph.store.base import BaseStore
 
-from src.controllers.llm import LLMController
-from src.agents import init_config
-from src.services.db import get_store
 from src.services.schedule import schedule_service
 from src.schemas.models import ProtectedUser
 from src.utils.auth import verify_credentials
@@ -84,11 +80,9 @@ async def get_job(
 async def create_job(
     job: ScheduleCreate = Body(openapi_examples=Examples.SCHEDULE_CREATE_EXAMPLES),
     user: ProtectedUser = Depends(verify_credentials),
-    store: BaseStore = Depends(get_store),
 ):
-    config = init_config(job.task, user.id)
-    llm_controller = LLMController(user=user, store=store, config=config)
-    schedule = await llm_controller.llm_task(job)
+    schedule_service.user_id = user.id
+    schedule = schedule_service.create_job(job)
     return JSONResponse(
         status_code=201,
         content={
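The route now talks to schedule_service directly. The service itself is not part of this diff; a hypothetical sketch of the contract the route relies on (only create_job() and the user_id attribute are visible from the route — everything else here is illustrative):

# Hypothetical shape of the service the route calls; the real
# src/services/schedule.py may differ beyond create_job() and user_id.
class ScheduleService:
    user_id: str | None = None  # set by the route before create_job()

    def create_job(self, job) -> dict:
        # Presumably registers an APScheduler job that invokes
        # scheduled_llm_invoke(task_dict, user_id, title) on schedule.
        ...

schedule_service = ScheduleService()  # module-level singleton

One design caveat: assigning user_id on a module-level singleton is not safe under concurrent requests; passing the user id as an argument to create_job() would sidestep that.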

backend/src/services/project.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-from langgraph.store.base import BaseStore, SearchOp
+from langgraph.store.base import BaseStore
 from src.services.db import get_store_in_memory
 from src.repos.project_repo import Project, ProjectRepo
 from src.schemas.entities import SearchFilter

backend/src/services/schedule.py

Lines changed: 22 additions & 2 deletions
@@ -54,6 +54,26 @@ async def scheduled_llm_invoke(task_dict: dict, user_id: str, title: str = None)
     This must be a module-level function (not a method) so APScheduler can pickle it.
     """
     from uuid import uuid4
+    from src.constants import DISTRIBUTED_WORKERS
+
+    # Distributed mode: dispatch to TaskIQ worker
+    if DISTRIBUTED_WORKERS:
+        from src.workers.tasks import run_agent_stream
+
+        metadata = task_dict.get("metadata") or {}
+        thread_id = metadata.get("thread_id") or str(uuid4())
+        logger.info(
+            f"🚀 Dispatching scheduled job '{title}' to TaskIQ worker"
+            f" (thread_id={thread_id})"
+        )
+        await run_agent_stream.kiq(
+            task_dict=task_dict,
+            user_id=user_id,
+            thread_id=thread_id,
+        )
+        return
+
+    # In-process mode: execute directly
     from src.schemas.entities import LLMRequest
     from src.agents import construct_agent, Orchestra, init_config
     from src.services.db import get_checkpoint_db, get_store_db
@@ -65,7 +85,7 @@ async def scheduled_llm_invoke(task_dict: dict, user_id: str, title: str = None)
     params = LLMRequest(**task_dict)
     params.metadata.user_id = user_id
     params.metadata.thread_id = params.metadata.thread_id or str(uuid4())
-    logger.info(f"✓ Successfully reconstructed LLMRequest")
+    logger.info("✓ Successfully reconstructed LLMRequest")
 
     # Initialize config and get files and todos
     config = init_config(params, user_id)
@@ -104,7 +124,7 @@ async def scheduled_llm_invoke(task_dict: dict, user_id: str, title: str = None)
     response = await agent.invoke(
         params.input, config=config, context=ctx_schema
     )
-    logger.info(f"✓ LLM invocation completed successfully")
+    logger.info("✓ LLM invocation completed successfully")
 
     files_map = {**files_map, **response.get("files", {})}
     todos_list = [*todos_list, *response.get("todos", [])]
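The worker side of run_agent_stream is not included in this diff. A minimal sketch of how a TaskIQ task with this call signature could be declared — the broker choice and body are assumptions; only the @broker.task decorator and the .kiq() enqueue method are standard TaskIQ API:

from taskiq import InMemoryBroker

# Stand-in broker for the sketch; a real deployment would use a durable
# broker (Redis, NATS, RabbitMQ, ...) shared by the API and the workers.
broker = InMemoryBroker()

@broker.task
async def run_agent_stream(task_dict: dict, user_id: str, thread_id: str) -> None:
    # Hypothetical body: run the same agent pipeline that
    # scheduled_llm_invoke executes in-process when DISTRIBUTED_WORKERS is false.
    ...

# As in scheduled_llm_invoke above, .kiq() enqueues instead of executing:
#   await run_agent_stream.kiq(task_dict=..., user_id=..., thread_id=...)

The US-004 tests are also not shown in this view; a plausible pytest sketch of the dispatch assertion, assuming pytest-asyncio and patching both the flag and the task's kiq:

import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_dispatches_to_taskiq_when_distributed(monkeypatch):
    import src.services.schedule as schedule_mod
    import src.workers.tasks as tasks_mod

    # Force distributed mode; scheduled_llm_invoke imports the flag
    # at call time, so patching the module attribute is enough.
    monkeypatch.setattr("src.constants.DISTRIBUTED_WORKERS", True)
    kiq_mock = AsyncMock()
    monkeypatch.setattr(tasks_mod.run_agent_stream, "kiq", kiq_mock)

    task_dict = {"metadata": {"thread_id": "t-123"}}
    await schedule_mod.scheduled_llm_invoke(task_dict, user_id="u-1", title="job")

    kiq_mock.assert_awaited_once_with(
        task_dict=task_dict, user_id="u-1", thread_id="t-123"
    )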

backend/src/services/tool.py

Lines changed: 3 additions & 1 deletion
@@ -105,7 +105,9 @@ async def invoke_default_tool(
         configurable: dict[str, Any] = {"user_id": self.user_id}
         if config is not None:
             # Merge caller config but exclude user_id to prevent override
-            config_without_user_id = {k: v for k, v in config.items() if k != "user_id"}
+            config_without_user_id = {
+                k: v for k, v in config.items() if k != "user_id"
+            }
             configurable.update(config_without_user_id)
         runnable_config = {"configurable": configurable}
         return await tool.ainvoke(
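The change above is purely line-length formatting; the merge rule itself is unchanged. A self-contained illustration of what it guarantees (the helper name is ours, not the project's):

from typing import Any

def merge_configurable(user_id: str, config: dict[str, Any] | None) -> dict[str, Any]:
    # Same rule as invoke_default_tool: caller config is merged in,
    # but a caller-supplied "user_id" can never override the session's.
    configurable: dict[str, Any] = {"user_id": user_id}
    if config is not None:
        configurable.update({k: v for k, v in config.items() if k != "user_id"})
    return {"configurable": configurable}

print(merge_configurable("alice", {"user_id": "mallory", "model": "gpt-4"}))
# -> {'configurable': {'user_id': 'alice', 'model': 'gpt-4'}}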

backend/src/tools/base/reasoning.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-from pydantic import BaseModel, Field, field_validator
+from pydantic import BaseModel, Field
 from langchain_core.tools import tool
 
 

backend/src/tools/finance/news.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ def get_financial_news(ticker: str) -> tuple[str, dict]:
     )
 
     # Convert to markdown (clean, no index)
-    markdown = f"## Financial News\n" + f"```csv\n{df.to_csv(index=False)}\n```"
+    markdown = "## Financial News\n" + f"```csv\n{df.to_csv(index=False)}\n```"
     return markdown
 
 # TODO: Eval if this is cleaner later on. Get fidgety as table but looks good.

backend/src/tools/ms_teams/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ async def webhook_teams(text: str, runtime: ToolRuntime) -> str:
             json={"text": text},
         )
         response.raise_for_status()
-        return f"Message sent to Microsoft Teams channel"
+        return "Message sent to Microsoft Teams channel"
     except Exception as e:
         raise ToolException(
             f"Error sending message to Microsoft Teams channel: {e}"

backend/src/tools/search.py

Lines changed: 0 additions & 3 deletions
@@ -6,9 +6,6 @@
 
 import httpx
 from charset_normalizer import from_bytes
-from langchain.tools import ToolRuntime
-from langchain_core.messages import ToolMessage
-from langgraph.types import Command
 from markdownify import markdownify as md
 from langchain_core.tools import tool
 from langchain_core.tools import ToolException
