11import asyncio
2+ import json
3+ import re
24from contextlib import asynccontextmanager
35from os import getenv
46
57from crewai import LLM
68from fastapi import FastAPI , HTTPException
9+ from fastapi .responses import StreamingResponse
710from pydantic import BaseModel
811
912from crewai_web_search .crew import AssistanceAgents
1013
1114
class ChatRequest(BaseModel):
    """Request payload accepted by the ``/chat`` and ``/stream`` endpoints."""

    # User text; the endpoints pass it to the crew as the ``user_prompt`` input.
    message: str
1619
1720
18- class ChatResponse (BaseModel ):
19- """Structured chat response."""
21+ # Global LLM instance
22+ llm = None
23+
24+ # Patterns that indicate CrewAI internal scaffolding in the output
25+ _REACT_NOISE = re .compile (
26+ r"(^|\n)\s*(Thought:\s*|Action:\s*|Action Input:\s*|Observation:\s*|Final Answer:\s*).*" ,
27+ re .DOTALL ,
28+ )
29+ _CREWAI_PROMPT_MARKER = "\n \n \n You ONLY have access to"
2030
21- answer : str
22- steps : list [str ]
2331
32+ def _clean_content (text : str ) -> str :
33+ """Strip CrewAI internal ReAct scaffolding and prompt noise from output."""
34+ # Strip appended retry instructions
35+ idx = text .find (_CREWAI_PROMPT_MARKER )
36+ if idx != - 1 :
37+ text = text [:idx ]
2438
25- # Global LLM instance
26- llm = None
39+ # Strip ReAct format artifacts (Thought:/Action:/Final Answer: prefixes)
40+ text = _REACT_NOISE .sub ("" , text )
41+
42+ return text .strip ()
2743
2844
2945@asynccontextmanager
@@ -59,7 +75,7 @@ async def lifespan(app: FastAPI):
5975
6076@app .post ("/chat" )
6177async def chat (request : ChatRequest ):
62- """Chat endpoint that runs the CrewAI crew and returns the response ."""
78+ """Non-streaming chat endpoint. Returns the final answer ."""
6379 global llm
6480
6581 if llm is None :
@@ -71,33 +87,111 @@ async def chat(request: ChatRequest):
7187 "custom_instruction" : "" ,
7288 }
7389
74- intermediate_steps : list = []
75- crew = AssistanceAgents (
76- llm = llm , intermediate_steps = intermediate_steps
77- ).crew ()
78-
90+ crew = AssistanceAgents (llm = llm ).crew ()
7991 result = await asyncio .to_thread (crew .kickoff , inputs = inputs )
8092
81- steps = []
82- for step in intermediate_steps :
83- from crewai .agents .parser import AgentAction , AgentFinish
84- from crewai .tools .tool_types import ToolResult
85-
86- if isinstance (step , AgentAction ):
87- steps .append (f"[action] { step .result } " )
88- elif isinstance (step , AgentFinish ):
89- steps .append (f"[finish] { step .output } " )
90- elif isinstance (step , ToolResult ):
91- steps .append (f"[tool] { step .result } " )
93+ response_messages = [
94+ {"role" : "user" , "content" : request .message },
95+ {"role" : "assistant" , "content" : _clean_content (str (result ))},
96+ ]
9297
93- return ChatResponse ( answer = str ( result ), steps = steps )
98+ return { "messages" : response_messages , "finish_reason" : "stop" }
9499
95100 except Exception as e :
96101 raise HTTPException (
97102 status_code = 500 , detail = f"Error processing request: { str (e )} "
98103 )
99104
100105
def _sse_frame(payload: dict) -> str:
    """Encode *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _sse_choice(delta: dict, finish_reason=None) -> str:
    """Wrap *delta* in the single-choice chunk envelope used by ``/stream``."""
    return _sse_frame(
        {
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ]
        }
    )


@app.post("/stream")
async def stream(request: ChatRequest):
    """Streaming chat endpoint using CrewAI's native token-level streaming.

    Builds the crew with ``stream=True`` and awaits ``kickoff_async()``; the
    returned object is iterated asynchronously and only chunks whose
    ``chunk_type.value`` is ``"text"`` and whose content is non-empty are
    considered.

    Text is withheld until ``"Final Answer:"`` has been seen, since everything
    before it is internal ReAct reasoning; after that each chunk is forwarded
    as it arrives. If the marker never appears, the cleaned buffer is sent as
    a single chunk.

    The response is ``text/event-stream``. Each frame is
    ``data: {"choices": [{"index": 0, "delta": ..., "finish_reason": ...}]}``;
    a normal run ends with a ``finish_reason: "stop"`` frame followed by
    ``data: [DONE]``. If the crew fails after streaming has begun, a
    ``data: {"error": {"message": ...}}`` frame is sent, followed by
    ``data: [DONE]``, because an HTTP error status can no longer be returned
    once the stream has started.

    Raises:
        HTTPException: 503 when the global LLM has not been initialized.
    """
    global llm

    if llm is None:
        raise HTTPException(status_code=503, detail="Agent not initialized")

    async def _event_generator():
        inputs = {
            "user_prompt": request.message,
            "custom_instruction": "",
        }
        marker = "Final Answer:"

        # Buffer tokens until we see the marker; the buffer also lets a
        # marker that is split across chunks be detected.
        buffer = ""
        emitting = False

        try:
            crew = AssistanceAgents(llm=llm, stream=True).crew()

            # kickoff_async with stream=True returns the streaming output.
            streaming_output = await crew.kickoff_async(inputs=inputs)

            async for chunk in streaming_output:
                if chunk.chunk_type.value != "text" or not chunk.content:
                    continue

                if emitting:
                    # Already past the marker: forward tokens directly.
                    yield _sse_choice(
                        {"role": "assistant", "content": chunk.content}
                    )
                    continue

                buffer += chunk.content
                idx = buffer.find(marker)
                if idx == -1:
                    continue

                emitting = True
                # Emit any text after the marker that arrived in this chunk.
                remainder = buffer[idx + len(marker):]
                if remainder.strip():
                    yield _sse_choice(
                        {"role": "assistant", "content": remainder.lstrip()}
                    )

            # If the marker was never found, send the cleaned full buffer.
            if not emitting and buffer.strip():
                cleaned = _clean_content(buffer)
                if cleaned:
                    yield _sse_choice({"role": "assistant", "content": cleaned})
        except Exception as e:
            # Boundary handler: report the failure in-band and terminate the
            # stream cleanly instead of dropping the connection. Cancellation
            # and generator close are BaseException and pass through.
            yield _sse_frame(
                {"error": {"message": f"Error processing request: {str(e)}"}}
            )
            yield "data: [DONE]\n\n"
            return

        # Send final stop event.
        yield _sse_choice({}, finish_reason="stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(_event_generator(), media_type="text/event-stream")
193+
194+
101195@app .get ("/health" )
102196async def health ():
103197 """Return service health status."""
0 commit comments