Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e300873
feat: Add WebSocket API for streaming responses
James-4u Dec 3, 2025
eaa38d6
fix the CLI issue
James-4u Dec 3, 2025
c6a7c4a
Remove README.md
James-4u Dec 3, 2025
70200e4
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 3, 2025
82d621c
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 4, 2025
9ce780f
refactor: Move WebSocket API to SDK pattern following session.py
James-4u Dec 5, 2025
081f7f7
refactor: Move WebSocket to SDK pattern with /ws/ prefix - Moved to a…
James-4u Dec 5, 2025
710e009
Updated date
James-4u Dec 5, 2025
7c2c6f5
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 8, 2025
22ba48e
fix: Remove f-string prefixes from logging statements without placeho…
James-4u Dec 8, 2025
e03df5b
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 8, 2025
5ee639f
Fix the test issue
James-4u Dec 8, 2025
1e10287
Fix ImportError about completion
James-4u Dec 9, 2025
327a933
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 9, 2025
b192fb2
Fix some issue on review
James-4u Dec 9, 2025
710b5ad
Added libs for unittest
James-4u Dec 9, 2025
b33d050
Moved websocket_api.md to reference
James-4u Dec 9, 2025
1315abf
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 9, 2025
1fcdf2e
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 10, 2025
6e5dbbe
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 11, 2025
b8db496
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 16, 2025
8e9678b
Adjusted by WebSocket_Refactoring_Summary_EN.md
Dec 16, 2025
84e02f4
Fixed review issue from JinHai
James-4u Dec 17, 2025
a22d765
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 18, 2025
a566711
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 18, 2025
f6e0e97
Added uv.lock
James-4u Dec 19, 2025
f036070
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 19, 2025
c279395
Updated demo for test and fixed some issue
James-4u Dec 22, 2025
0386c04
Fixed ruff issue
James-4u Dec 22, 2025
d404a8b
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 22, 2025
17b8bb6
Feat: message manage (#12083)
Lynn-Inf Dec 23, 2025
3787a5b
Merge branch 'main' into feature/websocket-streaming-api
James-4u Dec 23, 2025
03f0336
Merge branch 'feature/websocket-streaming-api' of github.com-james:Sm…
James-4u Dec 24, 2025
118310d
Revert rag/nlp/rag_tokenizer.py to match main branch
James-4u Dec 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ jobs:
sleep 5
done
source .venv/bin/activate && pytest -s --tb=short sdk/python/test/test_frontend_api/get_email.py sdk/python/test/test_frontend_api/test_dataset.py 2>&1 | tee es_api_test.log

- name: Run http api tests against Elasticsearch
run: |
export http_proxy=""; export https_proxy=""; export no_proxy=""; export HTTP_PROXY=""; export HTTPS_PROXY=""; export NO_PROXY=""
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ COPY pyproject.toml uv.lock ./
COPY mcp mcp
COPY plugin plugin
COPY common common
COPY memory memory

COPY docker/service_conf.yaml.template ./conf/service_conf.yaml.template
COPY docker/entrypoint.sh ./
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Try our demo at [https://demo.ragflow.io](https://demo.ragflow.io).

## ๐Ÿ”ฅ Latest Updates

- 2025-12-03 Adds WebSocket API for streaming responses, enabling real-time communication for WeChat Mini Programs and other WebSocket clients.
Comment thread
JinHai-CN marked this conversation as resolved.
- 2025-11-19 Supports Gemini 3 Pro.
- 2025-11-12 Supports data synchronization from Confluence, S3, Notion, Discord, Google Drive.
- 2025-10-23 Supports MinerU & Docling as document parsing methods.
Expand Down Expand Up @@ -132,6 +133,7 @@ releases! ๐ŸŒŸ
- Configurable LLMs as well as embedding models.
- Multiple recall paired with fused re-ranking.
- Intuitive APIs for seamless integration with business.
- WebSocket support for real-time streaming responses (ideal for WeChat Mini Programs and mobile apps).
Comment thread
JinHai-CN marked this conversation as resolved.

## ๐Ÿ”Ž System Architecture

Expand Down
5 changes: 5 additions & 0 deletions admin/server/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin')


@admin_bp.route('/ping', methods=['GET'])
def ping():
    """Health-check endpoint: GET /api/v1/admin/ping answers 'PONG'.

    Requires no authentication; useful for load-balancer / liveness probes.
    """
    return success_response('PONG')


@admin_bp.route('/login', methods=['POST'])
def login():
if not request.json:
Expand Down
90 changes: 68 additions & 22 deletions agent/tools/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
from common.metadata_utils import apply_meta_data_filter
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.llm_service import LLMBundle
from api.db.services.memory_service import MemoryService
from api.db.joint_services.memory_message_service import query_message
from common import settings
from common.connection_utils import timeout
from rag.app.tag import label_question
from rag.prompts.generator import cross_languages, kb_prompt
from rag.prompts.generator import cross_languages, kb_prompt, memory_prompt


class RetrievalParam(ToolParamBase):
Expand Down Expand Up @@ -57,6 +59,7 @@ def __init__(self):
self.top_n = 8
self.top_k = 1024
self.kb_ids = []
self.memory_ids = []
self.kb_vars = []
self.rerank_id = ""
self.empty_response = ""
Expand All @@ -81,15 +84,7 @@ def get_input_form(self) -> dict[str, dict]:
class Retrieval(ToolBase, ABC):
component_name = "Retrieval"

@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 12)))
async def _invoke_async(self, **kwargs):
if self.check_if_canceled("Retrieval processing"):
return

