Skip to content

Commit 1d3cf2e

Browse files
authored
feat(web): implement websocket handler and discover command routing (#106)
* feat(web): Implement WebSocket client lifecyclet handler, command routing, and runtime stream discovery. * ruff edits * adding docstring for message handler * hardening `_handle_message` input validation and rename `bc` param
1 parent d58e40a commit 1d3cf2e

2 files changed

Lines changed: 304 additions & 0 deletions

File tree

src/MoBI_View/web/server.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""WebSocket server for MoBI-View real-time data streaming.
2+
3+
This module provides the ws_handler coroutine that manages WebSocket
4+
client connections and handles incoming discover messages.
5+
"""
6+
7+
import json
8+
import logging
9+
10+
from websockets.asyncio import server
11+
12+
from MoBI_View.core import discovery
13+
from MoBI_View.presenters import main_app_presenter
14+
from MoBI_View.web import broadcaster
15+
16+
logger = logging.getLogger("MoBI-View.web.server")
17+
18+
19+
async def ws_handler(
20+
websocket: server.ServerConnection,
21+
active_broadcaster: broadcaster.Broadcaster,
22+
presenter: main_app_presenter.MainAppPresenter,
23+
) -> None:
24+
"""Handle a WebSocket client connection.
25+
26+
Registers the client with the broadcaster, listens for incoming
27+
messages, and removes the client on disconnect.
28+
29+
Args:
30+
websocket: The WebSocket connection.
31+
active_broadcaster: The Broadcaster instance managing clients.
32+
presenter: The MainAppPresenter providing data and inlets.
33+
"""
34+
active_broadcaster.add_client(websocket)
35+
try:
36+
async for raw_message in websocket:
37+
await _handle_message(raw_message, websocket, presenter)
38+
finally:
39+
active_broadcaster.remove_client(websocket)
40+
41+
42+
async def _handle_message(
43+
raw_message: str | bytes,
44+
websocket: server.ServerConnection,
45+
presenter: main_app_presenter.MainAppPresenter,
46+
) -> None:
47+
"""Parse and dispatch a single client message.
48+
49+
Currently supports only the `discover` command. As the number of
50+
supported commands grows, this function should be refactored to use a
51+
dispatch table (e.g. `handlers = {"discover": _handle_discover}`)
52+
instead of explicit `if/else` branching.
53+
54+
Args:
55+
raw_message: The raw message received from the WebSocket.
56+
websocket: The WebSocket connection for sending responses.
57+
presenter: The MainAppPresenter for stream management.
58+
"""
59+
try:
60+
data = json.loads(raw_message)
61+
except (json.JSONDecodeError, TypeError, UnicodeDecodeError, RecursionError):
62+
logger.warning("Received invalid JSON")
63+
return
64+
65+
if not isinstance(data, dict):
66+
logger.warning("Expected JSON object, got %s", type(data).__name__)
67+
return
68+
69+
command = data.get("command")
70+
if command == "discover":
71+
await _handle_discover(websocket, presenter)
72+
else:
73+
logger.warning("Unknown command: %s", command)
74+
75+
76+
async def _handle_discover(
77+
websocket: server.ServerConnection,
78+
presenter: main_app_presenter.MainAppPresenter,
79+
) -> None:
80+
"""Run stream discovery and send results to the requesting client.
81+
82+
Args:
83+
websocket: The WebSocket connection to send the result to.
84+
presenter: The MainAppPresenter managing data inlets.
85+
"""
86+
new_inlets = discovery.discover_and_create_inlets(
87+
existing_inlets=presenter.data_inlets,
88+
)
89+
presenter.data_inlets.extend(new_inlets)
90+
stream_names = [inlet.stream_name for inlet in new_inlets]
91+
response = json.dumps(
92+
{
93+
"type": "discover_result",
94+
"streams": stream_names,
95+
}
96+
)
97+
await websocket.send(response)
98+
logger.info("Discover: found %d new stream(s)", len(new_inlets))

tests/unit/test_server.py

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
"""Unit tests for the WebSocket server module."""
2+
3+
import asyncio
4+
import json
5+
from unittest.mock import AsyncMock, MagicMock, patch
6+
7+
import pytest
8+
9+
from MoBI_View.presenters import main_app_presenter
10+
from MoBI_View.web import broadcaster, server
11+
12+
13+
@pytest.fixture
14+
def mock_presenter() -> MagicMock:
15+
"""Creates a mock MainAppPresenter."""
16+
mock = MagicMock(spec=main_app_presenter.MainAppPresenter)
17+
mock.poll_data.return_value = []
18+
mock.data_inlets = []
19+
return mock
20+
21+
22+
@pytest.fixture
23+
def mock_broadcaster(mock_presenter: MagicMock) -> MagicMock:
24+
"""Creates a mock Broadcaster."""
25+
mock = MagicMock(spec=broadcaster.Broadcaster)
26+
mock.presenter = mock_presenter
27+
return mock
28+
29+
30+
@pytest.fixture
31+
def mock_websocket() -> AsyncMock:
32+
"""Creates a mock ServerConnection."""
33+
return AsyncMock()
34+
35+
36+
def test_ws_handler_registers_and_unregisters_client(
37+
mock_websocket: AsyncMock,
38+
mock_broadcaster: MagicMock,
39+
mock_presenter: MagicMock,
40+
) -> None:
41+
"""Tests ws_handler adds client on connect and removes on disconnect."""
42+
mock_websocket.__aiter__.return_value = iter([])
43+
44+
asyncio.run(server.ws_handler(mock_websocket, mock_broadcaster, mock_presenter))
45+
46+
mock_broadcaster.add_client.assert_called_once_with(mock_websocket)
47+
mock_broadcaster.remove_client.assert_called_once_with(mock_websocket)
48+
49+
50+
def test_ws_handler_removes_client_on_exception(
51+
mock_websocket: AsyncMock,
52+
mock_broadcaster: MagicMock,
53+
mock_presenter: MagicMock,
54+
) -> None:
55+
"""Tests ws_handler removes client even when iteration raises."""
56+
mock_websocket.__aiter__.side_effect = RuntimeError("connection lost")
57+
58+
with pytest.raises(RuntimeError, match="connection lost"):
59+
asyncio.run(server.ws_handler(mock_websocket, mock_broadcaster, mock_presenter))
60+
61+
mock_broadcaster.remove_client.assert_called_once_with(mock_websocket)
62+
63+
64+
def test_ws_handler_dispatches_discover_command(
65+
mock_websocket: AsyncMock,
66+
mock_broadcaster: MagicMock,
67+
mock_presenter: MagicMock,
68+
) -> None:
69+
"""Tests ws_handler forwards a discover command to _handle_discover."""
70+
message = json.dumps({"command": "discover"})
71+
mock_websocket.__aiter__.return_value = iter([message])
72+
73+
with patch.object(server, "_handle_discover", new_callable=AsyncMock) as mock_hd:
74+
asyncio.run(server.ws_handler(mock_websocket, mock_broadcaster, mock_presenter))
75+
76+
mock_hd.assert_awaited_once_with(mock_websocket, mock_presenter)
77+
78+
79+
def test_handle_message_ignores_invalid_json(
80+
mock_websocket: AsyncMock,
81+
mock_presenter: MagicMock,
82+
caplog: pytest.LogCaptureFixture,
83+
) -> None:
84+
"""Tests _handle_message logs warning for non-JSON payload."""
85+
asyncio.run(server._handle_message("not json", mock_websocket, mock_presenter))
86+
87+
assert "invalid JSON" in caplog.text
88+
89+
90+
def test_handle_message_rejects_non_string_input(
91+
mock_websocket: AsyncMock,
92+
mock_presenter: MagicMock,
93+
caplog: pytest.LogCaptureFixture,
94+
) -> None:
95+
"""Tests _handle_message logs warning when input is not str or bytes."""
96+
asyncio.run(
97+
server._handle_message(None, mock_websocket, mock_presenter) # type: ignore[arg-type]
98+
)
99+
100+
assert "invalid JSON" in caplog.text
101+
102+
103+
def test_handle_message_rejects_invalid_bytes(
104+
mock_websocket: AsyncMock,
105+
mock_presenter: MagicMock,
106+
caplog: pytest.LogCaptureFixture,
107+
) -> None:
108+
"""Tests _handle_message logs warning for bytes with invalid encoding."""
109+
asyncio.run(server._handle_message(b"\xff\xfe", mock_websocket, mock_presenter))
110+
111+
assert "invalid JSON" in caplog.text
112+
113+
114+
def test_handle_message_rejects_deeply_nested_json(
115+
mock_websocket: AsyncMock,
116+
mock_presenter: MagicMock,
117+
caplog: pytest.LogCaptureFixture,
118+
) -> None:
119+
"""Tests _handle_message logs warning for deeply nested JSON."""
120+
deeply_nested = "[" * 10000 + "]" * 10000
121+
122+
asyncio.run(server._handle_message(deeply_nested, mock_websocket, mock_presenter))
123+
124+
assert "invalid JSON" in caplog.text
125+
126+
127+
def test_handle_message_rejects_non_dict_json(
128+
mock_websocket: AsyncMock,
129+
mock_presenter: MagicMock,
130+
caplog: pytest.LogCaptureFixture,
131+
) -> None:
132+
"""Tests _handle_message logs warning when JSON is not an object."""
133+
asyncio.run(
134+
server._handle_message('"just a string"', mock_websocket, mock_presenter)
135+
)
136+
137+
assert "Expected JSON object" in caplog.text
138+
139+
140+
def test_handle_message_logs_unknown_command(
141+
mock_websocket: AsyncMock,
142+
mock_presenter: MagicMock,
143+
caplog: pytest.LogCaptureFixture,
144+
) -> None:
145+
"""Tests _handle_message logs warning for unrecognised command."""
146+
msg = json.dumps({"command": "foobar"})
147+
148+
asyncio.run(server._handle_message(msg, mock_websocket, mock_presenter))
149+
150+
assert "Unknown command: foobar" in caplog.text
151+
152+
153+
def test_handle_message_routes_discover_command(
154+
mock_websocket: AsyncMock,
155+
mock_presenter: MagicMock,
156+
) -> None:
157+
"""Tests _handle_message calls _handle_discover for discover command."""
158+
msg = json.dumps({"command": "discover"})
159+
160+
with patch.object(server, "_handle_discover", new_callable=AsyncMock) as mock_hd:
161+
asyncio.run(server._handle_message(msg, mock_websocket, mock_presenter))
162+
163+
mock_hd.assert_awaited_once_with(mock_websocket, mock_presenter)
164+
165+
166+
def test_handle_discover_calls_discovery_and_sends_result(
167+
mock_websocket: AsyncMock,
168+
mock_presenter: MagicMock,
169+
caplog: pytest.LogCaptureFixture,
170+
) -> None:
171+
"""Tests _handle_discover discovers new streams and sends them to client."""
172+
fake_inlet = MagicMock()
173+
fake_inlet.stream_name = "EEG"
174+
175+
with (
176+
patch.object(
177+
server.discovery,
178+
"discover_and_create_inlets",
179+
return_value=[fake_inlet],
180+
),
181+
caplog.at_level("INFO"),
182+
):
183+
asyncio.run(server._handle_discover(mock_websocket, mock_presenter))
184+
185+
mock_websocket.send.assert_awaited_once()
186+
sent = json.loads(mock_websocket.send.call_args[0][0])
187+
assert sent["type"] == "discover_result"
188+
assert sent["streams"] == ["EEG"]
189+
assert fake_inlet in mock_presenter.data_inlets
190+
assert "found 1 new stream(s)" in caplog.text
191+
192+
193+
def test_handle_discover_with_no_new_streams(
194+
mock_websocket: AsyncMock,
195+
mock_presenter: MagicMock,
196+
) -> None:
197+
"""Tests _handle_discover sends empty list when no new streams found."""
198+
with patch.object(
199+
server.discovery,
200+
"discover_and_create_inlets",
201+
return_value=[],
202+
):
203+
asyncio.run(server._handle_discover(mock_websocket, mock_presenter))
204+
205+
sent = json.loads(mock_websocket.send.call_args[0][0])
206+
assert sent["streams"] == []

0 commit comments

Comments
 (0)