Skip to content

Commit 87262c3

Browse files
committed
feat: add draft functionality and manager module
1 parent 4dd9e2f commit 87262c3

File tree

8 files changed

+1163
-0
lines changed

8 files changed

+1163
-0
lines changed

intentkit/core/draft.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
"""Service functions for agent draft operations."""
2+
3+
from __future__ import annotations
4+
5+
from epyxid import XID
6+
from fastapi import status
7+
from sqlalchemy import desc, select
8+
from sqlalchemy.ext.asyncio import AsyncSession
9+
10+
from intentkit.models.agent import Agent, AgentUserInput
11+
from intentkit.models.draft import AgentDraft, AgentDraftTable
12+
from intentkit.utils.error import IntentKitAPIError
13+
14+
15+
async def update_agent_draft(
16+
*,
17+
agent_id: str,
18+
user_id: str,
19+
input: AgentUserInput,
20+
db: AsyncSession,
21+
) -> AgentDraft:
22+
"""Update the latest draft for the specified agent with partial field updates.
23+
24+
This function only updates fields that are explicitly provided in the input,
25+
leaving other fields unchanged. This is more efficient than override as it
26+
reduces context usage and minimizes the risk of accidentally changing fields.
27+
"""
28+
query = (
29+
select(AgentDraftTable)
30+
.where(AgentDraftTable.agent_id == agent_id, AgentDraftTable.owner == user_id)
31+
.order_by(desc(AgentDraftTable.created_at))
32+
.limit(1)
33+
)
34+
35+
result = await db.execute(query)
36+
latest_draft = result.scalar_one_or_none()
37+
38+
if not latest_draft:
39+
raise IntentKitAPIError(
40+
status.HTTP_404_NOT_FOUND,
41+
"DraftNotFound",
42+
"No drafts found for this agent",
43+
)
44+
45+
# Get only the fields that are explicitly provided (exclude_unset=True)
46+
update_data = input.model_dump(exclude_unset=True)
47+
48+
if latest_draft.deployed_at is not None:
49+
# Create new draft version if current one is deployed
50+
draft_id = str(XID())
51+
52+
# Start with existing draft data and merge updates
53+
draft_data = AgentUserInput.model_validate(latest_draft).model_dump()
54+
draft_data.update(update_data)
55+
56+
updated_input = AgentUserInput.model_validate(draft_data)
57+
58+
draft_table = AgentDraftTable(
59+
id=draft_id,
60+
agent_id=agent_id,
61+
owner=user_id,
62+
version=updated_input.hash(),
63+
last_draft_id=latest_draft.id,
64+
project_id=latest_draft.project_id,
65+
**draft_data,
66+
)
67+
68+
db.add(draft_table)
69+
await db.commit()
70+
await db.refresh(draft_table)
71+
72+
return AgentDraft.model_validate(draft_table)
73+
74+
# Update existing draft in-place
75+
for key, value in update_data.items():
76+
setattr(latest_draft, key, value)
77+
78+
# Update version hash based on updated data
79+
updated_input = AgentUserInput.model_validate(latest_draft)
80+
latest_draft.version = updated_input.hash()
81+
82+
await db.commit()
83+
await db.refresh(latest_draft)
84+
85+
return AgentDraft.model_validate(latest_draft)
86+
87+
88+
async def override_agent_draft(
89+
*,
90+
agent_id: str,
91+
user_id: str,
92+
input: AgentUserInput,
93+
db: AsyncSession,
94+
) -> AgentDraft:
95+
"""Override the latest draft for the specified agent."""
96+
query = (
97+
select(AgentDraftTable)
98+
.where(AgentDraftTable.agent_id == agent_id, AgentDraftTable.owner == user_id)
99+
.order_by(desc(AgentDraftTable.created_at))
100+
.limit(1)
101+
)
102+
103+
result = await db.execute(query)
104+
latest_draft = result.scalar_one_or_none()
105+
106+
if not latest_draft:
107+
raise IntentKitAPIError(
108+
status.HTTP_404_NOT_FOUND,
109+
"DraftNotFound",
110+
"No drafts found for this agent",
111+
)
112+
113+
if latest_draft.deployed_at is not None:
114+
draft_id = str(XID())
115+
116+
draft_table = AgentDraftTable(
117+
id=draft_id,
118+
agent_id=agent_id,
119+
owner=user_id,
120+
version=input.hash(),
121+
last_draft_id=latest_draft.id,
122+
project_id=latest_draft.project_id,
123+
**input.model_dump(),
124+
)
125+
126+
db.add(draft_table)
127+
await db.commit()
128+
await db.refresh(draft_table)
129+
130+
return AgentDraft.model_validate(draft_table)
131+
132+
for key, value in input.model_dump().items():
133+
setattr(latest_draft, key, value)
134+
135+
latest_draft.version = input.hash()
136+
137+
await db.commit()
138+
await db.refresh(latest_draft)
139+
140+
return AgentDraft.model_validate(latest_draft)
141+
142+
143+
async def get_agent_latest_draft(
144+
*,
145+
agent_id: str,
146+
user_id: str,
147+
db: AsyncSession,
148+
) -> AgentDraft:
149+
"""Return the latest draft for the specified agent."""
150+
query = (
151+
select(AgentDraftTable)
152+
.where(AgentDraftTable.agent_id == agent_id, AgentDraftTable.owner == user_id)
153+
.order_by(desc(AgentDraftTable.created_at))
154+
.limit(1)
155+
)
156+
157+
result = await db.execute(query)
158+
latest_draft = result.scalar_one_or_none()
159+
160+
if latest_draft:
161+
return AgentDraft.model_validate(latest_draft)
162+
163+
agent = await Agent.get(agent_id)
164+
165+
if not agent:
166+
raise IntentKitAPIError(
167+
status.HTTP_404_NOT_FOUND,
168+
"AgentNotFound",
169+
"No drafts found for this agent",
170+
)
171+
172+
if agent.owner != user_id:
173+
raise IntentKitAPIError(
174+
status.HTTP_403_FORBIDDEN,
175+
"Forbidden",
176+
"Not your agent",
177+
)
178+
179+
draft_id = str(XID())
180+
181+
agent_dict = agent.model_dump()
182+
input_dict: dict[str, object] = {}
183+
for key in AgentUserInput.model_fields:
184+
if key in agent_dict:
185+
input_dict[key] = agent_dict[key]
186+
input = AgentUserInput.model_validate(input_dict)
187+
188+
draft_table = AgentDraftTable(
189+
id=draft_id,
190+
agent_id=agent_id,
191+
owner=user_id,
192+
version=input.hash(),
193+
deployed_at=agent.updated_at,
194+
**input.model_dump(),
195+
)
196+
197+
db.add(draft_table)
198+
await db.commit()
199+
await db.refresh(draft_table)
200+
201+
return AgentDraft.model_validate(draft_table)