if not kwargs.get("query"):
self.set_output("formalized_content", self._param.empty_response)
return

async def _retrieve_kb(self, query_text: str):
kb_ids: list[str] = []
for id in self._param.kb_ids:
if id.find("@") < 0:
Expand Down Expand Up @@ -124,12 +119,12 @@ async def _invoke_async(self, **kwargs):
if self._param.rerank_id:
rerank_mdl = LLMBundle(kbs[0].tenant_id, LLMType.RERANK, self._param.rerank_id)

vars = self.get_input_elements_from_text(kwargs["query"])
vars = {k:o["value"] for k,o in vars.items()}
query = self.string_format(kwargs["query"], vars)
vars = self.get_input_elements_from_text(query_text)
vars = {k: o["value"] for k, o in vars.items()}
query = self.string_format(query_text, vars)

doc_ids=[]
if self._param.meta_data_filter!={}:
doc_ids = []
if self._param.meta_data_filter != {}:
metas = DocumentService.get_meta_by_kbs(kb_ids)

def _resolve_manual_filter(flt: dict) -> dict:
Expand Down Expand Up @@ -198,18 +193,20 @@ def _resolve_manual_filter(flt: dict) -> dict:

if self._param.toc_enhance:
chat_mdl = LLMBundle(self._canvas._tenant_id, LLMType.CHAT)
cks = settings.retriever.retrieval_by_toc(query, kbinfos["chunks"], [kb.tenant_id for kb in kbs], chat_mdl, self._param.top_n)
cks = settings.retriever.retrieval_by_toc(query, kbinfos["chunks"], [kb.tenant_id for kb in kbs],
chat_mdl, self._param.top_n)
if self.check_if_canceled("Retrieval processing"):
return
if cks:
kbinfos["chunks"] = cks
kbinfos["chunks"] = settings.retriever.retrieval_by_children(kbinfos["chunks"], [kb.tenant_id for kb in kbs])
kbinfos["chunks"] = settings.retriever.retrieval_by_children(kbinfos["chunks"],
[kb.tenant_id for kb in kbs])
if self._param.use_kg:
ck = settings.kg_retriever.retrieval(query,
[kb.tenant_id for kb in kbs],
kb_ids,
embd_mdl,
LLMBundle(self._canvas.get_tenant_id(), LLMType.CHAT))
[kb.tenant_id for kb in kbs],
kb_ids,
embd_mdl,
LLMBundle(self._canvas.get_tenant_id(), LLMType.CHAT))
if self.check_if_canceled("Retrieval processing"):
return
if ck["content_with_weight"]:
Expand All @@ -218,7 +215,8 @@ def _resolve_manual_filter(flt: dict) -> dict:
kbinfos = {"chunks": [], "doc_aggs": []}

