Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/MoBI_View/web/broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def start(self) -> None:
return

self._running = True
self._thread = threading.Thread(target=lambda: None, daemon=True)
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
logger.info("Broadcaster started")

Expand Down
87 changes: 85 additions & 2 deletions src/MoBI_View/web/server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""WebSocket server for MoBI-View real-time data streaming.

This module provides the ws_handler coroutine that manages WebSocket
client connections and handles incoming discover messages.
This module provides WebSocket connection handling and static file serving
via the websockets process_request hook.
"""

import json
import logging
import mimetypes
import pathlib
from urllib import parse

from websockets import datastructures, http11
from websockets.asyncio import server

from MoBI_View.core import discovery
Expand All @@ -15,6 +19,8 @@

logger = logging.getLogger("MoBI-View.web.server")

STATIC_DIR = pathlib.Path(__file__).resolve().parent / "static"


async def ws_handler(
websocket: server.ServerConnection,
Expand Down Expand Up @@ -96,3 +102,80 @@ async def _handle_discover(
)
await websocket.send(response)
logger.info("Discover: found %d new stream(s)", len(new_inlets))


async def process_request(
connection: server.ServerConnection,
request: http11.Request,
) -> http11.Response | None:
"""Serve static files or pass through to WebSocket upgrade.

Intercepts HTTP requests before the WebSocket handshake. Requests with
an ``Upgrade: websocket`` header proceed to ws_handler. All other
requests are resolved against the static directory.

Args:
connection: The server connection being handled.
request: The incoming HTTP request.

Returns:
A Response for static file requests, or None for WebSocket upgrades.
"""
if _is_websocket_upgrade(request):
return None
return _serve_static_file(request.path)


def _is_websocket_upgrade(request: http11.Request) -> bool:
"""Check whether the request is a WebSocket upgrade."""
return request.headers.get("Upgrade", "").lower() == "websocket"


def _serve_static_file(request_path: str) -> http11.Response:
"""Resolve a URL path to a file in the static directory.

Decodes percent-encoded characters, normalises the path, and verifies
the result stays within STATIC_DIR before reading.

Two safety checks are applied in order:
1. Path containment - if the resolved path escapes STATIC_DIR
(e.g. via ``../`` traversal), returns HTTP 403 Forbidden.
2. File existence - if the path is inside STATIC_DIR but does
not point to an existing file, returns HTTP 404 Not Found.

Args:
request_path: The URL path from the HTTP request.

