Skip to content

Commit 35f2bf2

Browse files
authored
feat: Add /stream endpoint support for base and community agents (#20)
* add streaming for agent * add stream to rag agent + set it to only vector store id support * add stream to websearch agent * bare api agent chat stream deploy works * removed not needed lib
1 parent 8a89678 commit 35f2bf2

13 files changed

Lines changed: 425 additions & 81 deletions

File tree

agents/base/langgraph_react_agent/README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,22 @@ COPY the route URL and PASTE into the CURL below
182182
oc get route langgraph-react-agent -o jsonpath='{.spec.host}'
183183
```
184184

185-
Send a test request:
185+
Send a test request to the `/chat` endpoint:
186186

187187
```bash
188188
curl -X POST https://<YOUR_ROUTE_URL>/chat \
189189
-H "Content-Type: application/json" \
190190
-d '{"message": "What is the best company? Answer with the first correct answer."}'
191191
```
192192

193+
Send a test request to the `/stream` endpoint:
194+
195+
```bash
196+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
197+
-H "Content-Type: application/json" \
198+
-d '{"message": "What is the best company? Answer with the first correct answer."}'
199+
```
200+
193201
## Agent-Specific Documentation
194202

195203
Each agent has detailed documentation for setup and deployment:

agents/base/langgraph_react_agent/main.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
import json
2+
import logging
23
from contextlib import asynccontextmanager
34
from os import getenv
45

56
from fastapi import FastAPI, HTTPException
7+
from fastapi.responses import StreamingResponse
68
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
79
from pydantic import BaseModel
810

11+
logger = logging.getLogger(__name__)
12+
913
from langgraph_react_agent_base.agent import get_graph_closure
1014

1115

@@ -137,6 +141,71 @@ async def chat(request: ChatRequest):
137141
)
138142

139143

144+
@app.post("/stream")
async def stream(request: ChatRequest):
    """
    Stream the agent's reply to one user message as Server-Sent Events (SSE).

    Every frame has an event name and a JSON payload on its data line.

    Event types:
    - token: non-empty content of a streamed chat-model chunk
    - tool_call: name and args of each tool call on a finished model message
    - tool_result: tool name and stringified tool output
    - done: emitted once after the event stream is exhausted
    - error: emitted if an exception is raised while streaming; the
      exception itself is only logged, never sent to the client

    Args:
        request: ChatRequest containing the user message

    Raises:
        HTTPException: 503 if the agent graph has not been initialized.
    """
    if agent_graph is None:
        raise HTTPException(status_code=503, detail="Agent not initialized")

    def sse_frame(event_name, payload):
        # An SSE frame is an event line, a data line and a terminating blank line.
        return f"event: {event_name}\ndata: {json.dumps(payload)}\n\n"

    async def event_generator():
        try:
            graph_events = agent_graph.astream_events(
                {"messages": [HumanMessage(content=request.message)]},
                config={"recursion_limit": 10},
                version="v2",
            )

            async for event in graph_events:
                kind = event["event"]

                if kind == "on_chat_model_stream":
                    # Incremental model output; empty chunks are not forwarded.
                    text = event["data"]["chunk"].content
                    if text:
                        yield sse_frame("token", {"content": text})

                elif kind == "on_chat_model_end":
                    # Tool calls are reported once the model message is complete.
                    finished = event["data"]["output"]
                    for call in getattr(finished, "tool_calls", None) or ():
                        yield sse_frame(
                            "tool_call",
                            {"name": call["name"], "args": call["args"]},
                        )

                elif kind == "on_tool_end":
                    raw_output = event["data"].get("output", "")
                    # Unwrap objects exposing `.content` (e.g. a ToolMessage).
                    tool_output = getattr(raw_output, "content", raw_output)
                    yield sse_frame(
                        "tool_result",
                        {"name": event.get("name", ""), "output": str(tool_output)},
                    )

            yield "event: done\ndata: {}\n\n"

        except Exception:
            logger.exception("Error in stream event_generator")
            yield sse_frame("error", {"detail": "Internal server error"})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
207+
208+
140209
@app.get("/health")
141210
async def health():
142211
"""Return service health and whether the agent graph has been initialized."""

agents/base/llamaindex_websearch_agent/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,32 @@ oc get route llamaindex-websearch-agent -o jsonpath='{.spec.host}'
183183

184184
Send a test request:
185185

186+
/chat endpoint
187+
186188
```bash
187189
curl -X POST https://<YOUR_ROUTE_URL>/chat \
188190
-H "Content-Type: application/json" \
189191
-d '{"message": "Which company is considered the best?"}'
190192
```
191193

194+
/stream endpoint
195+
Classic Print
196+
197+
```bash
198+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
199+
-H "Content-Type: application/json" \
200+
-d '{"message": "Which company is considered the best?"}'
201+
```
202+
203+
Pretty Printed Stream
204+
205+
```bash
206+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
207+
-H "Content-Type: application/json" \
208+
-d '{"message": "Which company is considered the best?"}' |
209+
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.content // empty'
210+
```
211+
192212
---
193213

194214
## Agent-Specific Documentation

agents/base/llamaindex_websearch_agent/main.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import json
2+
import logging
23
from contextlib import asynccontextmanager
34
from os import getenv
45

56
from fastapi import FastAPI, HTTPException
7+
from fastapi.responses import StreamingResponse
68
from llama_index_workflow_agent_base.agent import get_workflow_closure
9+
from llama_index_workflow_agent_base.workflow import ToolCallEvent, InputEvent
710
from pydantic import BaseModel
811

12+
logger = logging.getLogger(__name__)
13+
914

1015
# Request/Response models
1116
class ChatRequest(BaseModel):
@@ -189,6 +194,66 @@ async def chat(request: ChatRequest):
189194
)
190195

191196

197+
@app.post("/stream")
async def stream(request: ChatRequest):
    """
    Stream the workflow agent's reply to one user message as Server-Sent
    Events (SSE).

    Event types:
    - tool_call: one frame per tool call carried by a ToolCallEvent
    - tool_result: emitted when the last message of an InputEvent has
      role "tool"
    - token: the final answer text, sent as a single frame
    - done: emitted once after the final answer
    - error: emitted if an exception is raised while streaming; the
      exception itself is only logged, never sent to the client

    Args:
        request: ChatRequest containing the user message

    Raises:
        HTTPException: 503 if the agent factory has not been initialized.
    """
    if get_agent is None:
        raise HTTPException(status_code=503, detail="Agent not initialized")

    def sse_frame(event_name, payload):
        # An SSE frame is an event line, a data line and a terminating blank line.
        return f"event: {event_name}\ndata: {json.dumps(payload)}\n\n"

    async def event_generator():
        try:
            workflow_agent = get_agent()
            handler = workflow_agent.run(
                input=[{"role": "user", "content": request.message}]
            )

            async for event in handler.stream_events():
                if isinstance(event, ToolCallEvent):
                    for call in event.tool_calls:
                        yield sse_frame(
                            "tool_call",
                            {"name": call.tool_name, "args": call.tool_kwargs},
                        )
                    continue

                # Only a non-empty InputEvent can carry a tool result.
                if not isinstance(event, InputEvent) or not event.input:
                    continue

                latest = event.input[-1]
                if getattr(latest, "role", None) != "tool":
                    continue

                extras = getattr(latest, "additional_kwargs", {}) or {}
                yield sse_frame(
                    "tool_result",
                    {
                        "name": extras.get("name", ""),
                        "output": _get_message_content(latest),
                    },
                )

            # Awaiting the handler yields the workflow's final result.
            result = await handler
            if result and "response" in result:
                answer = _get_message_content(result["response"].message)
                if answer:
                    yield sse_frame("token", {"content": answer})

            yield "event: done\ndata: {}\n\n"

        except Exception:
            logger.exception("Error in stream event_generator")
            yield sse_frame("error", {"detail": "Internal server error"})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
255+
256+
192257
@app.get("/health")
193258
async def health():
194259
"""Return service health and whether the workflow closure has been initialized."""

agents/base/openai_responses_agent/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,32 @@ oc get route openai-responses-agent -o jsonpath='{.spec.host}'
174174
```
175175

176176
Send a test request:
177+
/chat endpoint
177178

178179
```bash
179180
curl -X POST https://<YOUR_ROUTE_URL>/chat \
180181
-H "Content-Type: application/json" \
181182
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}'
182183
```
183184

185+
/stream endpoint
186+
Classic Print
187+
188+
```bash
189+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
190+
-H "Content-Type: application/json" \
191+
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}'
192+
```
193+
194+
Pretty Printed Stream
195+
196+
```bash
197+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
198+
-H "Content-Type: application/json" \
199+
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}' |
200+
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.content // empty'
201+
```
202+
184203
---
185204

186205
## Agent-Specific Documentation

agents/base/openai_responses_agent/main.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import asyncio
2+
import json
3+
import logging
14
from contextlib import asynccontextmanager
25
from os import getenv
36

47
from fastapi import FastAPI, HTTPException
5-
from openai_responses_agent_base.agent import get_agent_closure
8+
from fastapi.responses import StreamingResponse
9+
from openai_responses_agent_base.agent import get_agent_closure, AIAgent
610
from pydantic import BaseModel
711

12+
logger = logging.getLogger(__name__)
13+
814

915
# Request/Response models
1016
class ChatRequest(BaseModel):
@@ -81,6 +87,75 @@ async def chat(request: ChatRequest):
8187
)
8288