if self._param.use_kg and kbs:
ck = settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl, LLMBundle(kbs[0].tenant_id, LLMType.CHAT))
ck = settings.kg_retriever.retrieval(query, [kb.tenant_id for kb in kbs], filtered_kb_ids, embd_mdl,
LLMBundle(kbs[0].tenant_id, LLMType.CHAT))
if self.check_if_canceled("Retrieval processing"):
return
if ck["content_with_weight"]:
Expand Down Expand Up @@ -248,6 +246,54 @@ def _resolve_manual_filter(flt: dict) -> dict:

return form_cnt

async def _retrieve_memory(self, query_text: str):
    """Search the component's configured memories and expose the matching
    messages as the ``formalized_content`` output.

    Args:
        query_text: Query template; ``{placeholder}`` variables are resolved
            from the component's input elements before searching.

    Returns:
        The formatted memory content string, or ``None`` when nothing
        matched (``empty_response`` is emitted on the output instead).

    Raises:
        Exception: If none of the configured memory ids resolves to a memory.
        ValueError: If the selected memories use different embedding models
            (their vectors would not be comparable in one search).
    """
    import logging  # local import: keep this block self-contained

    memory_ids: list[str] = list(self._param.memory_ids)
    memory_list = MemoryService.get_by_ids(memory_ids)
    if not memory_list:
        raise Exception("No memory is selected.")

    # Validate explicitly rather than with `assert`, which is stripped
    # when Python runs with -O.
    embd_names = {memory.embd_id for memory in memory_list}
    if len(embd_names) != 1:
        raise ValueError("Memory use different embedding models.")

    # Resolve {placeholder} variables in the query template.
    placeholders = self.get_input_elements_from_text(query_text)
    placeholders = {k: o["value"] for k, o in placeholders.items()}
    query = self.string_format(query_text, placeholders)

    # Query messages across all selected memories.
    message_list = query_message({"memory_id": memory_ids}, {
        "query": query,
        "similarity_threshold": self._param.similarity_threshold,
        "keywords_similarity_weight": self._param.keywords_similarity_weight,
        "top_n": self._param.top_n
    })
    logging.debug("found %d messages.", len(message_list))

    if not message_list:
        self.set_output("formalized_content", self._param.empty_response)
        return

    # 200000: budget passed to memory_prompt — presumably a token/char
    # limit, TODO confirm against memory_prompt's contract.
    formatted_content = "\n".join(memory_prompt(message_list, 200000))

    self.set_output("formalized_content", formatted_content)
    return formatted_content

@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 12)))
async def _invoke_async(self, **kwargs):
    """Dispatch retrieval based on the component's configuration.

    Knowledge-base retrieval (``kb_ids``) takes precedence over memory
    retrieval (``memory_ids``); when neither is configured, or the query
    is empty, the configured ``empty_response`` is emitted instead.

    Args:
        **kwargs: Must contain ``query`` (the user's search text).
    """
    if self.check_if_canceled("Retrieval processing"):
        return

    if not kwargs.get("query"):
        self.set_output("formalized_content", self._param.empty_response)
        return

    if self._param.kb_ids:
        return await self._retrieve_kb(kwargs["query"])
    if self._param.memory_ids:
        return await self._retrieve_memory(kwargs["query"])

    # Nothing to search against: fall back to the configured empty response.
    self.set_output("formalized_content", self._param.empty_response)
    return

@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 12)))
def _invoke(self, **kwargs):
    """Synchronous entry point: run the async retrieval to completion.

    NOTE(review): ``asyncio.run`` raises RuntimeError if an event loop is
    already running in the calling thread — confirm callers are sync-only.
    """
    return asyncio.run(self._invoke_async(**kwargs))
