Skip to content

Commit 6f6d081

Browse files
committed
fix: restrict local file paths in message tool
1 parent 8f5178d commit 6f6d081

2 files changed

Lines changed: 144 additions & 11 deletions

File tree

astrbot/core/tools/message_tools.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
import shlex
44
import uuid
5+
from pathlib import Path
56

67
from pydantic import Field
78
from pydantic.dataclasses import dataclass
@@ -14,9 +15,50 @@
1415
from astrbot.core.computer.computer_client import get_booter
1516
from astrbot.core.message.message_event_result import MessageChain
1617
from astrbot.core.platform.message_session import MessageSession
17-
from astrbot.core.tools.computer_tools.util import check_admin_permission
18+
from astrbot.core.tools.computer_tools.util import (
19+
check_admin_permission,
20+
is_local_runtime,
21+
workspace_root,
22+
)
1823
from astrbot.core.tools.registry import builtin_tool
19-
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
24+
from astrbot.core.utils.astrbot_path import (
25+
get_astrbot_system_tmp_path,
26+
get_astrbot_temp_path,
27+
)
28+
29+
30+
def _file_send_allowed_roots(umo: str) -> tuple[Path, ...]:
31+
return (
32+
workspace_root(umo),
33+
Path(get_astrbot_temp_path()).resolve(strict=False),
34+
Path(get_astrbot_system_tmp_path()).resolve(strict=False),
35+
)
36+
37+
38+
def _is_path_within(path: Path, roots: tuple[Path, ...]) -> bool:
39+
return any(path == root or path.is_relative_to(root) for root in roots)
40+
41+
42+
def _is_restricted_local_env(context: ContextWrapper[AstrAgentContext]) -> bool:
43+
if not is_local_runtime(context):
44+
return False
45+
cfg = context.context.context.get_config(
46+
umo=context.context.event.unified_msg_origin
47+
)
48+
provider_settings = cfg.get("provider_settings", {})
49+
require_admin = provider_settings.get("computer_use_require_admin", True)
50+
return require_admin and context.context.event.role != "admin"
51+
52+
53+
def _can_send_local_file(
54+
context: ContextWrapper[AstrAgentContext],
55+
local_path: Path,
56+
) -> bool:
57+
umo = context.context.event.unified_msg_origin
58+
allowed_roots = _file_send_allowed_roots(umo)
59+
if _is_path_within(local_path, allowed_roots):
60+
return True
61+
return is_local_runtime(context) and not _is_restricted_local_env(context)
2062

2163

2264
@builtin_tool
@@ -85,23 +127,38 @@ async def _resolve_path_from_sandbox(
85127
*,
86128
component_type: str = "file",
87129
) -> tuple[str, bool]:
88-
path = str(path)
89-
# if the path is relative, check if the file exists in user's local workspace
130+
path = str(path).strip()
131+
if not path:
132+
raise FileNotFoundError(f"{component_type} path is empty")
133+
134+
# Relative host paths are resolved only inside the user's workspace.
90135
if not os.path.isabs(path):
91136
unified_msg_origin = context.context.event.unified_msg_origin
92137
if unified_msg_origin:
93-
from astrbot.core.tools.computer_tools.util import workspace_root
94-
95138
try:
96139
ws_path = workspace_root(unified_msg_origin)
97-
ws_candidate = (ws_path / path).resolve()
140+
ws_candidate = (ws_path / path).resolve(strict=False)
98141
if ws_candidate.is_file() and ws_candidate.is_relative_to(ws_path):
99142
return str(ws_candidate), False
100143
except Exception:
101144
pass
102-
# check if the file exists in local environment (only allow absolute paths to prevent traversal)
103-
elif os.path.isfile(path):
104-
return path, False
145+
else:
146+
local_candidate = Path(path).expanduser().resolve(strict=False)
147+
if local_candidate.is_file():
148+
if _can_send_local_file(context, local_candidate):
149+
return str(local_candidate), False
150+
if is_local_runtime(context):
151+
allowed = ", ".join(
152+
str(root)
153+
for root in _file_send_allowed_roots(
154+
context.context.event.unified_msg_origin
155+
)
156+
)
157+
raise PermissionError(
158+
"Local file send is restricted for this user. "
159+
f"Allowed directories: {allowed}. "
160+
f"Blocked path: {local_candidate}."
161+
)
105162

