Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions camel/toolkits/terminal_toolkit/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,15 +1235,23 @@ def shell_write_content_to_file(self, content: str, file_path: str) -> str:
Args:
content (str): The content to write to the file.
file_path (str): The path to the file where the content should
be written.
be written. Can be absolute or relative to working_dir.

Returns:
str: A confirmation message indicating success or an error message.
"""
if self.safe_mode and self.working_dir:
# Resolve to absolute path and normalize to prevent path traversal
abs_file_path = os.path.normpath(os.path.abspath(file_path))
working_dir_normalized = os.path.normpath(self.working_dir)
if self.safe_mode and self.working_dir and not self.use_docker_backend:
# Resolve relative paths relative to working_dir, not CWD
if os.path.isabs(file_path):
abs_file_path = os.path.normpath(file_path)
else:
abs_file_path = os.path.normpath(
os.path.join(self.working_dir, file_path)
)
working_dir_normalized = os.path.normpath(
os.path.abspath(self.working_dir)
)
abs_file_path = os.path.abspath(abs_file_path)
# Use os.path.commonpath for secure path containment check
try:
common = os.path.commonpath(
Expand All @@ -1260,13 +1268,25 @@ def shell_write_content_to_file(self, content: str, file_path: str) -> str:
"Error: Cannot write to a file outside of the "
"working directory in safe mode."
)
# Use the resolved absolute path for local writes
file_path = abs_file_path

log_entry = (
f"--- Writing content to file at {time.ctime()} ---\n"
f"> {file_path}\n"
)
if self.use_docker_backend:
try:
# Ensure parent directory exists in container
parent_dir = os.path.dirname(file_path)
if parent_dir:
quoted_dir = shlex.quote(parent_dir)
mkdir_cmd = f'sh -c "mkdir -p {quoted_dir}"'
mkdir_exec = self.docker_api_client.exec_create(
self.container.id, mkdir_cmd
)
self.docker_api_client.exec_start(mkdir_exec['Id'])

# Write content to a temporary file on the host inside log_dir
temp_file_name = f"temp_{uuid.uuid4().hex}.txt"
temp_host_path = os.path.join(self.log_dir, temp_file_name)
Expand Down Expand Up @@ -1311,6 +1331,11 @@ def shell_write_content_to_file(self, content: str, file_path: str) -> str:

else:
try:
# Ensure parent directory exists
parent_dir = os.path.dirname(file_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)

with open(file_path, "w", encoding="utf-8") as f:
f.write(content)

Expand Down
62 changes: 62 additions & 0 deletions test/toolkits/test_terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,65 @@ def test_shell_exec_multiple_sessions(terminal_toolkit, temp_dir):
# For non-blocking mode, sessions should be created immediately
assert session1 in terminal_toolkit.shell_sessions
assert session2 in terminal_toolkit.shell_sessions


def test_shell_write_content_to_file_basic(temp_dir, request):
"""Test basic file writing functionality."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=False)
request.addfinalizer(toolkit.cleanup)

test_content = "Hello, World!"
test_file = temp_dir / "test_write.txt"

result = toolkit.shell_write_content_to_file(test_content, str(test_file))
assert "successfully" in result.lower()
assert test_file.exists()
assert test_file.read_text() == test_content


def test_shell_write_content_to_file_with_subdirectory(temp_dir, request):
"""Test file writing with automatic parent directory creation."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=False)
request.addfinalizer(toolkit.cleanup)

test_content = "Nested content"
# Create a path with non-existent subdirectory
test_file = temp_dir / "subdir" / "nested" / "test.txt"

result = toolkit.shell_write_content_to_file(test_content, str(test_file))
assert "successfully" in result.lower()
assert test_file.exists()
assert test_file.read_text() == test_content


def test_shell_write_content_to_file_safe_mode_relative_path(
temp_dir, request
):
"""Test safe mode with relative paths resolves correctly."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=True)
request.addfinalizer(toolkit.cleanup)

test_content = "Safe mode content"
# Use a relative path - should be resolved relative to working_dir
result = toolkit.shell_write_content_to_file(test_content, "relative.txt")
assert "successfully" in result.lower()
# File should be created inside working_dir
expected_file = temp_dir / "relative.txt"
assert expected_file.exists()
assert expected_file.read_text() == test_content


def test_shell_write_content_to_file_safe_mode_blocks_path_traversal(
temp_dir, request
):
"""Test that safe mode blocks path traversal attempts."""
toolkit = TerminalToolkit(working_directory=str(temp_dir), safe_mode=True)
request.addfinalizer(toolkit.cleanup)

test_content = "Malicious content"
# Attempt path traversal
result = toolkit.shell_write_content_to_file(
test_content, "../outside_working_dir.txt"
)
assert "error" in result.lower()
assert "outside" in result.lower() or "working directory" in result.lower()