Expand Down
2 changes: 1 addition & 1 deletion api/apps/canvas_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ async def rerun():
if 0 < doc["progress"] < 1:
return get_data_error_result(message=f"`{doc['name']}` is processing...")

if settings.docStoreConn.indexExist(search.index_name(current_user.id), doc["kb_id"]):
if settings.docStoreConn.index_exist(search.index_name(current_user.id), doc["kb_id"]):
settings.docStoreConn.delete({"doc_id": doc["id"]}, search.index_name(current_user.id), doc["kb_id"])
doc["progress_msg"] = ""
doc["chunk_num"] = 0
Expand Down
6 changes: 3 additions & 3 deletions api/apps/document_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def _run_sync():
DocumentService.update_by_id(id, info)
if req.get("delete", False):
TaskService.filter_delete([Task.doc_id == id])
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id)

if str(req["run"]) == TaskStatus.RUNNING.value:
Expand Down Expand Up @@ -615,7 +615,7 @@ def _rename_sync():
"title_tks": title_tks,
"title_sm_tks": rag_tokenizer.fine_grained_tokenize(title_tks),
}
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.update(
{"doc_id": req["doc_id"]},
es_body,
Expand Down Expand Up @@ -696,7 +696,7 @@ def reset_doc():
tenant_id = DocumentService.get_tenant_id(req["doc_id"])
if not tenant_id:
return get_data_error_result(message="Tenant not found!")
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
return None

Expand Down
22 changes: 11 additions & 11 deletions api/apps/kb_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
from rag.nlp import search
from api.constants import DATASET_NAME_LIMIT
from rag.utils.redis_conn import REDIS_CONN
from rag.utils.doc_store_conn import OrderByExpr
from common.constants import RetCode, PipelineTaskType, StatusEnum, VALID_TASK_STATUS, FileSource, LLMType, PAGERANK_FLD
from common import settings
from common.doc_store.doc_store_base import OrderByExpr
from api.apps import login_required, current_user


Expand Down Expand Up @@ -285,7 +285,7 @@ def _rm_sync():
message="Database error (Knowledgebase removal)!")
for kb in kbs:
settings.docStoreConn.delete({"kb_id": kb.id}, search.index_name(kb.tenant_id), kb.id)
settings.docStoreConn.deleteIdx(search.index_name(kb.tenant_id), kb.id)
settings.docStoreConn.delete_idx(search.index_name(kb.tenant_id), kb.id)
if hasattr(settings.STORAGE_IMPL, 'remove_bucket'):
settings.STORAGE_IMPL.remove_bucket(kb.id)
return get_json_result(data=True)
Expand Down Expand Up @@ -386,7 +386,7 @@ def knowledge_graph(kb_id):
}