Returns:
An HTTP 200 response with file contents on success,
HTTP 403 Forbidden if the path escapes the static directory,
or HTTP 404 Not Found if the file does not exist.
"""
decoded = parse.unquote(request_path)
if decoded in ("", "/"):
decoded = "/index.html"
relative = decoded.lstrip("/")
resolved = (STATIC_DIR / relative).resolve()
if not _is_within_static_dir(resolved):
return _error_response(403, "Forbidden")
if not resolved.is_file():
return _error_response(404, "Not Found")
content_type = mimetypes.guess_type(str(resolved))[0] or "application/octet-stream"
Comment thread
kimit0310 marked this conversation as resolved.
body = resolved.read_bytes()
headers = datastructures.Headers([("Content-Type", content_type)])
return http11.Response(200, "OK", headers, body)


def _is_within_static_dir(resolved_path: pathlib.Path) -> bool:
"""Verify the resolved path is within the static directory."""
try:
resolved_path.relative_to(STATIC_DIR)
return True
except ValueError:
return False


def _error_response(status_code: int, reason: str) -> http11.Response:
"""Build a plain-text HTTP error response."""
headers = datastructures.Headers([("Content-Type", "text/plain")])
return http11.Response(status_code, reason, headers, reason.encode())
137 changes: 137 additions & 0 deletions tests/unit/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import asyncio
import json
import pathlib
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from websockets import datastructures

from MoBI_View.presenters import main_app_presenter
from MoBI_View.web import broadcaster, server
Expand Down Expand Up @@ -204,3 +206,138 @@ def test_handle_discover_with_no_new_streams(

sent = json.loads(mock_websocket.send.call_args[0][0])
assert sent["streams"] == []


@pytest.fixture
def static_dir(tmp_path: pathlib.Path) -> pathlib.Path:
"""Creates a temporary static directory mimicking SvelteKit build output."""
(tmp_path / "index.html").write_text("<html></html>")
Comment thread
kimit0310 marked this conversation as resolved.
(tmp_path / "style.css").write_text("body {}")
app_dir = tmp_path / "_app" / "immutable"
app_dir.mkdir(parents=True)
(app_dir / "entry.js").write_text("console.log('ok')")
return tmp_path


def _make_request(path: str, upgrade: str = "") -> MagicMock:
"""Builds a minimal Request-like object with path and headers."""
header_list = []
if upgrade:
header_list.append(("Upgrade", upgrade))
request = MagicMock()
request.path = path
request.headers = datastructures.Headers(header_list)
return request


def test_process_request_passes_websocket_upgrades() -> None:
"""Tests process_request returns None for WebSocket upgrades."""
request = _make_request("/", upgrade="websocket")
connection = MagicMock()

result = asyncio.run(server.process_request(connection, request))

assert result is None


def test_process_request_serves_static_file() -> None:
"""Tests process_request delegates to _serve_static_file for HTTP requests."""
request = _make_request("/style.css")
connection = MagicMock()
fake_response = MagicMock()

with patch.object(
server, "_serve_static_file", return_value=fake_response
) as mock_serve:
result = asyncio.run(server.process_request(connection, request))

mock_serve.assert_called_once_with("/style.css")
assert result is fake_response


@pytest.mark.parametrize("path", ["/", ""])
def test_serve_static_file_returns_index_for_root_paths(
path: str,
static_dir: pathlib.Path,
) -> None:
"""Tests root and empty paths both resolve to index.html."""
with patch.object(server, "STATIC_DIR", static_dir):
result = server._serve_static_file(path)

assert result.status_code == 200
assert b"<html></html>" in result.body


def test_serve_static_file_returns_nested_file(
static_dir: pathlib.Path,
) -> None:
"""Tests subdirectory files are served correctly."""
with patch.object(server, "STATIC_DIR", static_dir):
result = server._serve_static_file("/_app/immutable/entry.js")

assert result.status_code == 200
assert b"console.log" in result.body


def test_serve_static_file_returns_correct_content_type(
static_dir: pathlib.Path,
) -> None:
"""Tests Content-Type header matches the file extension."""
with patch.object(server, "STATIC_DIR", static_dir):
result = server._serve_static_file("/style.css")

assert result.status_code == 200
assert "css" in result.headers.get("Content-Type", "")


def test_serve_static_file_returns_404_for_missing_file(
static_dir: pathlib.Path,
) -> None:
"""Tests missing files return a 404 response."""
with patch.object(server, "STATIC_DIR", static_dir):
result = server._serve_static_file("/nope.txt")

assert result.status_code == 404


@pytest.mark.parametrize(
"malicious_path",
[
"/../../../etc/passwd",
"/%2e%2e/%2e%2e/etc/passwd",
"/..%2F..%2Fetc/passwd",
],
)
def test_serve_static_file_blocks_path_traversal(
malicious_path: str, static_dir: pathlib.Path
) -> None:
"""Tests path traversal attempts return a 403 response."""
with patch.object(server, "STATIC_DIR", static_dir):
result = server._serve_static_file(malicious_path)

assert result.status_code == 403


@pytest.mark.parametrize(
"resolved,expected",
[
("inside", True),
("outside", False),
],
)
def test_is_within_static_dir(
resolved: str,
expected: bool,
tmp_path: pathlib.Path,
) -> None:
"""Tests path containment check against the static directory."""
static = tmp_path / "static"
static.mkdir()

if resolved == "inside":
target = static / "file.html"
else:
target = tmp_path / "file.html"

with patch.object(server, "STATIC_DIR", static):
assert server._is_within_static_dir(target) is expected
Loading