Skip to content

Commit 85b94fe

Browse files
committed
Added tests.
1 parent 79c4299 commit 85b94fe

File tree

9 files changed

+568
-10
lines changed

9 files changed

+568
-10
lines changed

drone_sim_runner/sphx/sim_config.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,3 @@ def _prepare_temp_config(original_config_path: Path) -> Path:
123123
def _validate_path(path_: Path):
124124
if not path_.exists():
125125
raise FileNotFoundError(f"File does not exist: {path_}")
126-
127-
128-
if __name__ == '__main__':
129-
base_dir = Path(__file__).parent.parent.parent
130-
path_config = PathConfig.from_yaml(
131-
"",
132-
base_dir / "examples" / "example-config.yaml",
133-
base_dir,
134-
"")
135-
print(path_config)

tests/core/__init__.py

Whitespace-only changes.

tests/core/test_process_manager.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import subprocess
2+
import unittest
3+
from unittest.mock import MagicMock, patch
4+
5+
import psutil
6+
7+
from drone_sim_runner.core.process_manager import ProcessManager
8+
9+
10+
class TestProcessManager(unittest.TestCase):
11+
12+
def test_verify_process_running_none(self):
13+
"""Should return False if process is None."""
14+
result = ProcessManager.verify_process_running(None, "test_proc")
15+
self.assertFalse(result)
16+
17+
def test_verify_process_running_still_active(self):
18+
"""Should return True if poll() returns None (process still running)."""
19+
mock_proc = MagicMock(spec=subprocess.Popen)
20+
mock_proc.poll.return_value = None
21+
22+
result = ProcessManager.verify_process_running(mock_proc, "test_proc")
23+
self.assertTrue(result)
24+
25+
def test_verify_process_running_terminated(self):
26+
"""Should return False and print error if process has terminated."""
27+
mock_proc = MagicMock(spec=subprocess.Popen)
28+
29+
mock_stderr = MagicMock()
30+
mock_stderr.read.return_value = b"Some error occurred"
31+
mock_proc.stderr = mock_stderr
32+
33+
mock_proc.poll.return_value = 1
34+
35+
result = ProcessManager.verify_process_running(mock_proc, "test_proc")
36+
37+
self.assertFalse(result)
38+
mock_stderr.read.assert_called_once()
39+
40+
@patch('psutil.process_iter')
41+
def test_clean_remaining_processes(self, mock_iter):
42+
"""Should call kill() on processes that match the name."""
43+
proc1 = MagicMock()
44+
proc1.info = {'name': 'my_app_process'}
45+
46+
proc2 = MagicMock()
47+
proc2.info = {'name': 'other_process'}
48+
49+
mock_iter.return_value = [proc1, proc2]
50+
51+
ProcessManager.clean_remaining_processes("my_app")
52+
53+
proc1.kill.assert_called_once()
54+
proc2.kill.assert_not_called()
55+
56+
@patch('psutil.process_iter')
57+
def test_clean_remaining_processes_access_denied(self, mock_iter):
58+
"""Should handle AccessDenied exceptions gracefully."""
59+
proc1 = MagicMock()
60+
proc1.info = {'name': 'admin_proc'}
61+
proc1.kill.side_effect = psutil.AccessDenied()
62+
63+
mock_iter.return_value = [proc1]
64+
65+
ProcessManager.clean_remaining_processes("admin_proc")
66+
proc1.kill.assert_called_once()
67+
68+
def test_kill_process_tree_none_pid(self):
69+
"""Should return immediately if PID is None."""
70+
with patch('psutil.Process') as mock_ps:
71+
ProcessManager.kill_process_tree(None)
72+
mock_ps.assert_not_called()
73+
74+
@patch('psutil.Process')
75+
def test_kill_process_tree_recursive(self, mock_ps_class):
76+
"""Should kill children first, then the parent."""
77+
mock_parent = MagicMock()
78+
mock_child1 = MagicMock()
79+
mock_child2 = MagicMock()
80+
81+
mock_ps_class.return_value = mock_parent
82+
mock_parent.children.return_value = [mock_child1, mock_child2]
83+
84+
ProcessManager.kill_process_tree(1234)
85+
86+
mock_child1.kill.assert_called_once()
87+
mock_child2.kill.assert_called_once()
88+
mock_parent.kill.assert_called_once()
89+
90+
@patch('psutil.Process')
91+
def test_kill_process_tree_not_found(self, mock_ps_class):
92+
"""Should handle NoSuchProcess if the PID doesn't exist."""
93+
mock_ps_class.side_effect = psutil.NoSuchProcess(pid=9999)
94+
95+
ProcessManager.kill_process_tree(9999)
96+
97+
98+
if __name__ == '__main__':
99+
unittest.main()

