Skip to content
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e300873
feat: Add WebSocket API for streaming responses
James-4u Dec 3, 2025
eaa38d6
fix the CLI issue
James-4u Dec 3, 2025
c6a7c4a
Remove README.md
James-4u Dec 3, 2025
70200e4
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 3, 2025
82d621c
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 4, 2025
9ce780f
refactor: Move WebSocket API to SDK pattern following session.py
James-4u Dec 5, 2025
081f7f7
refactor: Move WebSocket to SDK pattern with /ws/ prefix - Moved to a…
James-4u Dec 5, 2025
710e009
Updated date
James-4u Dec 5, 2025
7c2c6f5
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 8, 2025
22ba48e
fix: Remove f-string prefixes from logging statements without placeho…
James-4u Dec 8, 2025
e03df5b
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 8, 2025
5ee639f
Fix the test issue
James-4u Dec 8, 2025
1e10287
Fix ImportError about completion
James-4u Dec 9, 2025
327a933
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 9, 2025
b192fb2
Fix some issue on review
James-4u Dec 9, 2025
710b5ad
Added libs for unitest
James-4u Dec 9, 2025
b33d050
Moved websocket_api.md to reference
James-4u Dec 9, 2025
1315abf
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 9, 2025
1fcdf2e
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 10, 2025
6e5dbbe
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 11, 2025
b8db496
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 16, 2025
8e9678b
Adjusted by WebSocket_Refactoring_Summary_EN.md
Dec 16, 2025
84e02f4
Fixed review issue fron JinHai
James-4u Dec 17, 2025
a22d765
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 18, 2025
a566711
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 18, 2025
f6e0e97
Added uv.lock
James-4u Dec 19, 2025
f036070
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 19, 2025
c279395
Updated demo for test and fixed some issue
James-4u Dec 22, 2025
0386c04
Fixed ruff issue
James-4u Dec 22, 2025
d404a8b
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 22, 2025
17b8bb6
Feat: message manage (#12083)
Lynn-Inf Dec 23, 2025
3787a5b
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 23, 2025
03f0336
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 24, 2025
118310d
Revert rag/nlp/rag_tokenizer.py to match main branch
James-4u Dec 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Try our demo at [https://demo.ragflow.io](https://demo.ragflow.io).

## 🔥 Latest Updates

- 2025-12-03 Adds WebSocket API for streaming responses, enabling real-time communication for WeChat Mini Programs and other WebSocket clients.
Comment thread
JinHai-CN marked this conversation as resolved.
- 2025-11-19 Supports Gemini 3 Pro.
- 2025-11-12 Supports data synchronization from Confluence, S3, Notion, Discord, Google Drive.
- 2025-10-23 Supports MinerU & Docling as document parsing methods.
Expand Down Expand Up @@ -132,6 +133,7 @@ releases! 🌟
- Configurable LLMs as well as embedding models.
- Multiple recall paired with fused re-ranking.
- Intuitive APIs for seamless integration with business.
- WebSocket support for real-time streaming responses (ideal for WeChat Mini Programs and mobile apps).
Comment thread
JinHai-CN marked this conversation as resolved.

## 🔎 System Architecture

Expand Down
250 changes: 250 additions & 0 deletions api/apps/sdk/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
WebSocket SDK API for RAGFlow Streaming Responses

This module provides WebSocket endpoints following the SDK API pattern,
mirroring the structure of session.py for consistency.
"""

import logging
import json
from quart import websocket

from api.db.services.dialog_service import DialogService
from api.db.services.canvas_service import UserCanvasService
from api.db.services.conversation_service import async_completion as rag_completion
from api.db.services.canvas_service import completion as agent_completion
from api.utils.api_utils import ws_token_required
from common.constants import StatusEnum


async def send_ws_error(error_message, code=500):
    """
    Report a failure to the client on the current WebSocket connection.

    The frame is a JSON object with ``code``, ``message`` and ``data`` keys;
    ``data`` carries the error text as a Markdown-style ``answer`` together
    with an empty ``reference`` list.

    Args:
        error_message: Text describing what went wrong.
        code: Numeric code placed in the frame (defaults to 500).
    """
    payload = json.dumps(
        {
            "code": code,
            "message": error_message,
            "data": {"answer": f"**ERROR**: {error_message}", "reference": []},
        },
        # Keep non-ASCII characters readable instead of \u-escaping them.
        ensure_ascii=False,
    )
    await websocket.send(payload)


async def send_ws_message(data, code=0, message=""):
    """
    Push one JSON frame to the client on the current WebSocket connection.

    Args:
        data: JSON-serialisable payload placed under the ``data`` key.
        code: Numeric code placed in the frame (defaults to 0).
        message: Text placed under the ``message`` key (defaults to "").
    """
    envelope = dict(code=code, message=message, data=data)
    # ensure_ascii=False keeps non-ASCII text readable on the wire.
    await websocket.send(json.dumps(envelope, ensure_ascii=False))


@manager.websocket("/ws/chats/<chat_id>/completions")  # noqa: F821
@ws_token_required
async def chat_completions_ws(tenant_id, chat_id):
    """
    WebSocket endpoint for streaming chat completions.

    Follows the same pattern as the HTTP POST /chats/<chat_id>/completions endpoint.
    Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints.

    The connection is long-lived: every received frame is handled as one
    request, and a malformed or failing request produces an error frame
    without closing the socket. Each frame must be a JSON object with:

        question:   text passed to the completion call (defaults to "").
        session_id: optional session identifier.
        stream:     truthy (default) forwards each ``data:``-prefixed chunk as
                    its own frame; falsy sends only the first result, wrapped
                    by ``send_ws_message``.

    Any other keys are forwarded to the completion call as keyword arguments,
    except ``tenant_id`` and ``chat_id``, which are fixed by the authenticated
    route and are dropped from the payload.

    Args:
        tenant_id: Tenant resolved by ``ws_token_required``.
        chat_id: Chat identifier taken from the URL.
    """
    # Verify chat ownership
    if not DialogService.query(tenant_id=tenant_id, id=chat_id, status=StatusEnum.VALID.value):
        await send_ws_error(f"You don't own the chat {chat_id}", code=404)
        await websocket.close(1008)
        return

    logging.info("WebSocket chat connection established for chat_id: %s, tenant: %s", chat_id, tenant_id)

    # Keys handled explicitly below, plus the route-bound identifiers that a
    # client payload must never override (they would also collide with the
    # explicit keyword arguments and raise TypeError).
    reserved_keys = {"question", "session_id", "stream", "tenant_id", "chat_id"}

    try:
        while True:
            message = await websocket.receive()

            try:
                req = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                await send_ws_error(f"Invalid JSON format: {str(e)}", code=400)
                continue

            # Valid JSON that is not an object (list, string, number, ...) has
            # no .get(); reject it here instead of letting the resulting
            # AttributeError tear down the whole connection.
            if not isinstance(req, dict):
                await send_ws_error("Invalid request: expected a JSON object", code=400)
                continue

            question = req.get("question", "")
            session_id = req.get("session_id")
            stream = req.get("stream", True)

            # NOTE: an absent question defaults to "" and is let through; only
            # an explicit JSON null is rejected here.
            if question is None:
                await send_ws_error("Missing required parameter: question", code=400)
                continue

            extra_kwargs = {k: v for k, v in req.items() if k not in reserved_keys}

            try:
                if stream:
                    async for response_chunk in rag_completion(
                        tenant_id=tenant_id,
                        chat_id=chat_id,
                        question=question,
                        session_id=session_id,
                        stream=True,
                        **extra_kwargs,
                    ):
                        # Only "data:"-prefixed text chunks carry a payload.
                        if not (isinstance(response_chunk, str) and response_chunk.startswith("data:")):
                            continue
                        try:
                            response_data = json.loads(response_chunk[5:].strip())
                        except json.JSONDecodeError:
                            # Skip chunks whose payload is not valid JSON.
                            continue
                        await websocket.send(json.dumps(response_data, ensure_ascii=False))

                    logging.info("Chat completion streamed successfully for chat_id: %s", chat_id)
                else:
                    response = None
                    async for resp in rag_completion(
                        tenant_id=tenant_id,
                        chat_id=chat_id,
                        question=question,
                        session_id=session_id,
                        stream=False,
                        **extra_kwargs,
                    ):
                        # Only the first yielded item is used in non-stream mode.
                        response = resp
                        break

                    if response:
                        await send_ws_message(response)
                    else:
                        await send_ws_error("No response generated", code=500)

            except Exception as e:
                # A failed completion is reported to the client; the
                # connection stays open for the next request.
                logging.exception("Error during chat completion: %s", e)
                await send_ws_error(str(e))

    except Exception as e:
        logging.exception("WebSocket error: %s", e)
        # Best effort: the socket may already be unusable, so neither the
        # error frame nor the close is allowed to raise out of this handler.
        try:
            await send_ws_error(str(e))
        except Exception:
            logging.debug("Could not deliver error frame for chat_id: %s", chat_id, exc_info=True)
        try:
            await websocket.close(1011)
        except Exception:
            logging.debug("Could not close WebSocket for chat_id: %s", chat_id, exc_info=True)

    finally:
        logging.info("WebSocket chat connection closed for chat_id: %s", chat_id)


@manager.websocket("/ws/agents/<agent_id>/completions")  # noqa: F821
@ws_token_required
async def agent_completions_ws(tenant_id, agent_id):
    """
    WebSocket endpoint for streaming agent completions.

    Follows the same pattern as the HTTP POST /agents/<agent_id>/completions endpoint.
    Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints.

    The connection is long-lived: every received frame is handled as one
    request, and a malformed or failing request produces an error frame
    without closing the socket. Each frame must be a JSON object with:

        question:   non-empty text passed to the agent (required).
        session_id: optional session identifier.
        stream:     truthy (default) forwards every ``message`` /
                    ``message_end`` event as its own frame and finishes with a
                    frame whose ``data`` is ``true``; falsy sends one frame
                    built from the last event, with the concatenated
                    ``message`` content and the merged references.

    Any other keys are forwarded to the completion call as keyword arguments,
    except ``tenant_id`` and ``agent_id``, which are fixed by the authenticated
    route and are dropped from the payload.

    Args:
        tenant_id: Tenant resolved by ``ws_token_required``.
        agent_id: Agent identifier taken from the URL.
    """
    # Verify agent ownership
    if not UserCanvasService.query(user_id=tenant_id, id=agent_id):
        await send_ws_error(f"You don't own the agent {agent_id}", code=404)
        await websocket.close(1008)
        return

    logging.info("WebSocket agent connection established for agent_id: %s, tenant: %s", agent_id, tenant_id)

    # Keys handled explicitly below, plus the route-bound identifiers that a
    # client payload must never override (they would also collide with the
    # explicit keyword arguments and raise TypeError).
    reserved_keys = {"question", "session_id", "stream", "tenant_id", "agent_id"}

    try:
        while True:
            message = await websocket.receive()

            try:
                req = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                await send_ws_error(f"Invalid JSON format: {str(e)}", code=400)
                continue

            # Valid JSON that is not an object (list, string, number, ...) has
            # no .get(); reject it here instead of letting the resulting
            # AttributeError tear down the whole connection.
            if not isinstance(req, dict):
                await send_ws_error("Invalid request: expected a JSON object", code=400)
                continue

            question = req.get("question", "")
            session_id = req.get("session_id")
            stream = req.get("stream", True)

            if not question:
                await send_ws_error("Missing required parameter: question", code=400)
                continue

            extra_kwargs = {k: v for k, v in req.items() if k not in reserved_keys}

            try:
                if stream:
                    async for response_chunk in agent_completion(
                        tenant_id=tenant_id,
                        agent_id=agent_id,
                        question=question,
                        session_id=session_id,
                        stream=True,
                        **extra_kwargs,
                    ):
                        # Only "data:"-prefixed text chunks carry a payload.
                        if not (isinstance(response_chunk, str) and response_chunk.startswith("data:")):
                            continue
                        try:
                            response_data = json.loads(response_chunk[5:].strip())
                        except json.JSONDecodeError:
                            # Skip chunks whose payload is not valid JSON.
                            continue
                        # Only message events are forwarded to the client.
                        if response_data.get("event") in ["message", "message_end"]:
                            await send_ws_message(response_data)

                    # End-of-stream marker.
                    await send_ws_message(True)
                    logging.info("Agent completion streamed successfully for agent_id: %s", agent_id)
                else:
                    full_content = ""
                    reference = {}
                    final_ans = None

                    async for response_chunk in agent_completion(
                        tenant_id=tenant_id,
                        agent_id=agent_id,
                        question=question,
                        session_id=session_id,
                        stream=False,
                        **extra_kwargs,
                    ):
                        if not (isinstance(response_chunk, str) and response_chunk.startswith("data:")):
                            continue
                        try:
                            ans = json.loads(response_chunk[5:])
                            if ans["event"] == "message":
                                full_content += ans["data"]["content"]
                            if ans.get("data", {}).get("reference", None):
                                reference.update(ans["data"]["reference"])
                            # The last successfully parsed event becomes the
                            # envelope of the aggregated reply.
                            final_ans = ans
                        except Exception as e:
                            # Report the bad chunk and keep aggregating the rest.
                            await send_ws_error(str(e))

                    if final_ans:
                        final_ans["data"]["content"] = full_content
                        final_ans["data"]["reference"] = reference
                        await send_ws_message(final_ans)
                    else:
                        await send_ws_error("No response generated", code=500)

            except Exception as e:
                # A failed completion is reported to the client; the
                # connection stays open for the next request.
                logging.exception("Error during agent completion: %s", e)
                await send_ws_error(str(e))

    except Exception as e:
        logging.exception("WebSocket error: %s", e)
        # Best effort: the socket may already be unusable, so neither the
        # error frame nor the close is allowed to raise out of this handler.
        try:
            await send_ws_error(str(e))
        except Exception:
            logging.debug("Could not deliver error frame for agent_id: %s", agent_id, exc_info=True)
        try:
            await websocket.close(1011)
        except Exception:
            logging.debug("Could not close WebSocket for agent_id: %s", agent_id, exc_info=True)

    finally:
        logging.info("WebSocket agent connection closed for agent_id: %s", agent_id)

90 changes: 88 additions & 2 deletions api/utils/api_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,6 @@
jsonify,
request
)

from peewee import OperationalError

from common.constants import ActiveEnum
Expand Down Expand Up @@ -283,6 +282,93 @@ async def adecorated_function(*args, **kwargs):
return decorated_function


def ws_token_required(func):
    """
    WebSocket authentication decorator for SDK endpoints.
    Follows the same pattern as token_required but for WebSocket connections.

    Credentials are tried in this order; the first match wins:

    1. API token: the second whitespace-separated part of the
       ``Authorization`` header, looked up with ``APIToken.query``.
    2. User session: a signed token from the ``Authorization`` header, or the
       ``authorization`` / ``token`` query parameters (an optional
       ``Bearer `` prefix is stripped), resolved to a valid user whose id is
       used as the tenant id.
    3. API token passed as the ``token`` query parameter.

    On success the wrapped handler is called with ``tenant_id`` added to its
    keyword arguments. On failure an error frame is sent and the connection
    is closed with code 1008; the handler is not called.

    Args:
        func: Async WebSocket handler accepting a ``tenant_id`` keyword.

    Returns:
        The wrapped async handler.
    """
    from quart import websocket
    from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer
    from api.db.services.user_service import UserService
    from common.constants import StatusEnum

    async def get_tenant_id_from_websocket(**kwargs):
        """Return ``(True, kwargs + tenant_id)`` on success, else ``(False, error message)``."""
        # Method 1: Try API Token authentication from Authorization header
        authorization = websocket.headers.get("Authorization", "")

        if authorization:
            try:
                authorization_parts = authorization.split()
                if len(authorization_parts) >= 2:
                    objs = APIToken.query(token=authorization_parts[1])
                    if objs:
                        kwargs["tenant_id"] = objs[0].tenant_id
                        logging.info("WebSocket authenticated via API token")
                        return True, kwargs
            except Exception as e:
                logging.error("WebSocket API token auth error: %s", e)

        # Method 2: Try User Session authentication (JWT)
        try:
            jwt = Serializer(secret_key=settings.SECRET_KEY)
            auth_token = (
                websocket.headers.get("Authorization")
                or websocket.args.get("authorization")
                or websocket.args.get("token")
            )

            if auth_token:
                try:
                    if auth_token.startswith("Bearer "):
                        auth_token = auth_token[7:]
                    access_token = str(jwt.loads(auth_token))
                    if access_token and len(access_token.strip()) >= 32:
                        user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value)
                        if user and user[0]:
                            kwargs["tenant_id"] = user[0].id
                            logging.info("WebSocket authenticated via user session")
                            return True, kwargs
                except Exception:
                    # Routinely reached when the credential is a plain API
                    # token rather than a signed session token, so keep it at
                    # debug level - but do not hide it completely.
                    logging.debug("WebSocket user session auth did not succeed", exc_info=True)
        except Exception:
            logging.warning("WebSocket user session auth could not be attempted", exc_info=True)

        # Method 3: Try query parameter authentication
        token_param = websocket.args.get("token")
        if token_param:
            try:
                objs = APIToken.query(token=token_param)
                if objs:
                    kwargs["tenant_id"] = objs[0].tenant_id
                    logging.info("WebSocket authenticated via query parameter")
                    return True, kwargs
            except Exception:
                # A failing lookup is a backend problem, not a bad credential;
                # surface it in the logs instead of masking it as a 401.
                logging.warning("WebSocket query parameter auth lookup failed", exc_info=True)

        return False, "Authentication required. Please provide valid API token or user session."

    @wraps(func)
    async def adecorated_function(*args, **kwargs):
        """Async wrapper for WebSocket endpoint."""
        success, result = await get_tenant_id_from_websocket(**kwargs)

        if not success:
            # Authentication failed - send error and close connection
            error_response = {
                "code": RetCode.AUTHENTICATION_ERROR,
                "message": result,
                "data": {"answer": f"**ERROR**: {result}", "reference": []},
            }
            await websocket.send(json.dumps(error_response, ensure_ascii=False))
            await websocket.close(1008, result)  # 1008 = Policy Violation
            return

        # Authentication successful - call the actual handler
        return await func(*args, **result)

    return adecorated_function


def get_result(code=RetCode.SUCCESS, message="", data=None, total=None):
"""
Standard API response format:
Expand Down
6 changes: 6 additions & 0 deletions docker/nginx/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ http {
include /etc/nginx/mime.types;
default_type application/octet-stream;

# WebSocket support - map Upgrade header to Connection header.
# Requests without an Upgrade header get an empty Connection header, which is
# what proxy.conf sent before WebSocket support was added, so ordinary HTTP
# requests are not forced to "Connection: close" towards the upstream.
map $http_upgrade $connection_upgrade {
default upgrade;
'' '';
}

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
Expand Down
3 changes: 2 additions & 1 deletion docker/nginx/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_buffering off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
Expand Down
Loading