Skip to content

Commit 240c781

Browse files
committed
supports batch storage of streaming messages.
1 parent b22f680 commit 240c781

File tree

10 files changed

+137
-22
lines changed

10 files changed

+137
-22
lines changed

CHANGELOG_zh.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,18 @@
99

1010
---
1111

12-
## [1.0.9.2] - 2025-12-04
12+
## [1.0.9.2] - 2025-12-09
13+
14+
### Added
15+
- 新增流式消息结束标识的stream_end消息
16+
- stream消息支持分批存储
17+
18+
### Changed
19+
- 修改message表结构,新增字段
20+
21+
---
22+
23+
## [1.0.9.1] - 2025-12-04
1324

1425
### Added
1526
- 打印payload日志

config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"is_send_think": true,
3434
"is_send_answer": true,
3535
"is_stored": false,
36+
"stream_batch_size": 256,
3637
"is_show_in_terminal": false,
3738
"is_send_full_arguments": false
3839
},

oxygent/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class Config:
6868
"is_send_think": True,
6969
"is_send_answer": True,
7070
"is_stored": False,
71+
"stream_batch_size": 256,
7172
"is_show_in_terminal": False,
7273
"is_send_full_arguments": False,
7374
},
@@ -359,6 +360,14 @@ def set_message_is_stored(cls, is_stored=True):
359360
def get_message_is_stored(cls):
360361
return cls.get_module_config("message", "is_stored")
361362

363+
@classmethod
364+
def set_message_stream_batch_size(cls, stream_batch_size=128):
365+
cls.set_module_config("message", "stream_batch_size", stream_batch_size)
366+
367+
@classmethod
368+
def get_message_stream_batch_size(cls):
369+
return cls.get_module_config("message", "stream_batch_size")
370+
362371
@classmethod
363372
def set_message_is_show_in_terminal(cls, is_show_in_terminal=True):
364373
cls.set_module_config("message", "is_show_in_terminal", is_show_in_terminal)

oxygent/mas.py

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from .utils.common_utils import (
4747
generate_uuid,
4848
get_format_time,
49+
get_timestamp,
4950
msgpack_preprocess,
5051
print_tree,
5152
to_json,
@@ -100,6 +101,8 @@ class MAS(BaseModel):
100101
routers: list = Field(default_factory=list)
101102
middlewares: list = Field(default_factory=list)
102103

104+
stream_dict: dict[str, list] = Field(default_factory=dict)
105+
103106
def __init__(self, **kwargs):
104107
"""Construct a new :class:`MAS`.
105108
@@ -284,9 +287,14 @@ async def init_db(self):
284287
"mappings": {
285288
"properties": {
286289
"message_id": {"type": "keyword"},
290+
"group_id": {"type": "keyword"},
287291
"trace_id": {"type": "keyword"},
292+
"node_id": {"type": "keyword"},
293+
"node_name": {"type": "keyword"},
288294
"message": {"type": "text"},
289295
"message_type": {"type": "keyword"},
296+
"message_event": {"type": "keyword"},
297+
"message_timestamp": {"type": "long"},
290298
"create_time": {
291299
"format": "yyyy-MM-dd HH:mm:ss.SSSSSSSSS",
292300
"type": "date",
@@ -566,7 +574,9 @@ async def call(self, callee, arguments, **kwargs):
566574
oxy_response = await oxy.execute(oxy_request)
567575
return oxy_response.output
568576

569-
async def send_message(self, sse_message: SSEMessage, redis_key: str):
577+
async def send_message(
578+
self, sse_message: SSEMessage, redis_key: str, group_id: str = ""
579+
):
570580
"""Push *message* onto a capped Redis list.
571581
572582
The data is MsgPack‑encoded before being stored. At most **10** items
@@ -598,18 +608,78 @@ async def send_message(self, sse_message: SSEMessage, redis_key: str):
598608
parts = redis_key.split(":")
599609
current_trace_id = parts[-1] if len(parts) >= 3 else ""
600610

601-
# Insert into Elasticsearch
602-
await self.es_client.index(
603-
Config.get_app_name() + "_message",
604-
doc_id=sse_message.id,
605-
body={
606-
"message_id": sse_message.id,
607-
"trace_id": current_trace_id,
608-
"message": to_json(message),
609-
"message_type": message_type,
610-
"create_time": get_format_time(),
611-
},
612-
)
611+
# 考虑 message 是 str 的情况
612+
node_id = ""
613+
node_name = ""
614+
message_timestamp = get_timestamp()
615+
if isinstance(message, dict):
616+
message_timestamp = message.get("timestamp", get_timestamp())
617+
if isinstance(message.get("content"), dict):
618+
node_id = message.get("content", {}).get("node_id", "")
619+
node_name = message.get("content", {}).get("agent", "")
620+
621+
if message_type in ["stream", "stream_end"]:
622+
# 排队
623+
if message_type == "stream":
624+
delta = message.get("content", {}).get("delta", "")
625+
if node_id not in self.stream_dict:
626+
self.stream_dict[node_id] = []
627+
self.stream_dict[node_id].append(delta)
628+
if message_type == "stream_end" or (
629+
self.stream_dict[node_id]
630+
and len(self.stream_dict[node_id])
631+
% Config.get_message_stream_batch_size()
632+
== 0
633+
):
634+
message_id = generate_uuid()
635+
merged_type = "merged_stream"
636+
save_message_task = asyncio.create_task(
637+
self.es_client.index(
638+
Config.get_app_name() + "_message",
639+
doc_id=message_id,
640+
body={
641+
"message_id": message_id,
642+
"group_id": group_id,
643+
"trace_id": current_trace_id,
644+
"node_id": node_id,
645+
"node_name": node_name,
646+
"message": to_json(
647+
{
648+
"type": merged_type,
649+
"content": "".join(self.stream_dict[node_id]),
650+
}
651+
),
652+
"message_type": merged_type,
653+
"message_event": sse_message.event,
654+
"message_timestamp": message_timestamp,
655+
"create_time": get_format_time(),
656+
},
657+
)
658+
)
659+
save_message_task.add_done_callback(self.background_tasks.discard)
660+
self.background_tasks.add(save_message_task)
661+
self.stream_dict[node_id].clear()
662+
else:
663+
save_message_task = asyncio.create_task(
664+
self.es_client.index(
665+
Config.get_app_name() + "_message",
666+
doc_id=sse_message.id,
667+
body={
668+
"message_id": sse_message.id,
669+
"group_id": group_id,
670+
"trace_id": current_trace_id,
671+
"node_id": node_id,
672+
"node_name": node_name,
673+
"message": to_json(message),
674+
"message_type": message_type,
675+
"message_event": sse_message.event,
676+
"message_timestamp": message_timestamp,
677+
"create_time": get_format_time(),
678+
},
679+
)
680+
)
681+
save_message_task.add_done_callback(self.background_tasks.discard)
682+
self.background_tasks.add(save_message_task)
613683
if message_is_send:
614684
bytes_msg = msgpack.packb(msgpack_preprocess(sse_message.to_sse()))
615685
await self.redis_client.lpush(redis_key, bytes_msg)
@@ -745,7 +815,9 @@ async def chat_with_agent(
745815

746816
if send_msg_key:
747817
await self.send_message(
748-
SSEMessage(event="close", data="done"), send_msg_key
818+
SSEMessage(event="close", data="done"),
819+
send_msg_key,
820+
group_id=oxy_request.group_id,
749821
)
750822
return oxy_response
751823
except Exception:

oxygent/oxy/agents/sse_oxy_agent.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
4545
del payload["node_id_stack"]
4646
payload["caller"] = "user"
4747
del payload["arguments"]
48+
if "shared_data" in payload and "_headers" in payload["shared_data"]:
49+
del payload["shared_data"]["_headers"]
4850

4951
url = build_url(self.server_url, "/sse/chat")
5052
answer = ""

oxygent/oxy/llms/http_llm.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,18 @@ async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
147147
"agent": oxy_request.caller,
148148
"node_id": oxy_request.node_id,
149149
},
150-
"_is_stored": False,
151150
}
152151
)
152+
await oxy_request.send_message(
153+
{
154+
"type": "stream_end",
155+
"content": {
156+
"delta": "",
157+
"agent": oxy_request.caller,
158+
"node_id": oxy_request.node_id,
159+
},
160+
}
161+
)
153162
result = "".join(result_parts)
154163
return OxyResponse(state=OxyState.COMPLETED, output=result)
155164