tests/core/test_script_config.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import unittest
2+
from unittest.mock import patch
3+
4+
from drone_sim_runner.core.script_config import ScriptConfig
5+
6+
7+
class TestScriptConfig(unittest.TestCase):
8+
9+
@patch('pathlib.Path.exists')
10+
def test_valid_config(self, mock_exists):
11+
"""Should initialize correctly with valid path and args."""
12+
mock_exists.return_value = True
13+
config = ScriptConfig(script_path="/fake/path/script.sh", script_args=["--speed", "10"])
14+
self.assertEqual(config.script_path, "/fake/path/script.sh")
15+
self.assertEqual(config.script_args, ["--speed", "10"])
16+
17+
@patch('pathlib.Path.exists')
18+
def test_missing_file_raises_error(self, mock_exists):
19+
"""Should raise FileNotFoundError if path doesn't exist."""
20+
mock_exists.return_value = False
21+
with self.assertRaises(FileNotFoundError):
22+
ScriptConfig(script_path="invalid_path.py")
23+
24+
@patch('pathlib.Path.exists')
25+
def test_invalid_args_type(self, mock_exists):
26+
"""Should raise ValueError if script_args is not a list."""
27+
mock_exists.return_value = True
28+
with self.assertRaises(ValueError) as cm:
29+
ScriptConfig(script_path="script.py", script_args="not a list")
30+
self.assertEqual(str(cm.exception), "script_args must be a list")
31+
32+
@patch('pathlib.Path.exists')
33+
def test_empty_args_list(self, mock_exists):
34+
"""Should raise ValueError if script_args is empty."""
35+
mock_exists.return_value = True
36+
with self.assertRaises(ValueError):
37+
ScriptConfig(script_path="script.py", script_args=[])
38+
39+
40+
if __name__ == '__main__':
41+
unittest.main()
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import subprocess
2+
import unittest
3+
from unittest.mock import MagicMock, patch
4+
5+
from drone_sim_runner.core.process_type import ProcessType
6+
from drone_sim_runner.core.simulation_process import SimulationProcess
7+
8+
9+
class TestSimulationProcess(unittest.TestCase):
10+
11+
def setUp(self):
12+
self.cmd = ["python3", "sim.py"]
13+
self.sim = SimulationProcess(
14+
cmd=self.cmd,
15+
name=ProcessType.SCRIPT,
16+
init_indicator="READY",
17+
wait_time=0,
18+
timeout=5
19+
)
20+
21+
@patch('subprocess.Popen')
22+
@patch('threading.Thread')
23+
@patch('time.sleep')
24+
def test_start_script_process(self, mock_sleep, mock_thread, mock_popen):
25+
"""Should start the process and launch output thread for SCRIPTS."""
26+
mock_popen.return_value = MagicMock(spec=subprocess.Popen)
27+
28+
self.sim.start()
29+
30+
self.assertTrue(self.sim.is_running)
31+
mock_popen.assert_called_once()
32+
mock_thread.assert_called_once_with(target=self.sim._log_output, daemon=True)
33+
34+
@patch('subprocess.Popen')
35+
@patch('drone_sim_runner.core.process_manager.ProcessManager.verify_process_running')
36+
@patch('time.sleep')
37+
def test_wait_for_initialization_success(self, mock_sleep, mock_verify, mock_popen):
38+
"""Should continue when init_indicator is found in stdout."""
39+
self.sim.name = ProcessType.SPHINX
40+
41+
mock_proc = MagicMock(spec=subprocess.Popen)
42+
mock_stdout = MagicMock()
43+
mock_stdout.readline.side_effect = [b"Starting...\n", b"System READY\n"]
44+
mock_proc.stdout = mock_stdout
45+
46+
mock_popen.return_value = mock_proc
47+
mock_verify.return_value = True
48+
49+
self.sim.start()
50+
51+
self.assertEqual(mock_stdout.readline.call_count, 2)
52+
53+
@patch('subprocess.Popen')
54+
@patch('time.time')
55+
@patch('drone_sim_runner.core.process_manager.ProcessManager.verify_process_running')
56+
def test_wait_for_initialization_timeout(self, mock_verify, mock_time, mock_popen):
57+
"""Should raise TimeoutError if indicator never appears."""
58+
self.sim.name = ProcessType.FIRMWARE
59+
mock_proc = MagicMock(spec=subprocess.Popen)
60+
61+
mock_stdout = MagicMock()
62+
mock_stdout.readline.return_value = b"just some noise\n"
63+
mock_proc.stdout = mock_stdout
64+
65+
mock_popen.return_value = mock_proc
66+
mock_verify.return_value = True
67+
68+
mock_time.side_effect = [0, 10]
69+
70+
with self.assertRaises(TimeoutError):
71+
self.sim._wait_for_initialization()
72+
73+
@patch('drone_sim_runner.core.process_manager.ProcessManager.kill_process_tree')
74+
def test_stop_process(self, mock_kill):
75+
"""Should kill the process tree and stop running."""
76+
mock_proc = MagicMock()
77+
mock_proc.pid = 1234
78+
self.sim.process = mock_proc
79+
self.sim.is_running = True
80+
81+
self.sim.stop()
82+
83+
self.assertFalse(self.sim.is_running)
84+
self.assertIsNone(self.sim.process)
85+
mock_kill.assert_called_once_with(1234)
86+
87+
@patch('subprocess.Popen')
88+
def test_start_failure_raises_runtime_error(self, mock_popen):
89+
"""Should wrap exceptions in a RuntimeError."""
90+
mock_popen.side_effect = OSError("Binary not found")
91+
92+
with self.assertRaises(RuntimeError) as cm:
93+
self.sim.start()
94+
self.assertIn("Failed to start 'Script'", str(cm.exception))
95+
96+
97+
if __name__ == '__main__':
98+
unittest.main()

