Skip to content

Commit 24e7f81

Browse files
committed
/chat/completions
for Llama index and open ai API
1 parent 55ad1b5 commit 24e7f81

5 files changed

Lines changed: 355 additions & 123 deletions

File tree

agents/base/crewai_websearch_agent/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ Send a test request:
177177
Non-streaming
178178

179179
```bash
180-
curl -X POST https://crewai-websearch-agent-tguzik-agents.apps.rosa.ai-eng-gpu.socc.p3.openshiftapps.com/chat/completions \
180+
curl -X POST https://<YOUR_ROUTE_URL>/chat/completions \
181181
-H "Content-Type: application/json" \
182182
-d '{"messages": [{"role": "user", "content": "What is the best cluster hosting service?"}], "stream": false}'
183183
```

agents/base/llamaindex_websearch_agent/README.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,30 +183,29 @@ oc get route llamaindex-websearch-agent -o jsonpath='{.spec.host}'
183183

184184
Send a test request:
185185

186-
/chat endpoint
186+
Non-streaming
187187

188188
```bash
189-
curl -X POST https://<YOUR_ROUTE_URL>/chat \
189+
curl -X POST https://<YOUR_ROUTE_URL>/chat/completions \
190190
-H "Content-Type: application/json" \
191-
-d '{"message": "Which company is consider the best?"}'
191+
-d '{"messages": [{"role": "user", "content": "Which company is consider the best?"}], "stream": false}'
192192
```
193193

194-
/stream endpoint
195-
Classic Print
194+
Streaming
196195

197196
```bash
198-
curl -X POST https://<YOUR_ROUTE_URL>/stream \
197+
curl -X POST https://<YOUR_ROUTE_URL>/chat/completions \
199198
-H "Content-Type: application/json" \
200-
-d '{"message": "Which company is consider the best?"}'
199+
-d '{"messages": [{"role": "user", "content": "Which company is consider the best?"}], "stream": true}'
201200
```
202201

203202
Pretty Printed Stream
204203

205204
```bash
206-
curl -X POST https://<YOUR_ROUTE_URL>/stream \
205+
curl -X POST https://<YOUR_ROUTE_URL>/chat/completions \
207206
-H "Content-Type: application/json" \
208-
-d '{"message": "Which company is consider the best?"}' |
209-
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.content // empty'
207+
-d '{"messages": [{"role": "user", "content": "Which company is consider the best?"}], "stream": true}' |
208+
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.choices[0].delta.content // empty'
210209
```
211210

212211
---

agents/base/llamaindex_websearch_agent/main.py

Lines changed: 163 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22
import logging
3+
import time
4+
import uuid
35
from contextlib import asynccontextmanager
46
from os import getenv
57

@@ -12,18 +14,18 @@
1214
logger = logging.getLogger(__name__)
1315

1416

15-
# Request/Response models
16-
class ChatRequest(BaseModel):
17-
"""Incoming chat request body for the /chat endpoint."""
17+
# OpenAI-compatible request/response models
18+
class ChatMessage(BaseModel):
19+
role: str
20+
content: str
1821

19-
message: str
2022

23+
class ChatCompletionRequest(BaseModel):
24+
"""OpenAI-compatible chat completion request."""
2125

22-
class ChatResponse(BaseModel):
23-
"""Structured chat response (answer and optional steps)."""
24-
25-
answer: str
26-
steps: list[str]
26+
messages: list[ChatMessage]
27+
model: str | None = None
28+
stream: bool = False
2729

2830

2931
# Global variable for workflow closure (get_agent callable)
@@ -32,27 +34,19 @@ class ChatResponse(BaseModel):
3234

3335
@asynccontextmanager
3436
async def lifespan(app: FastAPI):
35-
"""Initialize the LlamaIndex workflow closure on startup and clear it on shutdown.
36-
37-
Reads BASE_URL and MODEL_ID from the environment, builds the workflow via
38-
get_workflow_closure, and sets the global get_agent for the /chat endpoint.
39-
"""
37+
"""Initialize the LlamaIndex workflow closure on startup and clear it on shutdown."""
4038
global get_agent
4139

42-
# Get environment variables
4340
base_url = getenv("BASE_URL")
4441
model_id = getenv("MODEL_ID")
4542

46-
# Ensure base_url ends with /v1 if provided
4743
if base_url and not base_url.endswith("/v1"):
4844
base_url = base_url.rstrip("/") + "/v1"
4945

