Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def get_secure_path(path: str, workspace_id: str, agent_id: str, session_id: str
# Resolve absolute path
if os.path.isabs(path):
# Treat absolute paths as relative to the session root if they start with /
rel_path = path.lstrip(os.sep)
rel_path = path.lstrip(os.sep + (os.altsep or ""))
final_path = os.path.abspath(os.path.join(session_dir, rel_path))
else:
final_path = os.path.abspath(os.path.join(session_dir, path))
Expand Down
59 changes: 43 additions & 16 deletions tools/tests/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,25 +318,36 @@ def test_anthropic_spec_exists(self):
assert spec.tools == []
assert "llm_generate" in spec.node_types
assert "llm_tool_use" in spec.node_types
assert spec.required is True
assert spec.startup_required is True
# Anthropic is optional in llm.py
assert spec.required is False
assert spec.startup_required is False
assert "anthropic.com" in spec.help_url


class TestNodeTypeValidation:
"""Tests for node type credential validation."""

def test_get_missing_for_node_types_returns_missing(self, monkeypatch, tmp_path):
"""get_missing_for_node_types() returns missing credentials."""
"""get_missing_for_node_types() returns missing REQUIRED credentials only."""
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)

creds = CredentialManager(dotenv_path=tmp_path / ".env")
missing = creds.get_missing_for_node_types(["llm_generate", "llm_tool_use"])
# Create a custom spec that IS required to test this logic
custom_specs = {
"required_llm": CredentialSpec(
env_var="REQUIRED_LLM_KEY",
node_types=["llm_generate"],
required=True,
)
}

# Use custom specs strictly for this test
creds = CredentialManager(dotenv_path=tmp_path / ".env", specs=custom_specs)
missing = creds.get_missing_for_node_types(["llm_generate"])

assert len(missing) == 1
cred_name, spec = missing[0]
assert cred_name == "anthropic"
assert spec.env_var == "ANTHROPIC_API_KEY"
assert cred_name == "required_llm"
assert spec.env_var == "REQUIRED_LLM_KEY"

def test_get_missing_for_node_types_returns_empty_when_present(self, monkeypatch):
"""get_missing_for_node_types() returns empty when credentials present."""
Expand All @@ -357,17 +368,25 @@ def test_get_missing_for_node_types_ignores_unknown_types(self, monkeypatch):
assert missing == []

def test_validate_for_node_types_raises_for_missing(self, monkeypatch, tmp_path):
"""validate_for_node_types() raises CredentialError when missing."""
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
"""validate_for_node_types() raises CredentialError when REQUIRED missing."""
monkeypatch.delenv("REQUIRED_KEY", raising=False)

creds = CredentialManager(dotenv_path=tmp_path / ".env")
custom_specs = {
"required_cred": CredentialSpec(
env_var="REQUIRED_KEY",
node_types=["test_node"],
required=True,
)
}

creds = CredentialManager(dotenv_path=tmp_path / ".env", specs=custom_specs)

with pytest.raises(CredentialError) as exc_info:
creds.validate_for_node_types(["llm_generate"])
creds.validate_for_node_types(["test_node"])

error_msg = str(exc_info.value)
assert "ANTHROPIC_API_KEY" in error_msg
assert "llm_generate" in error_msg
assert "REQUIRED_KEY" in error_msg
assert "test_node" in error_msg

def test_validate_for_node_types_passes_when_present(self, monkeypatch):
"""validate_for_node_types() passes when credentials present."""
Expand All @@ -384,15 +403,23 @@ class TestStartupValidation:

def test_validate_startup_raises_for_missing(self, monkeypatch, tmp_path):
"""validate_startup() raises CredentialError when startup creds missing."""
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("STARTUP_KEY", raising=False)

creds = CredentialManager(dotenv_path=tmp_path / ".env")
custom_specs = {
"startup_cred": CredentialSpec(
env_var="STARTUP_KEY",
startup_required=True,
required=True
)
}

creds = CredentialManager(dotenv_path=tmp_path / ".env", specs=custom_specs)

with pytest.raises(CredentialError) as exc_info:
creds.validate_startup()

error_msg = str(exc_info.value)
assert "ANTHROPIC_API_KEY" in error_msg
assert "STARTUP_KEY" in error_msg
assert "Server startup failed" in error_msg

