Skip to content

Commit 3ec37ea

Browse files
Add streaming
1 parent 804ab1c commit 3ec37ea

2 files changed

Lines changed: 185 additions & 28 deletions

File tree

agents/autogen/mcp_agent/README.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ uv pip install -e .
105105
uv run uvicorn main:app --host 0.0.0.0 --port 8080
106106
```
107107

108-
3. Test: `curl -X POST http://localhost:8080/chat/completions -H "Content-Type: application/json" -d '{"message": "What is 2+2? Use a tool if needed."}'`
108+
3. Test: `curl -X POST http://localhost:8080/chat/completions -H "Content-Type: application/json" -d '{"message": "What is 2+7? Use a tool!"}'`
109109

110110
---
111111

@@ -119,9 +119,19 @@ Test the `/chat` endpoint:
119119
```bash
120120
curl -X POST http://localhost:8080/chat/completions \
121121
-H "Content-Type: application/json" \
122-
-d '{"message": "What is 2+2? Use a tool if needed."}'
122+
-d '{"message": "What is 2+7? Use a tool!"}'
123123
```
124124

125+
**Streaming** (OpenAI-style SSE: `chat.completion.chunk` lines, then `data: [DONE]`):
126+
127+
```bash
128+
curl -sN -X POST http://localhost:8080/chat/completions \
129+
-H "Content-Type: application/json" \
130+
-d '{"messages": [{"role": "user", "content": "What is 2+2? Use a tool if needed."}], "stream": true}'
131+
```
132+
133+
You can use either `"message": "..."` or `"messages": [...]`; set `"stream": true` for token-by-token chunks.
134+
125135
Optional: interactive chat with MCP tools (LangGraph) — from the `mcp_automl_template` directory:
126136

127137
```bash
@@ -186,6 +196,14 @@ curl -X POST https://<YOUR_ROUTE_URL>/chat/completions \
186196
-d '{"message": "Predict churn for this customer: Male, 12 months tenure, fiber optic, month-to-month contract, electronic check, monthly 70.35, total 800.40."}'
187197
```
188198

199+
**Streaming** (`stream: true`): response is Server-Sent Events (`chat.completion.chunk` per line, then `data: [DONE]`). Use `curl -N` or `-sN` so chunks print live:
200+
201+
```bash
202+
curl -sN -X POST https://<YOUR_ROUTE_URL>/chat/completions \
203+
-H "Content-Type: application/json" \
204+
-d '{"messages": [{"role": "user", "content": "What is 17 + 25? Use your tools."}], "stream": true}'
205+
```
206+
189207
---
190208

191209
## Troubleshooting

agents/autogen/mcp_agent/main.py

Lines changed: 165 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,94 @@
11
import asyncio
2+
import json
23
import logging
34
import os
4-
from os import getenv
5+
import time
56
import traceback
7+
import uuid
68
from contextlib import asynccontextmanager
9+
from os import getenv
710

8-
from fastapi import FastAPI, HTTPException
9-
from pydantic import BaseModel
11+
from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
1012
from autogen_core import CancellationToken
11-
from autogen_agentchat.messages import TextMessage
1213
from autogen_ext.tools.mcp import (
1314
SseServerParams,
1415
create_mcp_server_session,
1516
mcp_server_tools,
1617
)
18+
from dotenv import load_dotenv
19+
from fastapi import FastAPI, HTTPException
20+
from fastapi.responses import StreamingResponse
21+
from pydantic import BaseModel, Field, model_validator
1722

1823
from autogen_agent_base.agent import get_agent_chat
19-
from dotenv import load_dotenv
2024

2125
load_dotenv()
2226

2327

24-
# Request/Response models (same API shape as before)
28+
class ChatMessage(BaseModel):
29+
"""OpenAI-style message (optional alternative to `message`)."""
30+
31+
role: str = Field(..., description="user, assistant, or system")
32+
content: str = Field(..., description="Message text")
33+
34+
2535
class ChatRequest(BaseModel):
26-
"""Incoming chat request body for the /chat endpoint."""
36+
"""Chat request: use `message` or `messages` (last user wins); set `stream` for SSE chunks."""
2737

28-
message: str
38+
message: str | None = Field(
39+
None, description="Single user message (simplest). Ignored if `messages` is set."
40+
)
41+
messages: list[ChatMessage] | None = Field(
42+
None,
43+
description="OpenAI-style list; last user message is used as the task.",
44+
)
45+
stream: bool = Field(
46+
False,
47+
description="If true, response is SSE: chat.completion.chunk events, then data: [DONE].",
48+
)
49+
50+
@model_validator(mode="after")
51+
def _need_user_input(self):
52+
if self.messages:
53+
if not any(m.role == "user" for m in self.messages):
54+
raise ValueError("messages must include at least one role=user entry")
55+
return self
56+
if self.message is not None and self.message != "":
57+
return self
58+
raise ValueError("Provide non-empty `message` or `messages` with a user turn")
59+
60+
def user_task(self) -> str:
61+
if self.messages:
62+
for m in reversed(self.messages):
63+
if m.role == "user":
64+
return m.content
65+
return self.message or ""
2966

3067

3168
class ChatResponse(BaseModel):
32-
"""Structured chat response with message history."""
69+
"""Non-streaming chat response (user + assistant messages)."""
70+
71+
messages: list[dict] = Field(
72+
...,
73+
description="Conversation turns; last entry is typically the assistant reply.",
74+
)
75+
finish_reason: str = Field(
76+
...,
77+
description="Why generation stopped (e.g. stop).",
78+
examples=["stop"],
79+
)
3380