50-
# Get workflow closure (returns a callable that returns an agent)
5146
get_agent = get_workflow_closure(model_id=model_id, base_url=base_url)
5247

5348
yield
5449

55-
# Cleanup on shutdown (if needed)
5650
get_agent = None
5751

5852

@@ -83,7 +77,7 @@ def _get_message_content(msg) -> str:
8377

8478

8579
def _message_to_response_dict(msg):
86-
"""Map a LlamaIndex ChatMessage to the same format as LangGraph (role, content, tool_calls, etc.)."""
80+
"""Map a LlamaIndex ChatMessage to OpenAI-compatible format."""
8781
role = getattr(msg, "role", "user")
8882
content = _get_message_content(msg)
8983

@@ -154,98 +148,209 @@ def _message_to_response_dict(msg):
154148
return None # skip system or unknown
155149

156150

157-
@app.post("/chat")
158-
async def chat(request: ChatRequest):
159-
"""
160-
Chat endpoint that accepts a message and returns the agent's response.
151+
def _build_user_message(messages: list[ChatMessage]) -> str:
152+
"""Extract the last user message from the OpenAI-format messages list."""
153+
for msg in reversed(messages):
154+
if msg.role == "user":
155+
return msg.content
156+
raise ValueError("No user message found in messages list")
157+
158+
159+
def _make_completion_id() -> str:
160+
return f"chatcmpl-{uuid.uuid4().hex[:12]}"
161161

162-
Args:
163-
request: ChatRequest containing the user message
164162

165-
Returns:
166-
JSON response with full conversation history including tool calls
163+
@app.post("/chat/completions")
164+
async def chat_completions(request: ChatCompletionRequest):
165+
"""
166+
OpenAI-compatible chat completions endpoint.
167+
168+
When stream=false, returns a full chat.completion response.
169+
When stream=true, returns SSE chat.completion.chunk events.
167170
"""
168171
global get_agent
169172

170173
if get_agent is None:
171174
raise HTTPException(status_code=503, detail="Agent not initialized")
172175

176+
user_message = _build_user_message(request.messages)
177+
model_id = request.model or getenv("MODEL_ID", "model")
178+
179+
if request.stream:
180+
return await _handle_stream(user_message, model_id)
181+
else:
182+
return await _handle_chat(user_message, model_id)
183+
184+
185+
async def _handle_chat(user_message: str, model_id: str):
186+
"""Handle non-streaming chat completion."""
187+
global get_agent
188+
173189
try:
174190
agent = get_agent()
175-
messages = [{"role": "user", "content": request.message}]
191+
messages = [{"role": "user", "content": user_message}]
176192

177193
result = await agent.run(input=messages)
178194

179-
response_messages = []
195+
# Extract the final assistant message content
196+
assistant_content = ""
197+
context_messages = []
180198

181199
if result and "messages" in result and len(result["messages"]) > 0:
182200
for message in result["messages"]:
183201
if getattr(message, "role", None) == "system":
184202
continue
185203
item = _message_to_response_dict(message)
186204
if item is not None:
187-
response_messages.append(item)
205+
context_messages.append(item)
188206

189-
return {"messages": response_messages, "finish_reason": "stop"}
207+
# Final assistant content is the last assistant message with content
208+
for item in reversed(context_messages):
209+
if item["role"] == "assistant" and item.get("content"):
210+
assistant_content = item["content"]
211+
break
212+
213+
return {
214+
"id": _make_completion_id(),
215+
"object": "chat.completion",
216+
"created": int(time.time()),
217+
"model": model_id,
218+
"choices": [
219+
{
220+
"index": 0,
221+
"message": {
222+
"role": "assistant",
223+
"content": assistant_content,
224+
},
225+
"finish_reason": "stop",
226+
}
227+
],
228+
"context": context_messages,
229+
"usage": None,
230+
}
190231

191232
except Exception as e:
192233
raise HTTPException(
193234
status_code=500, detail=f"Error processing request: {str(e)}"
194235
)
195236

196237

197-
@app.post("/stream")
198-
async def stream(request: ChatRequest):
199-
"""
200-
Streaming chat endpoint that accepts a message and returns the agent's
201-
response as Server-Sent Events (SSE).
202-
203-
Event types:
204-
- tool_call: tool invocation by the agent
205-
- tool_result: result returned by a tool
206-
- token: final answer text
207-
- done: signals the stream is complete
208-
209-
Args:
210-
request: ChatRequest containing the user message
211-
"""
238+
async def _handle_stream(user_message: str, model_id: str):
239+
"""Handle streaming chat completion with OpenAI-compatible SSE chunks."""
212240
global get_agent
213241

214-
if get_agent is None:
215-
raise HTTPException(status_code=503, detail="Agent not initialized")
242+
completion_id = _make_completion_id()
243+
created = int(time.time())
216244

217245
async def event_generator():
218246
try:
219247
agent = get_agent()
220-
messages = [{"role": "user", "content": request.message}]
248+
messages = [{"role": "user", "content": user_message}]
221249

222250
handler = agent.run(input=messages)
223251

224252
async for event in handler.stream_events():
225253
if isinstance(event, ToolCallEvent):
226254
for tc in event.tool_calls:
227-
yield f"event: tool_call\ndata: {json.dumps({'name': tc.tool_name, 'args': tc.tool_kwargs})}\n\n"
255+
tool_calls_delta = [
256+
{
257+
"index": 0,
258+
"id": getattr(tc, "tool_id", ""),
259+
"type": "function",
260+
"function": {
261+
"name": tc.tool_name,
262+
"arguments": json.dumps(tc.tool_kwargs),
263+
},
264+
}
265+
]
266+
data = {
267+
"id": completion_id,
268+
"object": "chat.completion.chunk",
269+
"created": created,
270+
"model": model_id,
271+
"choices": [
272+
{
273+
"index": 0,
274+
"delta": {
275+
"role": "assistant",
276+
"tool_calls": tool_calls_delta,
277+
},
278+
"finish_reason": None,
279+
}
280+
],
281+
}
282+
yield f"data: {json.dumps(data)}\n\n"
228283

229284
elif isinstance(event, InputEvent):
230-
# Check if the last message is a tool result
231285
if event.input:
232286
last_msg = event.input[-1]
233287
if getattr(last_msg, "role", None) == "tool":
234288
additional = getattr(last_msg, "additional_kwargs", {}) or {}
235-
yield f"event: tool_result\ndata: {json.dumps({'name': additional.get('name', ''), 'output': _get_message_content(last_msg)})}\n\n"
289+
data = {
290+
"id": completion_id,
291+
"object": "chat.completion.chunk",
292+
"created": created,
293+
"model": model_id,
294+
"choices": [
295+
{
296+
"index": 0,
297+
"delta": {
298+
"role": "tool",
299+
"content": _get_message_content(last_msg),
300+
"name": additional.get("name", ""),
301+
},
302+
"finish_reason": None,
303+
}
304+
],
305+
}
306+
yield f"data: {json.dumps(data)}\n\n"
236307

237308
result = await handler
238309
# Extract final answer from the result
239310
if result and "response" in result:
240311
content = _get_message_content(result["response"].message)
241312
if content:
242-
yield f"event: token\ndata: {json.dumps({'content': content})}\n\n"
243-
244-
yield "event: done\ndata: {}\n\n"
313+
data = {
314+
"id": completion_id,
315+
"object": "chat.completion.chunk",
316+
"created": created,
317+
"model": model_id,
318+
"choices": [
319+
{
320+
"index": 0,
321+
"delta": {"content": content},
322+
"finish_reason": None,
323+
}
324+
],
325+
}
326+
yield f"data: {json.dumps(data)}\n\n"
327+
328+
# Send final chunk with finish_reason
329+
final_data = {
330+
"id": completion_id,
331+
"object": "chat.completion.chunk",
332+
"created": created,
333+
"model": model_id,
334+
"choices": [
335+
{
336+
"index": 0,
337+
"delta": {},
338+
"finish_reason": "stop",
339+
}
340+
],
341+
}
342+
yield f"data: {json.dumps(final_data)}\n\n"
343+
yield "data: [DONE]\n\n"
245344

246-
except Exception as e:
345+
except Exception:
247346
logger.exception("Error in stream event_generator")
248-
yield f"event: error\ndata: {json.dumps({'detail': 'Internal server error'})}\n\n"
347+
error_data = {
348+
"error": {
349+
"message": "Internal server error",
350+
"type": "server_error",
351+
}
352+
}
353+
yield f"data: {json.dumps(error_data)}\n\n"
249354

250355
return StreamingResponse(
251356
event_generator(),
@@ -264,4 +369,4 @@ async def health():
264369
import uvicorn
265370

266371
port = int(getenv("PORT", 8000))
267-
uvicorn.run(app, host="0.0.0.0", port=port)
372+
uvicorn.run(app, host="0.0.0.0", port=port)

0 commit comments

Comments
 (0)