8389

90+
@app.post("/stream")
async def stream(request: ChatRequest):
    """
    Streaming chat endpoint that accepts a message and returns the agent's
    response as Server-Sent Events (SSE).

    The agent's `query` call is synchronous, so it runs in the event loop's
    default executor. Its `on_event` callback therefore fires in a worker
    thread; events are handed to the event loop with
    `loop.call_soon_threadsafe` because `asyncio.Queue` is not thread-safe.

    Event types:
    - tool_call: tool invocation by the agent
    - tool_result: result returned by a tool (observation)
    - token: final answer text
    - done: signals the stream is complete
    - error: emitted if an exception is raised while streaming; the
      exception itself is only logged, never sent to the client

    Args:
        request: ChatRequest containing the user message

    Raises:
        HTTPException: 503 if the agent factory has not been initialized.
    """
    if get_agent is None:
        raise HTTPException(status_code=503, detail="Agent not initialized")

    async def event_generator():
        try:
            loop = asyncio.get_running_loop()
            queue: asyncio.Queue = asyncio.Queue()
            # Unique marker queued after the worker finishes (success or failure).
            finished = object()

            def on_event(event_type: str, data: dict):
                # Called from the worker thread: schedule the put on the loop
                # instead of touching the asyncio.Queue directly.
                loop.call_soon_threadsafe(queue.put_nowait, (event_type, data))

            def run_agent():
                try:
                    adapter = get_agent()
                    agent = AIAgent(
                        model=adapter._model_id,
                        base_url=adapter._base_url,
                        api_key=adapter._api_key,
                    )
                    for name, func in adapter._tools:
                        agent.register_tool(name, func)
                    return agent.query(request.message, on_event=on_event)
                finally:
                    # Callbacks scheduled with call_soon_threadsafe run in FIFO
                    # order, so every event is queued ahead of this marker.
                    loop.call_soon_threadsafe(queue.put_nowait, finished)

            task = loop.run_in_executor(None, run_agent)

            # Forward events as they arrive; no polling timeout is needed
            # because the marker always ends the loop.
            while True:
                item = await queue.get()
                if item is finished:
                    break
                event_type, data = item
                yield f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

            # Re-raises here if the agent failed, which is reported as an
            # error event by the handler below.
            answer = await task
            if answer:
                yield f"event: token\ndata: {json.dumps({'content': answer})}\n\n"

            yield "event: done\ndata: {}\n\n"

        except Exception:
            logger.exception("Error in stream event_generator")
            yield f"event: error\ndata: {json.dumps({'detail': 'Internal server error'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