tests/sphx/__init__.py

Whitespace-only changes.

tests/sphx/test_sim_config.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import unittest
2+
from pathlib import Path
3+
from unittest.mock import patch, mock_open
4+
5+
from drone_sim_runner.sphx.sim_config import SimulationConfig, PathConfig
6+
7+
8+
class TestSimulationConfig(unittest.TestCase):
9+
def test_valid_init(self):
10+
"""Should initialize correctly with valid types."""
11+
config = SimulationConfig(firmware_command="run_fw", sphinx_command="/bin/sphinx")
12+
self.assertEqual(config.sphinx_cmd, "/bin/sphinx")
13+
14+
def test_invalid_types(self):
15+
"""Should raise ValueError for incorrect types."""
16+
with self.assertRaises(ValueError):
17+
SimulationConfig(firmware_command=123, sphinx_command="path")
18+
with self.assertRaises(ValueError):
19+
SimulationConfig(firmware_command="cmd", sphinx_command=123)
20+
21+
def test_negative_waits(self):
22+
"""Should raise ValueError if any time-based attribute is negative."""
23+
with self.assertRaises(ValueError):
24+
SimulationConfig(firmware_command="cmd", sphinx_command="cmd", sphinx_init_wait=-1)
25+
26+
def test_negative_values_raise_error(self):
27+
"""Post-init should catch negative timeouts."""
28+
with self.assertRaises(ValueError):
29+
SimulationConfig(
30+
firmware_command="run",
31+
sphinx_command="path",
32+
timeout_seconds=-10
33+
)
34+
35+
36+
class TestPathConfig(unittest.TestCase):
37+
38+
def setUp(self):
39+
self.yaml_data = """
40+
paths:
41+
sphinx:
42+
command: "sphinx-binary"
43+
level: "low"
44+
quality: "high"
45+
firmware:
46+
base_command: "launch"
47+
drone_path: "/drones/anafi"
48+
firmware_url: "http://fw.local"
49+
pose: "0,0,0"
50+
scripts:
51+
controller: "main.py"
52+
save_dir_name: "test_run"
53+
args:
54+
--out: "${output_dir}"
55+
"""
56+
57+
@patch('pathlib.Path.exists', return_value=True)
58+
@patch('builtins.open', new_callable=mock_open)
59+
def test_from_yaml_parsing(self, mock_file, mock_exists):
60+
"""Verify YAML data is correctly transformed into shell commands."""
61+
mock_file.side_effect = [
62+
mock_open(read_data=self.yaml_data).return_value,
63+
mock_open(read_data="").return_value
64+
]
65+
66+
config = PathConfig.from_yaml(
67+
sphinx_base_dir="/opt/sphinx",
68+
config_path="config.yaml",
69+
project_base_dir="/home/user/project",
70+
output_dir="/tmp/results"
71+
)
72+
73+
# 1. Verify Firmware Command string assembly
74+
expected_fw = 'launch "/drones/anafi::pose=0,0,0::firmware=http://fw.local"'
75+
self.assertEqual(config.firmware_command, expected_fw)
76+
77+
# 2. Verify Sphinx Command string assembly
78+
self.assertIn("/opt/sphinx/sphinx-binary", config.sphinx_command)
79+
self.assertIn("-level=low", config.sphinx_command)
80+
self.assertIn("-quality=high", config.sphinx_command)
81+
82+
# 3. Verify Script Args Replacement
83+
self.assertIn("--out=/tmp/results/test_run", config.script_args)
84+
85+
@patch('tempfile.mkstemp')
86+
@patch('builtins.open', new_callable=mock_open, read_data="${MODELS_DIR}/drone.cpp")
87+
@patch('pathlib.Path.exists', return_value=True)
88+
def test_prepare_temp_config_regex(self, mock_exists, mock_file, mock_temp):
89+
"""Verify that ${MODELS_DIR} is correctly replaced in config files."""
90+
mock_temp.return_value = (None, "/tmp/sphinx_config_123.yaml")
91+
92+
# original_path: /home/user/project/config/file.yaml
93+
# bunker_dir (parent.parent): /home/user/project
94+
# models_dir: /home/user/project/models
95+
original_path = Path("/home/user/project/config/file.yaml")
96+
97+
temp_file = PathConfig._prepare_temp_config(original_path)
98+
99+
write_handle = mock_file.return_value
100+
written_content = "".join(call.args[0] for call in write_handle.write.call_args_list)
101+
102+
self.assertEqual(temp_file, Path("/tmp/sphinx_config_123.yaml"))
103+
self.assertIn("/home/user/project/models", written_content)
104+
self.assertNotIn("${MODELS_DIR}", written_content)
105+
106+
107+
if __name__ == '__main__':
108+
unittest.main()

0 commit comments

Comments
 (0)