34-
messages: list[dict]
35-
finish_reason: str
81+
82+
class HealthResponse(BaseModel):
83+
"""Service health status."""
84+
85+
status: str = Field(
86+
..., description="Current service status.", examples=["healthy"]
87+
)
88+
agent_initialized: bool = Field(
89+
...,
90+
description="Whether the MCP-backed agent is ready to serve requests.",
91+
)
3692

3793

3894
MCP_SYSTEM_PROMPT = (
@@ -97,16 +153,38 @@ async def lifespan(app: FastAPI):
97153

98154
app = FastAPI(
99155
title="AutoGen Agent API (MCP)",
100-
description="FastAPI service for AutoGen AssistantAgent with MCP tools (same behavior as interact_with_mcp.py)",
156+
description="FastAPI service for AutoGen AssistantAgent with MCP tools. "
157+
"POST /chat/completions with `message` or `messages`; set `stream: true` for OpenAI-style SSE chunks.",
101158
lifespan=lifespan,
159+
openapi_tags=[
160+
{"name": "Chat", "description": "Chat completion operations"},
161+
{"name": "Health", "description": "Service health monitoring"},
162+
],
102163
)
103164

104165

105-
@app.post("/chat/completions", response_model=ChatResponse)
166+
def _assistant_content_from_result(result) -> str:
167+
if not result.messages:
168+
return ""
169+
last = result.messages[-1]
170+
if isinstance(last, TextMessage):
171+
return last.content or ""
172+
return getattr(last, "content", None) or str(last)
173+
174+
175+
@app.post(
176+
"/chat/completions",
177+
response_model=ChatResponse,
178+
summary="Create chat completion",
179+
description=(
180+
"Creates a model response for the given chat conversation. "
181+
"When `stream=false`, returns a complete JSON body with `messages` and `finish_reason`. "
182+
"When `stream=true`, returns Server-Sent Events with `chat.completion.chunk` deltas "
183+
"(OpenAI-style `data: {json}\\n\\n`), terminated by `data: [DONE]\\n\\n`."
184+
),
185+
tags=["Chat"],
186+
)
106187
async def chat(request: ChatRequest):
107-
"""
108-
Chat endpoint: accepts a message, runs the MCP-backed AutoGen agent, returns the response.
109-
"""
110188
agent = getattr(app.state, "mcp_agent", None)
111189
if agent is None:
112190
err = (
@@ -115,19 +193,76 @@ async def chat(request: ChatRequest):
115193
)
116194
raise HTTPException(status_code=503, detail=err)
117195

196+
user_text = request.user_task()
197+
model_id = getenv("MODEL_ID") or "model"
198+
199+
if request.stream:
200+
201+
async def event_generator():
202+
completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
203+
created = int(time.time())
204+
cancel_token = CancellationToken()
205+
logger = logging.getLogger(__name__)
206+
try:
207+
async for ev in agent.run_stream(
208+
task=user_text,
209+
cancellation_token=cancel_token,
210+
):
211+
if isinstance(ev, ModelClientStreamingChunkEvent) and ev.content:
212+
data = {
213+
"id": completion_id,
214+
"object": "chat.completion.chunk",
215+
"created": created,
216+
"model": model_id,
217+
"choices": [
218+
{
219+
"index": 0,
220+
"delta": {"content": ev.content},
221+
"finish_reason": None,
222+
}
223+
],
224+
}
225+
yield f"data: {json.dumps(data)}\n\n"
226+
227+
final_data = {
228+
"id": completion_id,
229+
"object": "chat.completion.chunk",
230+
"created": created,
231+
"model": model_id,
232+
"choices": [
233+
{
234+
"index": 0,
235+
"delta": {},
236+
"finish_reason": "stop",
237+
}
238+
],
239+
}
240+
yield f"data: {json.dumps(final_data)}\n\n"
241+
yield "data: [DONE]\n\n"
242+
except Exception:
243+
logger.exception("Error in stream event_generator")
244+
err = {
245+
"error": {
246+
"message": "Internal server error",
247+
"type": "server_error",
248+
}
249+
}
250+
yield f"data: {json.dumps(err)}\n\n"
251+
252+
return StreamingResponse(
253+
event_generator(),
254+
media_type="text/event-stream",
255+
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
256+
)
257+
118258
try:
119259
cancel_token = CancellationToken()
120260
result = await agent.run(
121-
task=request.message,
261+
task=user_text,
122262
cancellation_token=cancel_token,
123263
)
124-
response_messages = [{"role": "user", "content": request.message}]
125-
content = ""
126-
if result.messages:
127-
last = result.messages[-1]
128-
content = getattr(last, "content", None) or str(last)
129-
if isinstance(last, TextMessage):
130-
content = last.content or ""
264+
response_messages = [{"role": "user", "content": user_text}]
265+
content = _assistant_content_from_result(result)
131266
response_messages.append({"role": "assistant", "content": content})
132267
return ChatResponse(messages=response_messages, finish_reason="stop")
133268
except Exception as e:
@@ -136,9 +271,13 @@ async def chat(request: ChatRequest):
136271
)
137272

138273

139-
@app.get("/health")
274+
@app.get(
275+
"/health",
276+
response_model=HealthResponse,
277+
summary="Health check",
278+
tags=["Health"],
279+
)
140280
async def health():
141-
"""Return service health and whether the MCP agent is ready."""
142281
return {
143282
"status": "healthy",
144283
"agent_initialized": getattr(app.state, "mcp_agent", None) is not None,

0 commit comments

Comments
 (0)