def test_validate_startup_passes_when_present(self, monkeypatch):
Expand Down
85 changes: 50 additions & 35 deletions tools/tests/tools/test_file_system_toolkits.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Tests for file_system_toolkits tools (FastMCP)."""
import os
import pytest
from unittest.mock import patch
from unittest.mock import patch, MagicMock

from fastmcp import FastMCP

Expand Down Expand Up @@ -528,59 +528,74 @@ def execute_command_fn(self, mcp):

def test_execute_simple_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a simple command returns output."""
result = execute_command_fn(
command="echo 'Hello World'",
**mock_workspace
)

assert result["success"] is True
assert result["return_code"] == 0
assert "Hello World" in result["stdout"]
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="Hello World", stderr="", returncode=0)

result = execute_command_fn(
command="echo 'Hello World'",
**mock_workspace
)

assert result["success"] is True
assert result["return_code"] == 0
assert "Hello World" in result["stdout"]

def test_execute_failing_command(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a failing command returns non-zero exit code."""
result = execute_command_fn(
command="exit 1",
**mock_workspace
)
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="", stderr="Error", returncode=1)

result = execute_command_fn(
command="exit 1",
**mock_workspace
)

assert result["success"] is True
assert result["return_code"] == 1
assert result["success"] is True
assert result["return_code"] == 1

def test_execute_command_with_stderr(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a command that writes to stderr captures it."""
result = execute_command_fn(
command="echo 'error message' >&2",
**mock_workspace
)
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="", stderr="error message", returncode=0)

assert result["success"] is True
assert "error message" in result.get("stderr", "")
result = execute_command_fn(
command="echo 'error message' >&2",
**mock_workspace
)

assert result["success"] is True
assert "error message" in result.get("stderr", "")

def test_execute_command_list_files(self, execute_command_fn, mock_workspace, mock_secure_path, tmp_path):
"""Executing ls command lists files."""
# Create a test file
(tmp_path / "testfile.txt").write_text("content")

result = execute_command_fn(
command=f"ls {tmp_path}",
**mock_workspace
)
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="testfile.txt", stderr="", returncode=0)

assert result["success"] is True
assert result["return_code"] == 0
assert "testfile.txt" in result["stdout"]
result = execute_command_fn(
command=f"ls {tmp_path}",
**mock_workspace
)

assert result["success"] is True
assert result["return_code"] == 0
assert "testfile.txt" in result["stdout"]

def test_execute_command_with_pipe(self, execute_command_fn, mock_workspace, mock_secure_path):
"""Executing a command with pipe works correctly."""
result = execute_command_fn(
command="echo 'hello world' | tr 'a-z' 'A-Z'",
**mock_workspace
)
with patch("aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="HELLO WORLD", stderr="", returncode=0)

assert result["success"] is True
assert result["return_code"] == 0
assert "HELLO WORLD" in result["stdout"]
result = execute_command_fn(
command="echo 'hello world' | tr 'a-z' 'A-Z'",
**mock_workspace
)

assert result["success"] is True
assert result["return_code"] == 0
assert "HELLO WORLD" in result["stdout"]


class TestApplyDiffTool:
Expand Down
35 changes: 35 additions & 0 deletions tools/tests/tools/test_web_search_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,41 @@ def test_long_query_returns_error(self, web_search_fn, monkeypatch):

assert "error" in result

def test_search_success_mock(self, web_search_fn, monkeypatch):
"""Search success with mocked provider."""
import httpx
from unittest.mock import MagicMock, patch

# Mock keys
monkeypatch.setenv("BRAVE_SEARCH_API_KEY", "mock-key")

# Mock httpx response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"web": {
"results": [
{"title": "Mock Result 1", "url": "http://mock1.com", "description": "Desc 1"},
{"title": "Mock Result 2", "url": "http://mock2.com", "description": "Desc 2"},
]
}
}

# Patch httpx.get where it is used
with patch("aden_tools.tools.web_search_tool.web_search_tool.httpx.get", return_value=mock_response) as mock_get:
result = web_search_fn(query="test query")

assert "error" not in result
assert result["total"] == 2
assert result["results"][0]["title"] == "Mock Result 1"
assert result["provider"] == "brave"

# Verify call args
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
assert args[0] == "https://api.search.brave.com/res/v1/web/search"
assert kwargs["headers"]["X-Subscription-Token"] == "mock-key"


class TestBraveProvider:
"""Tests for Brave Search provider."""
Expand Down
Loading