157+
158+
84159
@app.get("/health")
85160
async def health():
86161
"""Return service health and whether the agent has been initialized."""

agents/base/openai_responses_agent/src/openai_responses_agent_base/agent.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,14 @@ def _execute(self) -> str:
233233
)
234234
return _get_output_text_from_response(response)
235235

236-
def query(self, question: str, max_turns: int = 10) -> Optional[str]:
236+
def query(self, question: str, max_turns: int = 10, on_event: Optional[Callable] = None) -> Optional[str]:
237237
"""
238238
Process a question through multiple turns until getting final answer.
239239
240240
Args:
241241
question: The input question to process.
242242
max_turns: Maximum number of turns before timing out.
243+
on_event: Optional callback(event_type, data) for streaming events.
243244
244245
Returns:
245246
The final answer or None if no answer found.
@@ -268,11 +269,18 @@ def query(self, question: str, max_turns: int = 10) -> Optional[str]:
268269
action, args_str = actions[0].groups()
269270
action_inputs = self._parse_arguments(args_str)
270271

272+
if on_event:
273+
on_event("tool_call", {"name": action, "args": action_inputs})
274+
271275
tool = self.tools.get(action)
272276
if not tool:
273277
raise ValueError(f"Unknown action: {action}")
274278

275279
observation = tool(*action_inputs)
280+
281+
if on_event:
282+
on_event("tool_result", {"name": action, "output": str(observation)})
283+
276284
next_prompt = f"Observation: {observation}"
277285
else:
278286
# No Action: line – treat the whole response as the final answer

0 commit comments

Comments
 (0)