obj = {"graph": {}, "mind_map": {}}
if not settings.docStoreConn.indexExist(search.index_name(kb.tenant_id), kb_id):
if not settings.docStoreConn.index_exist(search.index_name(kb.tenant_id), kb_id):
return get_json_result(data=obj)
sres = settings.retriever.search(req, search.index_name(kb.tenant_id), [kb_id])
if not len(sres.ids):
Expand Down Expand Up @@ -858,11 +858,11 @@ def sample_random_chunks_with_vectors(
index_nm = search.index_name(tenant_id)

res0 = docStoreConn.search(
selectFields=[], highlightFields=[],
select_fields=[], highlight_fields=[],
condition={"kb_id": kb_id, "available_int": 1},
matchExprs=[], orderBy=OrderByExpr(),
match_expressions=[], order_by=OrderByExpr(),
offset=0, limit=1,
indexNames=index_nm, knowledgebaseIds=[kb_id]
index_names=index_nm, knowledgebase_ids=[kb_id]
)
total = docStoreConn.get_total(res0)
if total <= 0:
Expand All @@ -874,14 +874,14 @@ def sample_random_chunks_with_vectors(

for off in offsets:
res1 = docStoreConn.search(
selectFields=list(base_fields),
highlightFields=[],
select_fields=list(base_fields),
highlight_fields=[],
condition={"kb_id": kb_id, "available_int": 1},
matchExprs=[], orderBy=OrderByExpr(),
match_expressions=[], order_by=OrderByExpr(),
offset=off, limit=1,
indexNames=index_nm, knowledgebaseIds=[kb_id]
index_names=index_nm, knowledgebase_ids=[kb_id]
)
ids = docStoreConn.get_chunk_ids(res1)
ids = docStoreConn.get_doc_ids(res1)
if not ids:
continue

Expand Down
33 changes: 29 additions & 4 deletions api/apps/memories_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
from api.db import TenantPermission
from api.db.services.memory_service import MemoryService
from api.db.services.user_service import UserTenantService
from api.db.services.canvas_service import UserCanvasService
from api.utils.api_utils import validate_request, get_request_json, get_error_argument_result, get_json_result, \
not_allowed_parameters
from api.utils.memory_utils import format_ret_data_from_memory, get_memory_type_human
from api.constants import MEMORY_NAME_LIMIT, MEMORY_SIZE_LIMIT
from memory.services.messages import MessageService
from common.constants import MemoryType, RetCode, ForgettingPolicy


Expand Down Expand Up @@ -57,7 +59,6 @@ async def create_memory():

if res:
return get_json_result(message=True, data=format_ret_data_from_memory(memory))

else:
return get_json_result(message=memory, code=RetCode.SERVER_ERROR)

Expand Down Expand Up @@ -124,7 +125,7 @@ async def update_memory(memory_id):
return get_json_result(message=True, data=memory_dict)

try:
MemoryService.update_memory(memory_id, to_update)
MemoryService.update_memory(current_memory.tenant_id, memory_id, to_update)
updated_memory = MemoryService.get_by_memory_id(memory_id)
return get_json_result(message=True, data=format_ret_data_from_memory(updated_memory))

Expand All @@ -133,21 +134,22 @@ async def update_memory(memory_id):
return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)


@manager.route("/<memory_id>", methods=["DELETE"]) # noqa: F821
@manager.route("/<memory_id>", methods=["DELETE"]) # noqa: F821
@login_required
async def delete_memory(memory_id):
memory = MemoryService.get_by_memory_id(memory_id)
if not memory:
return get_json_result(message=True, code=RetCode.NOT_FOUND)
try:
MemoryService.delete_memory(memory_id)
MessageService.delete_message({"memory_id": memory_id}, memory.tenant_id, memory_id)
return get_json_result(message=True)
except Exception as e:
logging.error(e)
return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)


@manager.route("", methods=["GET"]) # noqa: F821
@manager.route("", methods=["GET"]) # noqa: F821
@login_required
async def list_memory():
args = request.args
Expand Down Expand Up @@ -183,3 +185,26 @@ async def get_memory_config(memory_id):
if not memory:
return get_json_result(code=RetCode.NOT_FOUND, message=f"Memory '{memory_id}' not found.")
return get_json_result(message=True, data=format_ret_data_from_memory(memory))


@manager.route("/<memory_id>", methods=["GET"]) # noqa: F821
@login_required
async def get_memory_detail(memory_id):
args = request.args
agent_ids = args.getlist("agent_id")
keywords = args.get("keywords", "")
keywords = keywords.strip()
page = int(args.get("page", 1))
page_size = int(args.get("page_size", 50))
memory = MemoryService.get_by_memory_id(memory_id)
if not memory:
return get_json_result(code=RetCode.NOT_FOUND, message=f"Memory '{memory_id}' not found.")
messages = MessageService.list_message(
memory.tenant_id, memory_id, agent_ids, keywords, page, page_size)
agent_name_mapping = {}
if messages["message_list"]:
agent_list = UserCanvasService.get_basic_info_by_canvas_ids([message["agent_id"] for message in messages["message_list"]])
agent_name_mapping = {agent["id"]: agent["title"] for agent in agent_list}
for message in messages["message_list"]:
message["agent_name"] = agent_name_mapping.get(message["agent_id"], "Unknown")
return get_json_result(data={"messages": messages, "storage_type": memory.storage_type}, message=True)
Loading
Loading