Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions camel/toolkits/terminal_toolkit/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys
import threading
import time
import uuid
from queue import Empty, Full, Queue
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -1228,6 +1229,134 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
except EOFError:
return f"User input interrupted for session '{id}'."

def shell_write_content_to_file(self, content: str, file_path: str) -> str:
r"""Writes the specified content to a file at the given path.

Args:
content (str): The content to write to the file.
file_path (str): The path to the file where the content should
be written. Can be absolute or relative to working_dir.

Returns:
str: A confirmation message indicating success or an error message.
"""
# For local backend, resolve relative paths to working_dir
if not self.use_docker_backend and self.working_dir:
if not os.path.isabs(file_path):
file_path = os.path.normpath(
os.path.join(self.working_dir, file_path)
)
else:
file_path = os.path.normpath(file_path)
file_path = os.path.abspath(file_path)

# Safe mode path containment check for local backend
if self.safe_mode:
working_dir_normalized = os.path.normpath(
os.path.abspath(self.working_dir)
)
# Use os.path.commonpath for secure path containment check
try:
common = os.path.commonpath(
[file_path, working_dir_normalized]
)
if common != working_dir_normalized:
return (
"Error: Cannot write to a file outside of the "
"working directory in safe mode."
)
except ValueError:
# Paths are on different drives (Windows) or invalid
return (
"Error: Cannot write to a file outside of the "
"working directory in safe mode."
)

log_entry = (
f"--- Writing content to file at {time.ctime()} ---\n"
f"> {file_path}\n"
)
if self.use_docker_backend:
temp_host_path = None
try:
# Ensure parent directory exists in container
parent_dir = os.path.dirname(file_path)
if parent_dir:
quoted_dir = shlex.quote(parent_dir)
mkdir_cmd = f'sh -lc "mkdir -p {quoted_dir}"'
mkdir_exec = self.docker_api_client.exec_create(
self.container.id, mkdir_cmd
)
self.docker_api_client.exec_start(mkdir_exec['Id'])

# Write content to a temporary file on the host inside log_dir
temp_file_name = f"temp_{uuid.uuid4().hex}.txt"
temp_host_path = os.path.join(self.log_dir, temp_file_name)
with open(temp_host_path, "w", encoding="utf-8") as f:
f.write(content)
# Copy the temporary file into the Docker container
dest_path_in_container = file_path
container_dest = (
f"{self.container.name}:{dest_path_in_container}"
)
subprocess.run(
['docker', 'cp', temp_host_path, container_dest],
check=True,
capture_output=True,
text=True,
)

log_entry += f"\n-------- \n{content}\n--------\n"
with open(self.blocking_log_file, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
return (
f"Content successfully written to '{file_path}' "
f"in Docker container."
)
except subprocess.CalledProcessError as e:
log_entry += f"--- Error ---\n{e.stderr}\n"
with open(self.blocking_log_file, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
return (
f"Error writing to file '{file_path}' "
f"in Docker container: {e.stderr}"
)
except Exception as e:
log_entry += f"--- Error ---\n{e}\n"
with open(self.blocking_log_file, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
return (
f"Error writing to file '{file_path}' "
f"in Docker container: {e}"
)
finally:
# Clean up the temporary file
if temp_host_path and os.path.exists(temp_host_path):
try:
os.remove(temp_host_path)
except OSError:
pass

else:
try:
# Ensure parent directory exists
parent_dir = os.path.dirname(file_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)

with open(file_path, "w", encoding="utf-8") as f:
f.write(content)

log_entry += f"\n-------- \n{content}\n--------\n"
with open(self.blocking_log_file, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
return f"Content successfully written to '{file_path}'."
except Exception as e:
log_entry += f"--- Error ---\n{e}\n"
with open(self.blocking_log_file, "a", encoding="utf-8") as f:
f.write(log_entry + "\n")
return f"Error writing to file '{file_path}': {e}"

def __enter__(self):
r"""Context manager entry."""
return self
Expand Down Expand Up @@ -1275,6 +1404,7 @@ def get_tools(self) -> List[FunctionTool]:
return [
FunctionTool(self.shell_exec),
FunctionTool(self.shell_view),
FunctionTool(self.shell_write_content_to_file),
FunctionTool(self.shell_write_to_process),
FunctionTool(self.shell_kill_process),
FunctionTool(self.shell_ask_user_for_help),
Expand Down
62 changes: 62 additions & 0 deletions test/toolkits/test_terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,65 @@ def test_shell_exec_multiple_sessions(terminal_toolkit, temp_dir):
# For non-blocking mode, sessions should be created immediately
assert session1 in terminal_toolkit.shell_sessions
assert session2 in terminal_toolkit.shell_sessions


def test_shell_write_content_to_file_basic(temp_dir, request):
"""Test basic file writing functionality."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=False)
request.addfinalizer(toolkit.cleanup)

test_content = "Hello, World!"
test_file = temp_dir / "test_write.txt"

result = toolkit.shell_write_content_to_file(test_content, str(test_file))
assert "successfully" in result.lower()
assert test_file.exists()
assert test_file.read_text() == test_content


def test_shell_write_content_to_file_with_subdirectory(temp_dir, request):
"""Test file writing with automatic parent directory creation."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=False)
request.addfinalizer(toolkit.cleanup)

test_content = "Nested content"
# Create a path with non-existent subdirectory
test_file = temp_dir / "subdir" / "nested" / "test.txt"

result = toolkit.shell_write_content_to_file(test_content, str(test_file))
assert "successfully" in result.lower()
assert test_file.exists()
assert test_file.read_text() == test_content


def test_shell_write_content_to_file_safe_mode_relative_path(
temp_dir, request
):
"""Test safe mode with relative paths resolves correctly."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=True)
request.addfinalizer(toolkit.cleanup)

test_content = "Safe mode content"
# Use a relative path - should be resolved relative to working_dir
result = toolkit.shell_write_content_to_file(test_content, "relative.txt")
assert "successfully" in result.lower()
# File should be created inside working_dir
expected_file = temp_dir / "relative.txt"
assert expected_file.exists()
assert expected_file.read_text() == test_content


def test_shell_write_content_to_file_safe_mode_blocks_path_traversal(
temp_dir, request
):
"""Test that safe mode blocks path traversal attempts."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=True)
request.addfinalizer(toolkit.cleanup)

test_content = "Malicious content"
# Attempt path traversal
result = toolkit.shell_write_content_to_file(
test_content, "../outside_working_dir.txt"
)
assert "error" in result.lower()
assert "outside" in result.lower() or "working directory" in result.lower()
Loading