|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | +Tests for utils/net.py |
| 5 | +""" |
| 6 | + |
| 7 | +import os |
| 8 | +import signal |
| 9 | +import socket |
| 10 | +import sys |
| 11 | +import unittest |
| 12 | +from unittest.mock import MagicMock, patch |
| 13 | + |
| 14 | +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| 15 | + |
| 16 | +from utils.net import ( # noqa: E402 |
| 17 | + check_and_free_port, |
| 18 | + get_port_owner, |
| 19 | + is_port_available, |
| 20 | + kill_port_owner, |
| 21 | +) |
| 22 | + |
| 23 | + |
| 24 | +class TestIsPortAvailable(unittest.TestCase): |
| 25 | + """Tests for is_port_available.""" |
| 26 | + |
| 27 | + def test_available_port(self): |
| 28 | + """An unused port should be available.""" |
| 29 | + # Use a random high port unlikely to be in use |
| 30 | + self.assertTrue(is_port_available(59123)) |
| 31 | + |
| 32 | + def test_occupied_port(self): |
| 33 | + """A port with a listener should not be available.""" |
| 34 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 35 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 36 | + s.bind(("127.0.0.1", 0)) |
| 37 | + s.listen(1) |
| 38 | + port = s.getsockname()[1] |
| 39 | + try: |
| 40 | + self.assertFalse(is_port_available(port)) |
| 41 | + finally: |
| 42 | + s.close() |
| 43 | + |
| 44 | + def test_port_after_close(self): |
| 45 | + """Port should be available after listener closes.""" |
| 46 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 47 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 48 | + s.bind(("127.0.0.1", 0)) |
| 49 | + s.listen(1) |
| 50 | + port = s.getsockname()[1] |
| 51 | + s.close() |
| 52 | + self.assertTrue(is_port_available(port)) |
| 53 | + |
| 54 | + |
| 55 | +class TestGetPortOwner(unittest.TestCase): |
| 56 | + """Tests for get_port_owner.""" |
| 57 | + |
| 58 | + def test_no_listener(self): |
| 59 | + """Should return None for a port with no listener.""" |
| 60 | + result = get_port_owner(59124) |
| 61 | + self.assertIsNone(result) |
| 62 | + |
| 63 | + def test_own_process(self): |
| 64 | + """Should find our own process as the owner.""" |
| 65 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 66 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 67 | + s.bind(("127.0.0.1", 0)) |
| 68 | + s.listen(1) |
| 69 | + port = s.getsockname()[1] |
| 70 | + try: |
| 71 | + owner = get_port_owner(port) |
| 72 | + if owner: # May fail in some CI environments |
| 73 | + self.assertEqual(owner["pid"], os.getpid()) |
| 74 | + self.assertIn("pid", owner) |
| 75 | + self.assertIn("name", owner) |
| 76 | + finally: |
| 77 | + s.close() |
| 78 | + |
| 79 | + |
| 80 | +class TestKillPortOwner(unittest.TestCase): |
| 81 | + """Tests for kill_port_owner.""" |
| 82 | + |
| 83 | + def test_no_owner(self): |
| 84 | + """Should return False when no process owns the port.""" |
| 85 | + self.assertFalse(kill_port_owner(59125)) |
| 86 | + |
| 87 | + def test_skip_own_pid(self): |
| 88 | + """Should not kill our own process.""" |
| 89 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 90 | + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 91 | + s.bind(("127.0.0.1", 0)) |
| 92 | + s.listen(1) |
| 93 | + port = s.getsockname()[1] |
| 94 | + try: |
| 95 | + # kill_port_owner should skip our own PID |
| 96 | + result = kill_port_owner(port) |
| 97 | + self.assertFalse(result) |
| 98 | + finally: |
| 99 | + s.close() |
| 100 | + |
| 101 | + @patch("utils.net.get_port_owner") |
| 102 | + @patch("utils.net.os.kill") |
| 103 | + def test_kill_stale_process(self, mock_kill, mock_owner): |
| 104 | + """Should kill a stale process and return True.""" |
| 105 | + mock_owner.return_value = { |
| 106 | + "pid": 99999, |
| 107 | + "name": "python", |
| 108 | + "cmdline": "python main.py", |
| 109 | + } |
| 110 | + # Simulate process dying after SIGTERM |
| 111 | + mock_kill.side_effect = [None, OSError("No such process")] |
| 112 | + |
| 113 | + result = kill_port_owner(12345, timeout=0.5) |
| 114 | + self.assertTrue(result) |
| 115 | + mock_kill.assert_any_call(99999, signal.SIGTERM) |
| 116 | + |
| 117 | + @patch("utils.net.get_port_owner") |
| 118 | + @patch("utils.net.os.kill") |
| 119 | + def test_kill_fails(self, mock_kill, mock_owner): |
| 120 | + """Should return False when kill raises OSError.""" |
| 121 | + mock_owner.return_value = {"pid": 99999, "name": "x", "cmdline": "x"} |
| 122 | + mock_kill.side_effect = OSError("Operation not permitted") |
| 123 | + |
| 124 | + result = kill_port_owner(12345) |
| 125 | + self.assertFalse(result) |
| 126 | + |
| 127 | + |
| 128 | +class TestCheckAndFreePort(unittest.TestCase): |
| 129 | + """Tests for check_and_free_port.""" |
| 130 | + |
| 131 | + def test_available_port(self): |
| 132 | + """Should return True immediately for a free port.""" |
| 133 | + self.assertTrue(check_and_free_port(59126)) |
| 134 | + |
| 135 | + @patch("utils.net.kill_port_owner", return_value=True) |
| 136 | + @patch("utils.net.is_port_available", return_value=False) |
| 137 | + def test_occupied_then_freed(self, mock_avail, mock_kill): |
| 138 | + """Should try to kill and return True on success.""" |
| 139 | + self.assertTrue(check_and_free_port(12345)) |
| 140 | + mock_kill.assert_called_once_with(12345) |
| 141 | + |
| 142 | + @patch("utils.net.kill_port_owner", return_value=False) |
| 143 | + @patch("utils.net.is_port_available", return_value=False) |
| 144 | + def test_occupied_kill_fails(self, mock_avail, mock_kill): |
| 145 | + """Should return False when kill fails.""" |
| 146 | + self.assertFalse(check_and_free_port(12345)) |
| 147 | + |
| 148 | + |
| 149 | +class TestGDBPortConflict(unittest.TestCase): |
| 150 | + """Integration test: GDB server port conflict detection.""" |
| 151 | + |
| 152 | + @patch("utils.net.get_port_owner") |
| 153 | + def test_gdb_manager_uses_check_and_free_port(self, mock_owner): |
| 154 | + """start_external_gdb_server should call check_and_free_port.""" |
| 155 | + |
| 156 | + mock_owner.return_value = None |
| 157 | + |
| 158 | + with patch( |
| 159 | + "core.gdb_manager.check_and_free_port", return_value=True |
| 160 | + ) as mock_check: |
| 161 | + with patch("core.gdb_manager.GDBRSPBridge") as mock_bridge_cls: |
| 162 | + mock_bridge = MagicMock() |
| 163 | + mock_bridge.start.return_value = 3333 |
| 164 | + mock_bridge.is_running = False |
| 165 | + mock_bridge_cls.return_value = mock_bridge |
| 166 | + |
| 167 | + from core.gdb_manager import start_external_gdb_server |
| 168 | + |
| 169 | + state = MagicMock() |
| 170 | + state.device.external_gdb_port = 3333 |
| 171 | + state.device.elf_path = None |
| 172 | + state.device.download_chunk_size = 1024 |
| 173 | + state.external_gdb_bridge = None |
| 174 | + |
| 175 | + result = start_external_gdb_server(state) |
| 176 | + self.assertTrue(result) |
| 177 | + mock_check.assert_called_once_with(3333) |
| 178 | + |
| 179 | + def test_gdb_manager_rejects_occupied_port(self): |
| 180 | + """start_external_gdb_server should fail if port can't be freed.""" |
| 181 | + with patch("core.gdb_manager.check_and_free_port", return_value=False): |
| 182 | + from core.gdb_manager import start_external_gdb_server |
| 183 | + |
| 184 | + state = MagicMock() |
| 185 | + state.device.external_gdb_port = 3333 |
| 186 | + state.external_gdb_bridge = None |
| 187 | + |
| 188 | + result = start_external_gdb_server(state) |
| 189 | + self.assertFalse(result) |
| 190 | + |
| 191 | + |
| 192 | +if __name__ == "__main__": |
| 193 | + unittest.main() |
0 commit comments