Skip to content

Commit 0fcea05

Browse files
author
Im An AI
authored
Merge pull request #757 from ruska-ai/fix/sse-done-signal
fix(stream): send [DONE] terminator for sync SSE streams
2 parents a5e0266 + ba3940d commit 0fcea05

1 file changed

Lines changed: 32 additions & 25 deletions

File tree

backend/src/utils/stream.py

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ async def stream_generator(
206206
)
207207
files_map = {**memory_files, **files_map}
208208
async with get_checkpoint_db() as checkpointer:
209+
agent = None
209210
try:
210211
ctx = ContextSchema(
211212
model=model,
@@ -293,31 +294,37 @@ async def stream_generator(
293294
error_msg = ujson.dumps(("error", str(e)))
294295
yield f"data: {error_msg}\n\n"
295296
finally:
296-
if service_context.user_id and checkpointer:
297-
final_state = await agent.graph.aget_state(config)
298-
configurable = {
299-
**final_state.config.get("configurable", {}),
300-
**config["configurable"],
301-
}
302-
messages = final_state.values.get("messages", [])
303-
304-
# Update the store with the final messages and files
305-
service_context.store.fields = ["messages", "files"]
306-
await service_context.thread_service.update(
307-
thread_id=configurable.get("thread_id"),
308-
data={
309-
"thread_id": configurable.get("thread_id"),
310-
"checkpoint_id": configurable.get("checkpoint_id"),
311-
"assistant_id": configurable.get("assistant_id"),
312-
"project_id": configurable.get("project_id"),
313-
"messages": messages,
314-
"todos": todos_list,
315-
"files": files_map,
316-
"updated_at": get_time(),
317-
},
318-
)
319-
# Log the update for debugging
320-
logger.info(f"checkpoint: {ujson.dumps(configurable)}")
297+
try:
298+
if service_context.user_id and checkpointer and agent:
299+
final_state = await agent.graph.aget_state(config)
300+
configurable = {
301+
**final_state.config.get("configurable", {}),
302+
**config["configurable"],
303+
}
304+
messages = final_state.values.get("messages", [])
305+
306+
# Update the store with the final messages and files
307+
service_context.store.fields = ["messages", "files"]
308+
await service_context.thread_service.update(
309+
thread_id=configurable.get("thread_id"),
310+
data={
311+
"thread_id": configurable.get("thread_id"),
312+
"checkpoint_id": configurable.get("checkpoint_id"),
313+
"assistant_id": configurable.get("assistant_id"),
314+
"project_id": configurable.get("project_id"),
315+
"messages": messages,
316+
"todos": todos_list,
317+
"files": files_map,
318+
"updated_at": get_time(),
319+
},
320+
)
321+
# Log the update for debugging
322+
logger.info(f"checkpoint: {ujson.dumps(configurable)}")
323+
except Exception as e:
324+
logger.exception("Failed to persist final checkpoint state: %s", e)
325+
326+
# Ensure frontend gets an explicit terminal signal to clear loading state.
327+
yield "data: [DONE]\n\n"
321328

322329

323330
###########################################################################

0 commit comments

Comments
 (0)