Conversation
…entInput` and returning `StreamingResponse` with various AG-UI events.
…ete chatMessage schema.
🔍 PR Validation Results
|
There was a problem hiding this comment.
Pull request overview
This PR migrates the chat messaging API from a simple request-response pattern to the AG-UI protocol, implementing a streaming event-based architecture. The changes replace custom Pydantic schemas with AG-UI's RunAgentInput and transform the synchronous endpoint into an asynchronous streaming endpoint that emits protocol-compliant events.
Key changes include:
- Implementation of AG-UI protocol event streaming for chat interactions
- Migration from synchronous request-response to asynchronous streaming architecture
- Removal of custom chat message schemas in favor of AG-UI types
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| DocsManager/app/services/chatMessage.py | Added process_agent_message async generator function that wraps the existing create_user_message logic with AG-UI protocol events (RUN_STARTED, TEXT_MESSAGE_START/CONTENT/END, MESSAGES_SNAPSHOT, RUN_FINISHED) |
| DocsManager/app/schemas/chatMessage.py | Removed UserMessageIn and AssistantMessageOut schemas as they're replaced by AG-UI types |
| DocsManager/app/api/routes/chatMessage.py | Converted /messages endpoint from sync to async, changed from returning JSON to StreamingResponse, and switched from custom schemas to RunAgentInput |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| from typing import AsyncIterator | ||
|
|
||
| from app.models.chat_message import ChatMessage | ||
| from app.schemas.enums.sender_type import SenderType | ||
| from app.models.chat_session import ChatSession | ||
| from ag_ui.core import ( | ||
| RunAgentInput, | ||
| RunStartedEvent, | ||
| TextMessageStartEvent, | ||
| TextMessageContentEvent, | ||
| TextMessageEndEvent, | ||
| RunFinishedEvent, | ||
| MessagesSnapshotEvent, | ||
| AssistantMessage, | ||
| TextInputContent, | ||
| EventEncoder | ||
| ) | ||
|
|
||
|
|
There was a problem hiding this comment.
The ag_ui package is imported but not listed in the project dependencies in pyproject.toml. This will cause import errors at runtime. Add the ag_ui package to the dependencies list in pyproject.toml to ensure the package is installed.
| from typing import AsyncIterator | |
| from app.models.chat_message import ChatMessage | |
| from app.schemas.enums.sender_type import SenderType | |
| from app.models.chat_session import ChatSession | |
| from ag_ui.core import ( | |
| RunAgentInput, | |
| RunStartedEvent, | |
| TextMessageStartEvent, | |
| TextMessageContentEvent, | |
| TextMessageEndEvent, | |
| RunFinishedEvent, | |
| MessagesSnapshotEvent, | |
| AssistantMessage, | |
| TextInputContent, | |
| EventEncoder | |
| ) | |
| from typing import AsyncIterator, TYPE_CHECKING | |
| from app.models.chat_message import ChatMessage | |
| from app.schemas.enums.sender_type import SenderType | |
| from app.models.chat_session import ChatSession | |
| try: | |
| from ag_ui.core import ( | |
| RunAgentInput, | |
| RunStartedEvent, | |
| TextMessageStartEvent, | |
| TextMessageContentEvent, | |
| TextMessageEndEvent, | |
| RunFinishedEvent, | |
| MessagesSnapshotEvent, | |
| AssistantMessage, | |
| TextInputContent, | |
| EventEncoder, | |
| ) | |
| except ImportError: # pragma: no cover | |
| if TYPE_CHECKING: | |
| # For static type checkers, try to import the real types | |
| from ag_ui.core import ( # type: ignore[no-redef] | |
| RunAgentInput, | |
| RunStartedEvent, | |
| TextMessageStartEvent, | |
| TextMessageContentEvent, | |
| TextMessageEndEvent, | |
| RunFinishedEvent, | |
| MessagesSnapshotEvent, | |
| AssistantMessage, | |
| TextInputContent, | |
| EventEncoder, | |
| ) | |
| else: | |
| # Runtime fallbacks to avoid ImportError if ag_ui is not installed. | |
| class RunAgentInput: # type: ignore[no-redef] | |
| """Fallback stub for RunAgentInput when ag_ui is unavailable.""" | |
| pass | |
| class RunStartedEvent: # type: ignore[no-redef] | |
| """Fallback stub for RunStartedEvent when ag_ui is unavailable.""" | |
| pass | |
| class TextMessageStartEvent: # type: ignore[no-redef] | |
| """Fallback stub for TextMessageStartEvent when ag_ui is unavailable.""" | |
| pass | |
| class TextMessageContentEvent: # type: ignore[no-redef] | |
| """Fallback stub for TextMessageContentEvent when ag_ui is unavailable.""" | |
| pass | |
| class TextMessageEndEvent: # type: ignore[no-redef] | |
| """Fallback stub for TextMessageEndEvent when ag_ui is unavailable.""" | |
| pass | |
| class RunFinishedEvent: # type: ignore[no-redef] | |
| """Fallback stub for RunFinishedEvent when ag_ui is unavailable.""" | |
| pass | |
| class MessagesSnapshotEvent: # type: ignore[no-redef] | |
| """Fallback stub for MessagesSnapshotEvent when ag_ui is unavailable.""" | |
| pass | |
| class AssistantMessage: # type: ignore[no-redef] | |
| """Fallback stub for AssistantMessage when ag_ui is unavailable.""" | |
| pass | |
| class TextInputContent: # type: ignore[no-redef] | |
| """Fallback stub for TextInputContent when ag_ui is unavailable.""" | |
| pass | |
| class EventEncoder: # type: ignore[no-redef] | |
| """Fallback stub for EventEncoder when ag_ui is unavailable.""" | |
| pass |
| session_id = payload.thread_id if hasattr(payload, 'thread_id') else None | ||
|
|
||
| # Emit RUN_STARTED event | ||
| run_started = RunStartedEvent( | ||
| run_id=payload.run_id, | ||
| thread_id=payload.thread_id |
There was a problem hiding this comment.
Using hasattr to check for thread_id and then accessing payload.thread_id without the hasattr check at line 97 creates inconsistent logic. If thread_id doesn't exist as an attribute, line 97 will raise an AttributeError. Either use getattr with a default value, or ensure thread_id always exists on RunAgentInput.
| session_id = payload.thread_id if hasattr(payload, 'thread_id') else None | |
| # Emit RUN_STARTED event | |
| run_started = RunStartedEvent( | |
| run_id=payload.run_id, | |
| thread_id=payload.thread_id | |
| thread_id = getattr(payload, "thread_id", None) | |
| session_id = thread_id | |
| # Emit RUN_STARTED event | |
| run_started = RunStartedEvent( | |
| run_id=payload.run_id, | |
| thread_id=thread_id |
| assistant_msg, session_id = create_user_message( | ||
| db=db, | ||
| message=user_message_text, | ||
| session_id=session_id | ||
| ) |
There was a problem hiding this comment.
The session_id returned from create_user_message is assigned but never used after line 106. The function already computed session_id at line 92, and create_user_message may return a different session_id if a new session was created. Consider whether the updated session_id should be used in subsequent events or remove the assignment if it's not needed.
|
|
||
| This endpoint accepts RunAgentInput and returns a stream of AG-UI events. | ||
| """ | ||
| return StreamingResponse( | ||
| process_agent_message(db, payload) |
There was a problem hiding this comment.
The docstring mentions returning "a stream of AG-UI events" but doesn't specify the format of these events or the expected media type. Consider adding more detail about the event format (e.g., whether they're JSON, SSE, newline-delimited) and the expected content-type to help API consumers understand how to consume the stream.
| This endpoint accepts RunAgentInput and returns a stream of AG-UI events. | |
| """ | |
| return StreamingResponse( | |
| process_agent_message(db, payload) | |
| This endpoint accepts a :class:`RunAgentInput` payload and returns a | |
| :class:`fastapi.responses.StreamingResponse` that streams AG-UI events. | |
| The response body is a UTF-8 text stream where events are encoded as | |
| Server-Sent Events (SSE) in standard format (e.g. ``data: ...`` lines | |
| separated by a blank line). The HTTP ``Content-Type`` of the response is | |
| ``text/event-stream; charset=utf-8``, allowing AG-UI clients to consume | |
| the stream as SSE. | |
| """ | |
| return StreamingResponse( | |
| process_agent_message(db, payload), | |
| media_type="text/event-stream", |
| This endpoint accepts RunAgentInput and returns a stream of AG-UI events. | ||
| """ | ||
| return StreamingResponse( | ||
| process_agent_message(db, payload) |
There was a problem hiding this comment.
StreamingResponse requires a media_type parameter to be set for proper content-type headers. The AG-UI protocol likely expects a specific media type (such as "text/event-stream" for server-sent events). Add the media_type parameter to StreamingResponse to ensure proper streaming behavior.
| process_agent_message(db, payload) | |
| process_agent_message(db, payload), | |
| media_type="text/event-stream", |
| last_message = payload.messages[-1] | ||
| if hasattr(last_message, 'content'): | ||
| if isinstance(last_message.content, list) and len(last_message.content) > 0: | ||
| user_message_text = last_message.content[0].text |
There was a problem hiding this comment.
Accessing last_message.content[0].text without checking if the list item has a 'text' attribute could raise an AttributeError. Add a check to ensure the content item has a 'text' attribute before accessing it, or wrap this access in a try-except block.
| user_message_text = last_message.content[0].text | |
| first_item = last_message.content[0] | |
| if hasattr(first_item, "text"): | |
| user_message_text = first_item.text |
| except Exception as e: | ||
| # In case of error, we should emit a RUN_ERROR event | ||
| # For now, we'll just re-raise the exception | ||
| raise |
There was a problem hiding this comment.
The exception handling catches all exceptions but only re-raises them without emitting any AG-UI protocol events. As noted in the comment, a RUN_ERROR event should be emitted before re-raising. This leaves clients in an inconsistent state since they received RUN_STARTED but not RUN_FINISHED or RUN_ERROR. Consider implementing proper error event emission to maintain protocol consistency.
| if isinstance(last_message.content, list) and len(last_message.content) > 0: | ||
| user_message_text = last_message.content[0].text | ||
| elif isinstance(last_message.content, str): | ||
| user_message_text = last_message.content |
There was a problem hiding this comment.
If payload.messages is an empty list, user_message_text will remain an empty string. The function will then call create_user_message with an empty message, which may not be the intended behavior. Consider adding validation to ensure a non-empty message is provided, or handle this case explicitly.
| user_message_text = last_message.content | |
| user_message_text = last_message.content | |
| # Validate that we have a non-empty user message | |
| if not user_message_text or not user_message_text.strip(): | |
| raise ValueError("User message text is empty") |
| assistant_msg, session_id = create_user_message( | ||
| db=db, | ||
| message=user_message_text, | ||
| session_id=session_id | ||
| ) |
There was a problem hiding this comment.
The create_user_message function performs database commits within the streaming generator. If the streaming response fails or is cancelled mid-stream after the commit, the database will contain partial data (user message and assistant message saved) but the client won't receive the complete event stream. Consider using database transactions that can be rolled back if the stream fails, or restructure to commit only after all events are successfully yielded.
🔍 PR Validation Results
|
This reverts commit 7b6579c.
No description provided.