-
Notifications
You must be signed in to change notification settings - Fork 229
Expand file tree
/
Copy pathtool_result_compactor.py
More file actions
161 lines (135 loc) · 5.86 KB
/
tool_result_compactor.py
File metadata and controls
161 lines (135 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Tool Result Compactor: truncate large tool results and save full content to files."""
import os
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from agentscope.message import Msg
from ..utils import truncate_text_output, DEFAULT_MAX_BYTES, TRUNCATION_NOTICE_MARKER
from ....core.op import BaseOp
from ....core.utils import get_logger
logger = get_logger()
class ToolResultCompactor(BaseOp):
    """Truncate large tool_result outputs and save full content to files.

    Tool results outside the recent window are truncated aggressively to
    ``old_max_bytes``; the trailing run of recent tool-result messages keeps
    the larger ``recent_max_bytes`` budget. The full pre-truncation content
    is persisted under ``tool_result_dir`` so nothing is lost.
    """

    def __init__(
        self,
        tool_result_dir: str | Path,
        retention_days: int = 3,
        old_max_bytes: int = 3000,
        recent_max_bytes: int = DEFAULT_MAX_BYTES,
        recent_n: int = 1,
        encoding: str = "utf-8",
        **kwargs,
    ):
        """Initialize the compactor.

        Args:
            tool_result_dir: Directory where full (pre-truncation) tool
                results are saved; created if missing.
            retention_days: Age in days after which saved files are removed
                by :meth:`cleanup_expired_files`.
            old_max_bytes: Byte budget for tool results outside the recent window.
            recent_max_bytes: Byte budget for the most recent tool results.
            recent_n: Minimum number of trailing messages treated as "recent".
            encoding: Text encoding used for size accounting and file I/O.
            **kwargs: Passed through to :class:`BaseOp`.
        """
        super().__init__(**kwargs)
        self.tool_result_dir = Path(tool_result_dir)
        self.retention_days = retention_days
        self.old_max_bytes = old_max_bytes
        self.recent_max_bytes = recent_max_bytes
        self.recent_n = recent_n
        self.encoding = encoding
        self.tool_result_dir.mkdir(parents=True, exist_ok=True)

    def _truncate(self, content: str, max_bytes: int) -> str:
        """Truncate ``content`` to roughly ``max_bytes``, preserving the full text on disk.

        Content that already carries ``TRUNCATION_NOTICE_MARKER`` is shrunk
        further without saving a second copy. Content within a 100-byte slack
        of the budget is returned unchanged. On any error the original content
        is returned, keeping compaction strictly best-effort.
        """
        if not content:
            return content
        try:
            if TRUNCATION_NOTICE_MARKER in content:
                # Already truncated once: re-truncate only, don't re-save.
                return truncate_text_output(content, max_bytes=max_bytes, encoding=self.encoding)
            # 100-byte slack avoids churn on content barely over budget.
            if len(content.encode(self.encoding)) <= max_bytes + 100:
                return content
            # Persist the full content before truncating so it is recoverable.
            fp = self.tool_result_dir / f"{uuid.uuid4().hex}.txt"
            fp.write_text(content, encoding=self.encoding)
            return truncate_text_output(
                content,
                1,
                content.count("\n") + 1,
                max_bytes,
                file_path=str(fp),
                encoding=self.encoding,
            )
        except Exception as e:
            logger.warning("Failed to truncate content, returning original: %s", e)
            return content

    def _compact(self, output: str | list[dict], max_bytes: int) -> str | list[dict]:
        """Truncate ``output`` to ``max_bytes``, saving full content to file if needed.

        Handles plain strings and lists of content blocks (only ``text``
        blocks are truncated, in place). Any other type is returned unchanged.
        """
        if isinstance(output, str):
            return self._truncate(output, max_bytes)
        if isinstance(output, list):
            for b in output:
                if isinstance(b, dict) and b.get("type") == "text":
                    b["text"] = self._truncate(b.get("text", ""), max_bytes)
            return output
        # BUGFIX: previously fell through and returned None for unexpected
        # output types, clobbering the caller's block["output"].
        return output

    async def execute(self) -> list[Msg]:
        """Process all messages, truncating large tool results.

        The trailing run of consecutive tool-result messages (at least
        ``recent_n`` messages) gets the ``recent_max_bytes`` budget; older
        messages get ``old_max_bytes``. Results of ``read_file`` calls on
        ``.md`` files always keep the larger budget.
        """
        messages: list[Msg] = self.context.get("messages", [])
        if not messages:
            return messages
        # Count the trailing run of messages that carry tool_result blocks.
        trailing = 0
        for msg in reversed(messages):
            if not isinstance(msg.content, list) or not any(
                isinstance(b, dict) and b.get("type") == "tool_result" for b in msg.content
            ):
                break
            trailing += 1
        split_index = max(0, len(messages) - max(trailing, self.recent_n))
        # Collect tool_use ids of read_file calls targeting markdown files;
        # their results are exempted from aggressive truncation.
        md_file_tool_ids: set[str] = set()
        try:
            for msg in messages:
                if not isinstance(msg.content, list):
                    continue
                for block in msg.content:
                    if isinstance(block, dict) and block.get("type") == "tool_use":
                        tool_id = block.get("id", "")
                        if not tool_id:
                            continue
                        # BUGFIX: raw_input is often a dict of tool arguments;
                        # calling .lower() on it raised AttributeError and the
                        # broad except below then aborted detection for ALL
                        # messages. Coerce to str before lowercasing.
                        raw_input = str(block.get("raw_input") or "")
                        if block.get("name", "").lower() == "read_file" and ".md" in raw_input.lower():
                            md_file_tool_ids.add(tool_id)
        except Exception as e:
            logger.warning("Failed to detect md file tool ids: %s", e)
        logger.info("md_file_tool_ids: %s", md_file_tool_ids)
        for idx, msg in enumerate(messages):
            if not isinstance(msg.content, list):
                continue
            max_bytes = self.recent_max_bytes if idx >= split_index else self.old_max_bytes
            for block in msg.content:
                if isinstance(block, dict) and block.get("type") == "tool_result" and block.get("output"):
                    # NOTE(review): assumes a tool_result block's "id" equals the
                    # matching tool_use block's "id" — confirm against the
                    # agentscope message schema.
                    if block.get("id", "") in md_file_tool_ids:
                        effective_max_bytes = self.recent_max_bytes
                    else:
                        effective_max_bytes = max_bytes
                    block["output"] = self._compact(block["output"], effective_max_bytes)
        return messages

    def cleanup_expired_files(self) -> int:
        """Clean up saved files older than ``retention_days``.

        Returns:
            Number of files successfully deleted.
        """
        if not self.tool_result_dir.exists():
            return 0
        cutoff = datetime.now() - timedelta(days=self.retention_days)
        deleted = failed = 0
        for fp in self.tool_result_dir.glob("*.txt"):
            try:
                stat = fp.stat()
                if sys.platform == "win32":
                    ts = stat.st_ctime  # creation time on Windows
                else:
                    # macOS/BSD expose birth time; Linux falls back to mtime.
                    ts = getattr(stat, "st_birthtime", stat.st_mtime)
                if datetime.fromtimestamp(ts) < cutoff:
                    fp.unlink()
                    deleted += 1
            except FileNotFoundError:
                pass  # removed by another process between glob and stat/unlink
            except Exception as e:
                failed += 1
                logger.warning("Failed to delete %s: %s", fp, e)
        if deleted or failed:
            logger.info("Cleaned up %d expired files (%d failed)", deleted, failed)
        return deleted