Skip to content

Commit 1dc0b67

Browse files
Move AG-UI event generation from chatMessage route to service and delete chatMessage schema.
1 parent 3331151 commit 1dc0b67

File tree

3 files changed

+111
-110
lines changed

3 files changed

+111
-110
lines changed
Lines changed: 3 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,10 @@
11
from fastapi import APIRouter, Depends
22
from fastapi.responses import StreamingResponse
33
from sqlalchemy.orm import Session
4-
from typing import AsyncIterator
54

65
from app.core.db_connection import get_db
7-
from ag_ui.core import (
8-
RunAgentInput,
9-
RunStartedEvent,
10-
TextMessageStartEvent,
11-
TextMessageContentEvent,
12-
TextMessageEndEvent,
13-
RunFinishedEvent,
14-
MessagesSnapshotEvent,
15-
UserMessage,
16-
AssistantMessage,
17-
TextInputContent,
18-
EventEncoder
19-
)
20-
from app.services.chatMessage import create_user_message
6+
from ag_ui.core import RunAgentInput
7+
from app.services.chatMessage import process_agent_message
218

229

2310
router = APIRouter(
@@ -37,87 +24,6 @@ async def post_user_message(
3724
3825
This endpoint accepts RunAgentInput and returns a stream of AG-UI events.
3926
"""
40-
async def event_generator() -> AsyncIterator[str]:
41-
"""Generate AG-UI protocol events."""
42-
encoder = EventEncoder()
43-
44-
try:
45-
# Extract the user message from the messages list
46-
user_message_text = ""
47-
if payload.messages:
48-
last_message = payload.messages[-1]
49-
if hasattr(last_message, 'content'):
50-
if isinstance(last_message.content, list) and len(last_message.content) > 0:
51-
user_message_text = last_message.content[0].text
52-
elif isinstance(last_message.content, str):
53-
user_message_text = last_message.content
54-
55-
# Use thread_id as session_id (AG-UI uses thread_id for conversation tracking)
56-
session_id = payload.thread_id if hasattr(payload, 'thread_id') else None
57-
58-
# Emit RUN_STARTED event
59-
run_started = RunStartedEvent(
60-
run_id=payload.run_id,
61-
thread_id=payload.thread_id
62-
)
63-
yield encoder.encode(run_started)
64-
65-
# Process the message through the service
66-
assistant_msg, session_id = create_user_message(
67-
db=db,
68-
message=user_message_text,
69-
session_id=session_id
70-
)
71-
72-
# Generate a unique message ID for the assistant response
73-
message_id = f"msg_{payload.run_id}"
74-
75-
# Emit TEXT_MESSAGE_START event
76-
text_start = TextMessageStartEvent(
77-
run_id=payload.run_id,
78-
message_id=message_id
79-
)
80-
yield encoder.encode(text_start)
81-
82-
# Emit TEXT_MESSAGE_CONTENT event with the assistant's response
83-
text_content = TextMessageContentEvent(
84-
run_id=payload.run_id,
85-
message_id=message_id,
86-
content=assistant_msg.message
87-
)
88-
yield encoder.encode(text_content)
89-
90-
# Emit TEXT_MESSAGE_END event
91-
text_end = TextMessageEndEvent(
92-
run_id=payload.run_id,
93-
message_id=message_id
94-
)
95-
yield encoder.encode(text_end)
96-
97-
# Emit MESSAGES_SNAPSHOT event with the full conversation
98-
messages_snapshot = MessagesSnapshotEvent(
99-
run_id=payload.run_id,
100-
messages=[
101-
*payload.messages, # Include previous messages
102-
AssistantMessage(
103-
id=message_id,
104-
content=[TextInputContent(text=assistant_msg.message)]
105-
)
106-
]
107-
)
108-
yield encoder.encode(messages_snapshot)
109-
110-
# Emit RUN_FINISHED event
111-
run_finished = RunFinishedEvent(
112-
run_id=payload.run_id
113-
)
114-
yield encoder.encode(run_finished)
115-
116-
except Exception as e:
117-
# In case of error, we should emit a RUN_ERROR event
118-
# For now, we'll just re-raise the exception
119-
raise
120-
12127
return StreamingResponse(
122-
event_generator(),
28+
process_agent_message(db, payload)
12329
)

DocsManager/app/schemas/chatMessage.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

DocsManager/app/services/chatMessage.py

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
from sqlalchemy.orm import Session
2+
from typing import AsyncIterator
23

34
from app.models.chat_message import ChatMessage
45
from app.schemas.enums.sender_type import SenderType
56
from app.models.chat_session import ChatSession
7+
from ag_ui.core import (
8+
RunAgentInput,
9+
RunStartedEvent,
10+
TextMessageStartEvent,
11+
TextMessageContentEvent,
12+
TextMessageEndEvent,
13+
RunFinishedEvent,
14+
MessagesSnapshotEvent,
15+
AssistantMessage,
16+
TextInputContent,
17+
EventEncoder
18+
)
19+
620

721
def assistant_reply(text: str) -> str:
822
return f"Respuesta del asistente a: {text}"
@@ -45,4 +59,97 @@ def create_user_message(
4559
db.commit()
4660
db.refresh(assistant_msg)
4761

48-
return assistant_msg, session_id
62+
return assistant_msg, session_id
63+
64+
65+
async def process_agent_message(
66+
db: Session,
67+
payload: RunAgentInput
68+
) -> AsyncIterator[str]:
69+
"""Process agent message and generate AG-UI protocol events.
70+
71+
Args:
72+
db: Database session
73+
payload: RunAgentInput containing the user's message and context
74+
75+
Yields:
76+
AG-UI protocol events as encoded strings
77+
"""
78+
encoder = EventEncoder()
79+
80+
try:
81+
# Extract the user message from the messages list
82+
user_message_text = ""
83+
if payload.messages:
84+
last_message = payload.messages[-1]
85+
if hasattr(last_message, 'content'):
86+
if isinstance(last_message.content, list) and len(last_message.content) > 0:
87+
user_message_text = last_message.content[0].text
88+
elif isinstance(last_message.content, str):
89+
user_message_text = last_message.content
90+
91+
# Use thread_id as session_id (AG-UI uses thread_id for conversation tracking)
92+
session_id = payload.thread_id if hasattr(payload, 'thread_id') else None
93+
94+
# Emit RUN_STARTED event
95+
run_started = RunStartedEvent(
96+
run_id=payload.run_id,
97+
thread_id=payload.thread_id
98+
)
99+
yield encoder.encode(run_started)
100+
101+
# Process the message through the service
102+
assistant_msg, session_id = create_user_message(
103+
db=db,
104+
message=user_message_text,
105+
session_id=session_id
106+
)
107+
108+
# Generate a unique message ID for the assistant response
109+
message_id = f"msg_{payload.run_id}"
110+
111+
# Emit TEXT_MESSAGE_START event
112+
text_start = TextMessageStartEvent(
113+
run_id=payload.run_id,
114+
message_id=message_id
115+
)
116+
yield encoder.encode(text_start)
117+
118+
# Emit TEXT_MESSAGE_CONTENT event with the assistant's response
119+
text_content = TextMessageContentEvent(
120+
run_id=payload.run_id,
121+
message_id=message_id,
122+
content=assistant_msg.message
123+
)
124+
yield encoder.encode(text_content)
125+
126+
# Emit TEXT_MESSAGE_END event
127+
text_end = TextMessageEndEvent(
128+
run_id=payload.run_id,
129+
message_id=message_id
130+
)
131+
yield encoder.encode(text_end)
132+
133+
# Emit MESSAGES_SNAPSHOT event with the full conversation
134+
messages_snapshot = MessagesSnapshotEvent(
135+
run_id=payload.run_id,
136+
messages=[
137+
*payload.messages, # Include previous messages
138+
AssistantMessage(
139+
id=message_id,
140+
content=[TextInputContent(text=assistant_msg.message)]
141+
)
142+
]
143+
)
144+
yield encoder.encode(messages_snapshot)
145+
146+
# Emit RUN_FINISHED event
147+
run_finished = RunFinishedEvent(
148+
run_id=payload.run_id
149+
)
150+
yield encoder.encode(run_finished)
151+
152+
except Exception as e:
153+
# In case of error, we should emit a RUN_ERROR event
154+
# For now, we'll just re-raise the exception
155+
raise

0 commit comments

Comments
 (0)