Skip to content

Commit a7bc7f9

Browse files
committed
bare api agent chat stream deploy works
1 parent f8eee76 commit a7bc7f9

3 files changed

Lines changed: 104 additions & 2 deletions

File tree

agents/base/openai_responses_agent/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,32 @@ oc get route openai-responses-agent -o jsonpath='{.spec.host}'
174174
```
175175

176176
Send a test request:
177+
/chat endpoint
177178

178179
```bash
179180
curl -X POST https://<YOUR_ROUTE_URL>/chat \
180181
-H "Content-Type: application/json" \
181182
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}'
182183
```
183184

185+
/stream endpoint
186+
Raw output (Server-Sent Events printed exactly as the server sends them)
187+
188+
```bash
189+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
190+
-H "Content-Type: application/json" \
191+
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}'
192+
```
193+
194+
Pretty-printed stream (requires `jq`; prints only the `content` field of each event, i.e. the final answer text)
195+
196+
```bash
197+
curl -X POST https://<YOUR_ROUTE_URL>/stream \
198+
-H "Content-Type: application/json" \
199+
-d '{"message": "How much does a Lenovo Laptop cost and what are the reviews?"}' |
200+
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.content // empty'
201+
```
202+
184203
---
185204

186205
## Agent-Specific Documentation

agents/base/openai_responses_agent/main.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import asyncio
2+
import json
3+
import logging
14
from contextlib import asynccontextmanager
25
from os import getenv
36

47
from fastapi import FastAPI, HTTPException
5-
from openai_responses_agent_base.agent import get_agent_closure
8+
from fastapi.responses import StreamingResponse
9+
from openai_responses_agent_base.agent import get_agent_closure, AIAgent
610
from pydantic import BaseModel
711

12+
logger = logging.getLogger(__name__)
13+
814

915
# Request/Response models
1016
class ChatRequest(BaseModel):
@@ -81,6 +87,75 @@ async def chat(request: ChatRequest):
8187
)
8288

8389

90+
@app.post("/stream")
async def stream(request: ChatRequest):
    """
    Streaming chat endpoint that accepts a message and returns the agent's
    response as Server-Sent Events (SSE).

    Event types:
        - tool_call: tool invocation by the agent
        - tool_result: result returned by a tool (observation)
        - token: final answer text
        - done: signals the stream is complete
        - error: the agent run or event serialization failed; the stream
          ends after this event and no ``done`` event is sent

    Args:
        request: ChatRequest containing the user message

    Returns:
        StreamingResponse with media type ``text/event-stream``.

    Raises:
        HTTPException: 503 if the agent has not been initialized.
    """
    if get_agent is None:
        raise HTTPException(status_code=503, detail="Agent not initialized")

    def format_sse(event_type: str, data: dict) -> str:
        # One SSE frame: an event name line, a JSON data line, a blank line.
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

    async def event_generator():
        try:
            loop = asyncio.get_running_loop()
            queue: asyncio.Queue = asyncio.Queue()
            # Unique marker the worker posts once it has nothing more to emit.
            end_of_events = object()

            def on_event(event_type: str, data: dict):
                # Called from the executor thread. asyncio.Queue is not
                # thread-safe, so hand the put over to the event loop thread.
                loop.call_soon_threadsafe(queue.put_nowait, (event_type, data))

            def run_agent():
                try:
                    adapter = get_agent()
                    agent = AIAgent(
                        model=adapter._model_id,
                        base_url=adapter._base_url,
                        api_key=adapter._api_key,
                    )
                    for name, func in adapter._tools:
                        agent.register_tool(name, func)
                    return agent.query(request.message, on_event=on_event)
                finally:
                    # Posted from the same thread as every on_event call, so
                    # it is enqueued after all events, on success or failure.
                    loop.call_soon_threadsafe(queue.put_nowait, end_of_events)

            # The agent is synchronous; run it in the default thread pool so
            # the event loop stays free to forward events to the client.
            task = loop.run_in_executor(None, run_agent)

            # Forward events as they arrive instead of polling with a timeout.
            while True:
                item = await queue.get()
                if item is end_of_events:
                    break
                event_type, data = item
                yield format_sse(event_type, data)

            # Re-raises any exception from the worker so it is reported below.
            answer = await task
            if answer:
                yield format_sse("token", {"content": answer})

            yield "event: done\ndata: {}\n\n"

        except Exception:
            # Log full details server-side; send the client a generic message.
            logger.exception("Error in stream event_generator")
            yield format_sse("error", {"detail": "Internal server error"})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
157+
158+
84159
@app.get("/health")
85160
async def health():
86161
"""Return service health and whether the agent has been initialized."""

agents/base/openai_responses_agent/src/openai_responses_agent_base/agent.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,14 @@ def _execute(self) -> str:
233233
)
234234
return _get_output_text_from_response(response)
235235

236-
def query(self, question: str, max_turns: int = 10) -> Optional[str]:
236+
def query(self, question: str, max_turns: int = 10, on_event: Optional[Callable] = None) -> Optional[str]:
237237
"""
238238
Process a question through multiple turns until getting final answer.
239239
240240
Args:
241241
question: The input question to process.
242242
max_turns: Maximum number of turns before timing out.
243+
on_event: Optional callback(event_type, data) for streaming events.
243244
244245
Returns:
245246
The final answer or None if no answer found.
@@ -268,11 +269,18 @@ def query(self, question: str, max_turns: int = 10) -> Optional[str]:
268269
action, args_str = actions[0].groups()
269270
action_inputs = self._parse_arguments(args_str)
270271

272+
if on_event:
273+
on_event("tool_call", {"name": action, "args": action_inputs})
274+
271275
tool = self.tools.get(action)
272276
if not tool:
273277
raise ValueError(f"Unknown action: {action}")
274278

275279
observation = tool(*action_inputs)
280+
281+
if on_event:
282+
on_event("tool_result", {"name": action, "output": str(observation)})
283+
276284
next_prompt = f"Observation: {observation}"
277285
else:
278286
# No Action: line – treat the whole response as the final answer

0 commit comments

Comments
 (0)