intentkit/core/draft_chat.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Utilities for streaming draft agent conversations."""
2+
3+
import logging
4+
import time
5+
from datetime import datetime, timedelta, timezone
6+
from typing import AsyncGenerator
7+
8+
from langgraph.graph.state import CompiledStateGraph
9+
from sqlalchemy import desc, select
10+
from sqlalchemy.ext.asyncio import AsyncSession
11+
12+
from intentkit.core.engine import build_agent, stream_agent_raw
13+
from intentkit.models.agent import Agent
14+
from intentkit.models.agent_data import AgentData
15+
from intentkit.models.chat import ChatMessage, ChatMessageCreate
16+
from intentkit.models.db import get_session
17+
from intentkit.models.draft import AgentDraft, AgentDraftTable
18+
from intentkit.utils.error import IntentKitAPIError
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
_draft_executors: dict[str, CompiledStateGraph] = {}
24+
_draft_updated_at: dict[str, datetime] = {}
25+
_draft_cached_at: dict[str, datetime] = {}
26+
27+
_CACHE_TTL = timedelta(days=1)
28+
29+
30+
async def stream_draft(
31+
agent_id: str, message: ChatMessageCreate
32+
) -> AsyncGenerator[ChatMessage, None]:
33+
"""Stream chat messages for the latest draft of an agent."""
34+
35+
draft = await _get_latest_draft(agent_id)
36+
agent = _agent_from_draft(draft)
37+
executor, cold_start_cost = await _get_draft_executor(agent, draft)
38+
39+
if not message.agent_id:
40+
message.agent_id = agent.id
41+
message.cold_start_cost = cold_start_cost
42+
43+
async for chat_message in stream_agent_raw(message, agent, executor):
44+
yield chat_message
45+
46+
47+
async def _get_latest_draft(agent_id: str) -> AgentDraft:
48+
async with get_session() as session:
49+
result = await _execute_latest_draft_query(session, agent_id)
50+
draft_row = result.scalar_one_or_none()
51+
52+
if not draft_row:
53+
raise IntentKitAPIError(
54+
status_code=404,
55+
key="DraftNotFound",
56+
message=f"No draft found for agent {agent_id}",
57+
)
58+
59+
return AgentDraft.model_validate(draft_row)
60+
61+
62+
async def _execute_latest_draft_query(session: AsyncSession, agent_id: str):
63+
statement = (
64+
select(AgentDraftTable)
65+
.where(AgentDraftTable.agent_id == agent_id)
66+
.order_by(desc(AgentDraftTable.updated_at))
67+
.limit(1)
68+
)
69+
return await session.execute(statement)
70+
71+
72+
def _agent_from_draft(draft: AgentDraft) -> Agent:
73+
data = draft.model_dump()
74+
data.pop("id", None)
75+
data.pop("agent_id", None)
76+
data.pop("last_draft_id", None)
77+
data["id"] = draft.agent_id
78+
data["owner"] = draft.owner
79+
data["deployed_at"] = draft.deployed_at
80+
data["created_at"] = draft.created_at
81+
data["updated_at"] = draft.updated_at
82+
data["version"] = draft.version
83+
return Agent.model_validate(data)
84+
85+
86+
async def _get_draft_executor(
87+
agent: Agent, draft: AgentDraft
88+
) -> tuple[CompiledStateGraph, float]:
89+
now = datetime.now(timezone.utc)
90+
_cleanup_cache(now)
91+
92+
cached_executor = _draft_executors.get(agent.id)
93+
cached_updated = _draft_updated_at.get(agent.id)
94+
cold_start_cost = 0.0
95+
96+
if not cached_executor or cached_updated != draft.updated_at:
97+
start = time.perf_counter()
98+
agent_data = AgentData(id=agent.id)
99+
cached_executor = await build_agent(agent, agent_data)
100+
cold_start_cost = time.perf_counter() - start
101+
_draft_executors[agent.id] = cached_executor
102+
_draft_updated_at[agent.id] = draft.updated_at
103+
_draft_cached_at[agent.id] = now
104+
logger.info("Initialized draft executor for agent %s", agent.id)
105+
else:
106+
_draft_cached_at[agent.id] = now
107+
108+
return cached_executor, cold_start_cost
109+
110+
111+
def _cleanup_cache(now: datetime) -> None:
112+
expired_before = now - _CACHE_TTL
113+
for agent_id, cached_time in list(_draft_cached_at.items()):
114+
if cached_time < expired_before:
115+
_draft_cached_at.pop(agent_id, None)
116+
_draft_updated_at.pop(agent_id, None)
117+
_draft_executors.pop(agent_id, None)
118+
logger.debug("Removed expired draft executor for agent %s", agent_id)

intentkit/core/manager/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""Manager module for agent management operations."""
2+
3+
from intentkit.core.manager.engine import stream_manager
4+
from intentkit.core.manager.service import (
5+
agent_draft_json_schema,
6+
get_latest_public_info,
7+
get_skills_hierarchical_text,
8+
)
9+
from intentkit.core.manager.skills import (
10+
get_agent_latest_draft_skill,
11+
get_agent_latest_public_info_skill,
12+
update_agent_draft_skill,
13+
update_public_info_skill,
14+
)
15+
16+
__all__ = [
17+
"stream_manager",
18+
"agent_draft_json_schema",
19+
"get_skills_hierarchical_text",
20+
"get_latest_public_info",
21+
"get_agent_latest_draft_skill",
22+
"get_agent_latest_public_info_skill",
23+
"update_agent_draft_skill",
24+
"update_public_info_skill",
25+
]

0 commit comments

Comments
 (0)