oxygent/oxy/llms/openai_llm.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
8383
"agent": oxy_request.caller,
8484
"node_id": oxy_request.node_id,
8585
},
86-
"_is_stored": False,
8786
}
8887
)
8988
answer += "<think>"
@@ -100,7 +99,6 @@ async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
10099
"agent": oxy_request.caller,
101100
"node_id": oxy_request.node_id,
102101
},
103-
"_is_stored": False,
104102
}
105103
)
106104
answer += "</think>"
@@ -116,9 +114,18 @@ async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
116114
"agent": oxy_request.caller,
117115
"node_id": oxy_request.node_id,
118116
},
119-
"_is_stored": False,
120117
}
121118
)
119+
await oxy_request.send_message(
120+
{
121+
"type": "stream_end",
122+
"content": {
123+
"delta": "",
124+
"agent": oxy_request.caller,
125+
"node_id": oxy_request.node_id,
126+
},
127+
}
128+
)
122129
return OxyResponse(state=OxyState.COMPLETED, output=answer)
123130
else:
124131
return OxyResponse(

oxygent/schemas/oxy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ async def send_message(self, message=None, event=None, id=None):
361361
redis_key = (
362362
f"{self.mas.message_prefix}:{self.mas.name}:{self.current_trace_id}"
363363
)
364-
await self.mas.send_message(sse_message, redis_key)
364+
await self.mas.send_message(sse_message, redis_key, group_id=self.group_id)
365365

366366
def set_query(self, query, master_level=False):
367367
if master_level:

oxygent/utils/common_utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,14 @@ def get_mac_address():
3939
return mac_address
4040

4141

42-
def get_timestamp():
42+
def get_timestamp_str():
4343
return str(datetime.now().timestamp())
4444

4545

46+
def get_timestamp():
47+
return datetime.now().timestamp() * 1000
48+
49+
4650
def get_format_time():
4751
"""Yyyy-MM-dd HH:mm:ss."""
4852
"""yyyy-MM-dd HH:mm:ss.SSS"""

test/unittest/test_oxy_request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self):
2323
self.message_prefix = "msg"
2424
self.name = "ut_mas"
2525

26-
async def send_message(self, message, redis_key):
26+
async def send_message(self, message, redis_key, group_id=""):
2727
self.last_msg = (redis_key, message)
2828

2929

0 commit comments

Comments
 (0)