106163
try:
107164
sb = await get_booter(
@@ -221,6 +278,8 @@ async def call(
221278
)
222279
except FileNotFoundError as exc:
223280
return f"error: {exc}"
281+
except PermissionError as exc:
282+
return f"error: {exc}"
224283
except Exception as exc:
225284
return f"error: failed to build messages[{idx}] component: {exc}"
226285

tests/unit/test_message_tools.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@ def _make_context(
1212
current_session="feishu:GroupMessage:oc_xxx",
1313
role="admin",
1414
require_admin=True,
15+
runtime="local",
1516
):
1617
"""Build a minimal ContextWrapper for SendMessageToUserTool."""
17-
cfg = {"provider_settings": {"computer_use_require_admin": require_admin}}
18+
cfg = {
19+
"provider_settings": {
20+
"computer_use_require_admin": require_admin,
21+
"computer_use_runtime": runtime,
22+
}
23+
}
1824
return SimpleNamespace(
1925
context=SimpleNamespace(
2026
event=SimpleNamespace(
@@ -161,3 +167,71 @@ async def mock_get_booter(*args, **kwargs):
161167

162168
assert "error: failed to build messages[1] component: sandbox unavailable" in result
163169
ctx.context.context.send_message.assert_not_called()
170+
171+
172+
@pytest.mark.asyncio
173+
async def test_non_admin_cannot_send_arbitrary_local_absolute_file(tmp_path):
174+
"""Non-admin users cannot send host files outside the allowed local roots."""
175+
tool = SendMessageToUserTool()
176+
ctx = _make_context(role="member", require_admin=True)
177+
secret_path = tmp_path / "secret.txt"
178+
secret_path.write_text("secret", encoding="utf-8")
179+
180+
result = await tool.call(
181+
ctx,
182+
messages=[{"type": "file", "path": str(secret_path)}],
183+
)
184+
185+
assert "error: Local file send is restricted for this user" in result
186+
assert str(secret_path) in result
187+
ctx.context.context.send_message.assert_not_called()
188+
189+
190+
@pytest.mark.asyncio
191+
async def test_non_admin_can_send_workspace_file(tmp_path, monkeypatch):
192+
"""Non-admin users can send files inside their per-session workspace."""
193+
tool = SendMessageToUserTool()
194+
ctx = _make_context(
195+
current_session="feishu:GroupMessage:oc_workspace",
196+
role="member",
197+
require_admin=True,
198+
)
199+
workspace_root = tmp_path / "workspaces"
200+
workspace_file = workspace_root / "feishu_GroupMessage_oc_workspace" / "result.txt"
201+
workspace_file.parent.mkdir(parents=True)
202+
workspace_file.write_text("result", encoding="utf-8")
203+
monkeypatch.setattr(
204+
"astrbot.core.tools.computer_tools.util.get_astrbot_workspaces_path",
205+
lambda: str(workspace_root),
206+
)
207+
208+
result = await tool.call(
209+
ctx,
210+
messages=[{"type": "file", "path": "result.txt"}],
211+
)
212+
213+
assert "Message sent to session" in result
214+
ctx.context.context.send_message.assert_called_once()
215+
216+
217+
@pytest.mark.asyncio
218+
async def test_non_admin_can_send_temp_file(tmp_path, monkeypatch):
219+
"""Non-admin users can send generated files under AstrBot temp."""
220+
tool = SendMessageToUserTool()
221+
ctx = _make_context(role="member", require_admin=True)
222+
temp_root = tmp_path / "temp"
223+
temp_root.mkdir()
224+
output_path = temp_root / "output.txt"
225+
output_path.write_text("output", encoding="utf-8")
226+
monkeypatch.setattr(
227+
"astrbot.core.tools.message_tools.get_astrbot_temp_path",
228+
lambda: str(temp_root),
229+
)
230+
231+
result = await tool.call(
232+
ctx,
233+
messages=[{"type": "file", "path": str(output_path)}],
234+
)
235+
236+
assert "Message sent to session" in result
237+
ctx.context.context.send_message.assert_called_once()

0 commit comments

Comments
 (0)