-
Notifications
You must be signed in to change notification settings - Fork 229
Expand file tree
/
Copy pathsummarizer.py
More file actions
97 lines (81 loc) · 3.4 KB
/
summarizer.py
File metadata and controls
97 lines (81 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Summarizer module for memory summarization operations."""
import datetime
import zoneinfo
from agentscope.agent import ReActAgent
from agentscope.message import Msg
from agentscope.tool import Toolkit
from ..utils import AsMsgHandler
from ....core.op import BaseOp
from ....core.utils import get_logger
logger = get_logger()
class Summarizer(BaseOp):
"""Summarizer class for summarizing memory messages."""
def __init__(
self,
working_dir: str,
memory_dir: str,
memory_compact_threshold: int,
toolkit: Toolkit | None = None,
console_enabled: bool = False,
timezone: str | None = None,
add_thinking_block: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.working_dir: str = working_dir
self.memory_dir: str = memory_dir
self.memory_compact_threshold: int = memory_compact_threshold
self.toolkit: Toolkit | None = toolkit
self.console_enabled: bool = console_enabled
self.timezone: str | None = timezone
self.add_thinking_block: bool = add_thinking_block
def _get_current_datetime(self) -> datetime.datetime:
"""Get current datetime with timezone, fallback to local time if timezone is invalid."""
if self.timezone:
try:
return datetime.datetime.now(zoneinfo.ZoneInfo(self.timezone))
except Exception as e:
logger.error(f"Invalid timezone: {self.timezone}, falling back to local time error={e}")
return datetime.datetime.now()
async def execute(self):
messages: list[Msg] = self.context.get("messages", [])
if not messages:
return ""
msg_handler = AsMsgHandler(self.as_token_counter)
before_token_count = await msg_handler.count_msgs_token(messages)
history_formatted_str: str = await msg_handler.format_msgs_to_str(
messages=messages,
memory_compact_threshold=self.memory_compact_threshold,
include_thinking=self.add_thinking_block,
)
after_token_count = await msg_handler.count_str_token(history_formatted_str)
logger.info(f"Summarizer before_token_count={before_token_count} after_token_count={after_token_count}")
if not history_formatted_str:
logger.warning(f"No history to summarize. messages={messages}")
return ""
agent = ReActAgent(
name="reme_summarizer",
model=self.as_llm,
sys_prompt="You are a helpful assistant.",
formatter=self.as_llm_formatter,
toolkit=self.toolkit,
)
agent.set_console_output_enabled(self.console_enabled)
user_message: str = f"# conversation\n{history_formatted_str}\n\n" + self.prompt_format(
"user_message",
date=self._get_current_datetime().strftime("%Y-%m-%d"),
working_dir=self.working_dir,
memory_dir=self.memory_dir,
)
summary_msg: Msg = await agent.reply(
Msg(
name="reme",
role="user",
content=user_message,
),
)
for i, (msg, _) in enumerate(agent.memory.content):
logger.info(f"Summarizer memory[{i}]: {msg.content}")
history_summary: str = summary_msg.get_text_content()
logger.info(f"Summarizer Result:\n{history_summary}")
return history_summary