Skip to content

Commit 55ad1b5

Browse files
committed
/chat/completions
for CrewAI agent
1 parent 947b15c commit 55ad1b5

2 files changed

Lines changed: 190 additions & 107 deletions

File tree

agents/base/crewai_websearch_agent/README.md

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,32 +172,31 @@ Get the route URL:
172172
oc get route crewai-websearch-agent -o jsonpath='{.spec.host}'
173173
```
174174

175-
/chat response:
175+
Send a test request:
176+
177+
Non-streaming
176178

177179
```bash
178-
curl -X POST https://<YOUR_ROUTE_URL>/chat \
180+
curl -X POST https://crewai-websearch-agent-tguzik-agents.apps.rosa.ai-eng-gpu.socc.p3.openshiftapps.com/chat/completions \
179181
-H "Content-Type: application/json" \
180-
-d '{"message": "What is the best cluster hosting service?"}'
182+
-d '{"messages": [{"role": "user", "content": "What is the best cluster hosting service?"}], "stream": false}'
181183
```
182184

183-
/stream response:
185+
Streaming
184186

185187
```bash
186-
curl -sN -X POST https://<YOUR_ROUTE_URL>/stream \
188+
curl -sN -X POST https://<YOUR_ROUTE_URL>/chat/completions \
187189
-H "Content-Type: application/json" \
188-
-d '{"message": "What is the best cluster hosting service?"}'
190+
-d '{"messages": [{"role": "user", "content": "What is the best cluster hosting service?"}], "stream": true}'
189191
```
190192

191-
Pretty /stream response
193+
Pretty Printed Stream
192194

193195
```bash
194-
curl -sN -X POST https://<YOUR_ROUTE_URL>/stream \
196+
curl -sN -X POST https://<YOUR_ROUTE_URL>/chat/completions \
195197
-H "Content-Type: application/json" \
196-
-d '{"message": "What is the best cluster hosting service??"}' |
197-
grep --line-buffered '^data: ' |
198-
sed -u 's/^data: //' |
199-
grep -v '^\[DONE\]$' |
200-
jq -r -j '.choices[0].delta.content // empty'
198+
-d '{"messages": [{"role": "user", "content": "What is the best cluster hosting service?"}], "stream": true}' |
199+
jq -R -r -j --stream 'scan("^data:(.*)")[] | fromjson.choices[0].delta.content // empty'
201200
```
202201

203202
---
Lines changed: 178 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import asyncio
22
import json
3+
import logging
34
import re
5+
import time
6+
import uuid
47
from contextlib import asynccontextmanager
58
from os import getenv
69

@@ -11,11 +14,21 @@
1114

1215
from crewai_web_search.crew import AssistanceAgents
1316

17+
logger = logging.getLogger(__name__)
1418

15-
class ChatRequest(BaseModel):
16-
"""Incoming chat request body for the /chat and /stream endpoints."""
1719

18-
message: str
20+
# OpenAI-compatible request/response models
21+
class ChatMessage(BaseModel):
22+
role: str
23+
content: str
24+
25+
26+
class ChatCompletionRequest(BaseModel):
27+
"""OpenAI-compatible chat completion request."""
28+
29+
messages: list[ChatMessage]
30+
model: str | None = None
31+
stream: bool = False
1932

2033

2134
# Global LLM instance
@@ -42,6 +55,18 @@ def _clean_content(text: str) -> str:
4255
return text.strip()
4356

4457

58+
def _build_user_message(messages: list[ChatMessage]) -> str:
59+
"""Extract the last user message from the OpenAI-format messages list."""
60+
for msg in reversed(messages):
61+
if msg.role == "user":
62+
return msg.content
63+
raise ValueError("No user message found in messages list")
64+
65+
66+
def _make_completion_id() -> str:
67+
return f"chatcmpl-{uuid.uuid4().hex[:12]}"
68+
69+
4570
@asynccontextmanager
4671
async def lifespan(app: FastAPI):
4772
"""Initialize the CrewAI LLM on startup."""
@@ -73,123 +98,182 @@ async def lifespan(app: FastAPI):
7398
)
7499

75100

76-
@app.post("/chat")
77-
async def chat(request: ChatRequest):
78-
"""Non-streaming chat endpoint. Returns the final answer."""
101+
@app.post("/chat/completions")
102+
async def chat_completions(request: ChatCompletionRequest):
103+
"""
104+
OpenAI-compatible chat completions endpoint.
105+
106+
When stream=false, returns a full chat.completion response.
107+
When stream=true, returns SSE chat.completion.chunk events.
108+
"""
79109
global llm
80110

81111
if llm is None:
82112
raise HTTPException(status_code=503, detail="Agent not initialized")
83113

114+
user_message = _build_user_message(request.messages)
115+
model_id = request.model or getenv("MODEL_ID", "model")
116+
117+
if request.stream:
118+
return await _handle_stream(user_message, model_id)
119+
else:
120+
return await _handle_chat(user_message, model_id)
121+
122+
123+
async def _handle_chat(user_message: str, model_id: str):
124+
"""Handle non-streaming chat completion."""
125+
global llm
126+
84127
try:
85128
inputs = {
86-
"user_prompt": request.message,
129+
"user_prompt": user_message,
87130
"custom_instruction": "",
88131
}
89132

90133
crew = AssistanceAgents(llm=llm).crew()
91134
result = await asyncio.to_thread(crew.kickoff, inputs=inputs)
92135

93-
response_messages = [
94-
{"role": "user", "content": request.message},
95-
{"role": "assistant", "content": _clean_content(str(result))},
96-
]
97-
98-
return {"messages": response_messages, "finish_reason": "stop"}
136+
assistant_content = _clean_content(str(result))
137+
138+
return {
139+
"id": _make_completion_id(),
140+
"object": "chat.completion",
141+
"created": int(time.time()),
142+
"model": model_id,
143+
"choices": [
144+
{
145+
"index": 0,
146+
"message": {
147+
"role": "assistant",
148+
"content": assistant_content,
149+
},
150+
"finish_reason": "stop",
151+
}
152+
],
153+
"usage": None,
154+
}
99155

100156
except Exception as e:
101157
raise HTTPException(
102158
status_code=500, detail=f"Error processing request: {str(e)}"
103159
)
104160

105161

106-
@app.post("/stream")
107-
async def stream(request: ChatRequest):
108-
"""Streaming chat endpoint using CrewAI's native token-level streaming.
109-
110-
Uses Crew(stream=True) with kickoff_async() which returns a
111-
CrewStreamingOutput that yields StreamChunk objects with real
112-
token-by-token content from the LLM.
113-
"""
162+
async def _handle_stream(user_message: str, model_id: str):
163+
"""Handle streaming chat completion with OpenAI-compatible SSE chunks."""
114164
global llm
115165

116-
if llm is None:
117-
raise HTTPException(status_code=503, detail="Agent not initialized")
118-
119-
async def _event_generator():
120-
inputs = {
121-
"user_prompt": request.message,
122-
"custom_instruction": "",
123-
}
124-
125-
crew = AssistanceAgents(llm=llm, stream=True).crew()
126-
127-
# kickoff_async with stream=True returns CrewStreamingOutput
128-
streaming_output = await crew.kickoff_async(inputs=inputs)
129-
130-
# Buffer tokens until we see "Final Answer:" — everything before
131-
# that is internal ReAct reasoning (Thought/Action/Observation).
132-
buffer = ""
133-
emitting = False
134-
135-
async for chunk in streaming_output:
136-
if chunk.chunk_type.value != "text" or not chunk.content:
137-
continue
138-
139-
if emitting:
140-
# Already past "Final Answer:", emit tokens directly
141-
sse_chunk = {
142-
"choices": [{
143-
"index": 0,
144-
"delta": {"role": "assistant", "content": chunk.content},
145-
"finish_reason": None,
146-
}]
147-
}
148-
yield f"data: {json.dumps(sse_chunk)}\n\n"
149-
else:
150-
buffer += chunk.content
151-
# Check if we've reached the final answer
152-
marker = "Final Answer:"
153-
idx = buffer.find(marker)
154-
if idx != -1:
155-
emitting = True
156-
# Emit any text after the marker that arrived in this chunk
157-
remainder = buffer[idx + len(marker):]
158-
if remainder.strip():
159-
sse_chunk = {
160-
"choices": [{
166+
completion_id = _make_completion_id()
167+
created = int(time.time())
168+
169+
async def event_generator():
170+
try:
171+
inputs = {
172+
"user_prompt": user_message,
173+
"custom_instruction": "",
174+
}
175+
176+
crew = AssistanceAgents(llm=llm, stream=True).crew()
177+
streaming_output = await crew.kickoff_async(inputs=inputs)
178+
179+
# Buffer tokens until we see "Final Answer:" — everything before
180+
# that is internal ReAct reasoning (Thought/Action/Observation).
181+
buffer = ""
182+
emitting = False
183+
184+
async for chunk in streaming_output:
185+
if chunk.chunk_type.value != "text" or not chunk.content:
186+
continue
187+
188+
if emitting:
189+
data = {
190+
"id": completion_id,
191+
"object": "chat.completion.chunk",
192+
"created": created,
193+
"model": model_id,
194+
"choices": [
195+
{
161196
"index": 0,
162-
"delta": {"role": "assistant", "content": remainder.lstrip()},
197+
"delta": {"content": chunk.content},
163198
"finish_reason": None,
164-
}]
165-
}
166-
yield f"data: {json.dumps(sse_chunk)}\n\n"
167-
168-
# If no "Final Answer:" was found, send the cleaned full buffer
169-
if not emitting and buffer.strip():
170-
cleaned = _clean_content(buffer)
171-
if cleaned:
172-
sse_chunk = {
173-
"choices": [{
199+
}
200+
],
201+
}
202+
yield f"data: {json.dumps(data)}\n\n"
203+
else:
204+
buffer += chunk.content
205+
marker = "Final Answer:"
206+
idx = buffer.find(marker)
207+
if idx != -1:
208+
emitting = True
209+
remainder = buffer[idx + len(marker) :]
210+
if remainder.strip():
211+
data = {
212+
"id": completion_id,
213+
"object": "chat.completion.chunk",
214+
"created": created,
215+
"model": model_id,
216+
"choices": [
217+
{
218+
"index": 0,
219+
"delta": {"content": remainder.lstrip()},
220+
"finish_reason": None,
221+
}
222+
],
223+
}
224+
yield f"data: {json.dumps(data)}\n\n"
225+
226+
# If no "Final Answer:" was found, send the cleaned full buffer
227+
if not emitting and buffer.strip():
228+
cleaned = _clean_content(buffer)
229+
if cleaned:
230+
data = {
231+
"id": completion_id,
232+
"object": "chat.completion.chunk",
233+
"created": created,
234+
"model": model_id,
235+
"choices": [
236+
{
237+
"index": 0,
238+
"delta": {"content": cleaned},
239+
"finish_reason": None,
240+
}
241+
],
242+
}
243+
yield f"data: {json.dumps(data)}\n\n"
244+
245+
# Send final chunk with finish_reason
246+
final_data = {
247+
"id": completion_id,
248+
"object": "chat.completion.chunk",
249+
"created": created,
250+
"model": model_id,
251+
"choices": [
252+
{
174253
"index": 0,
175-
"delta": {"role": "assistant", "content": cleaned},
176-
"finish_reason": None,
177-
}]
254+
"delta": {},
255+
"finish_reason": "stop",
256+
}
257+
],
258+
}
259+
yield f"data: {json.dumps(final_data)}\n\n"
260+
yield "data: [DONE]\n\n"
261+
262+
except Exception:
263+
logger.exception("Error in stream event_generator")
264+
error_data = {
265+
"error": {
266+
"message": "Internal server error",
267+
"type": "server_error",
178268
}
179-
yield f"data: {json.dumps(sse_chunk)}\n\n"
180-
181-
# Send final stop event
182-
final_chunk = {
183-
"choices": [{
184-
"index": 0,
185-
"delta": {},
186-
"finish_reason": "stop",
187-
}]
188-
}
189-
yield f"data: {json.dumps(final_chunk)}\n\n"
190-
yield "data: [DONE]\n\n"
269+
}
270+
yield f"data: {json.dumps(error_data)}\n\n"
191271

192-
return StreamingResponse(_event_generator(), media_type="text/event-stream")
272+
return StreamingResponse(
273+
event_generator(),
274+
media_type="text/event-stream",
275+
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
276+
)
193277

194278

195279
@app.get("/health")
@@ -202,4 +286,4 @@ async def health():
202286
import uvicorn
203287

204288
port = int(getenv("PORT", 8000))
205-
uvicorn.run(app, host="0.0.0.0", port=port)
289+
uvicorn.run(app, host="0.0.0.0", port=port)

0 commit comments

Comments
 (0)