diff --git a/.github/actions/setup-venv/action.yaml b/.github/actions/setup-venv/action.yaml index 60d822c..f216eab 100644 --- a/.github/actions/setup-venv/action.yaml +++ b/.github/actions/setup-venv/action.yaml @@ -48,50 +48,95 @@ runs: path: | ~/Library/Caches/Homebrew /opt/homebrew/Cellar/lsl - /opt/homebrew/lib/liblsl.dylib + /opt/homebrew/Frameworks/LSL.framework + /usr/local/Cellar/lsl + /usr/local/Frameworks/LSL.framework key: brew-${{ runner.os }}-${{ hashFiles('.github/actions/setup-venv/action.yaml') }} - name: Install LSL (macOS) if: runner.os == 'macOS' && inputs.install-lsl == 'true' shell: bash run: | - # Check if LSL is already installed and working - if ! brew list lsl &>/dev/null || ! ls /opt/homebrew/lib/liblsl.dylib &>/dev/null; then + BREW_PREFIX=$(brew --prefix) + + # Check if LSL is already installed. + # Homebrew's lsl formula may install as a framework (LSL.framework) rather + # than a standalone liblsl.dylib. + if ! brew list lsl &>/dev/null; then echo "Installing LSL..." brew install labstreaminglayer/tap/lsl else echo "LSL already installed and library found" fi - # Verify installation - if ls /opt/homebrew/lib/liblsl.dylib &>/dev/null; then - echo "LSL library confirmed at: /opt/homebrew/lib/liblsl.dylib" - # Set environment variable for LSL - echo "PYLSL_LIB=/opt/homebrew/lib/liblsl.dylib" >> $GITHUB_ENV - echo "DYLD_LIBRARY_PATH=/opt/homebrew/lib:$DYLD_LIBRARY_PATH" >> $GITHUB_ENV + LSL_LIB="" + if [[ -f "${BREW_PREFIX}/Frameworks/LSL.framework/LSL" ]]; then + LSL_LIB="${BREW_PREFIX}/Frameworks/LSL.framework/LSL" + elif [[ -f "${BREW_PREFIX}/Frameworks/LSL.framework/Versions/Current/LSL" ]]; then + LSL_LIB="${BREW_PREFIX}/Frameworks/LSL.framework/Versions/Current/LSL" + elif [[ -f "${BREW_PREFIX}/lib/liblsl.dylib" ]]; then + LSL_LIB="${BREW_PREFIX}/lib/liblsl.dylib" + fi + + if [[ -z "$LSL_LIB" ]]; then + LSL_LIB=$(find "${BREW_PREFIX}/Cellar" -path "*/Frameworks/LSL.framework/LSL" -type f 2>/dev/null | head -n 1) + fi + + if [[ -z "$LSL_LIB" ]]; then + LSL_LIB=$(find "${BREW_PREFIX}" -name "liblsl.dylib" -type f 2>/dev/null | head -n 1) + fi + + if [[ -n "$LSL_LIB" ]]; then + echo "LSL library found at: $LSL_LIB" + echo "PYLSL_LIB=$LSL_LIB" >> $GITHUB_ENV else - echo "Warning: LSL library not found after installation" - find /opt/homebrew -name "liblsl.*" -type f 2>/dev/null || true + echo "ERROR: LSL library not found after installation" + find "${BREW_PREFIX}" -name "liblsl.*" -type f 2>/dev/null || true + find "${BREW_PREFIX}" -name "LSL.framework" -type d 2>/dev/null || true + exit 1 fi - - name: Install system dependencies (Windows) - if: runner.os == 'Windows' && inputs.install-lsl == 'true' - uses: jwlawson/actions-setup-cmake@v1 - with: - cmake-version: '3.25.0' - # Install LSL if needed - name: Install LSL on Windows if: inputs.install-lsl == 'true' && runner.os == 'Windows' shell: pwsh run: | - git clone --depth=1 https://github.com/sccn/liblsl.git - cd liblsl - mkdir build - cd build - cmake .. - cmake --build . --config Release - cmake --install . --config Release + $ErrorActionPreference = 'Stop' + + # Download a prebuilt liblsl release to avoid compiling from source. + $headers = @{ + 'User-Agent' = 'github-actions' + } + $release = Invoke-RestMethod -Uri 'https://api.github.com/repos/sccn/liblsl/releases/latest' -Headers $headers + + # GitHub-hosted Windows runners are x64/amd64. liblsl release assets are + # named like: liblsl--Win_amd64.zip, liblsl--Win_arm64.zip, etc. + $zipAssets = $release.assets | Where-Object { $_.name -match '\.zip$' } + $asset = $zipAssets | Where-Object { $_.name -match '(?i)Win_amd64' } | Select-Object -First 1 + if (-not $asset) { + $asset = $zipAssets | Where-Object { $_.name -match '(?i)win' -and $_.name -match '(?i)(amd64|x86_64|x64)' } | Select-Object -First 1 + } + if (-not $asset) { + $names = ($release.assets | ForEach-Object { $_.name }) -join ", " + throw "Could not find a Windows amd64 liblsl zip asset in the latest release. Assets: $names" + } + + $destDir = Join-Path $env:RUNNER_TEMP 'liblsl' + New-Item -ItemType Directory -Path $destDir -Force | Out-Null + $zipPath = Join-Path $destDir $asset.name + + Write-Host "Downloading $($asset.browser_download_url)" + Invoke-WebRequest -Uri $asset.browser_download_url -OutFile $zipPath + Expand-Archive -LiteralPath $zipPath -DestinationPath $destDir -Force + + $lslDll = Get-ChildItem -Path $destDir -Recurse -Filter 'lsl.dll' | Select-Object -First 1 + if (-not $lslDll) { + throw 'lsl.dll not found after extracting liblsl release asset.' + } + + Write-Host "LSL library found at: $($lslDll.FullName)" + "PYLSL_LIB=$($lslDll.FullName)" | Out-File -FilePath $env:GITHUB_ENV -Append + "PATH=$($lslDll.Directory.FullName);$env:PATH" | Out-File -FilePath $env:GITHUB_ENV -Append - name: Install LSL on Ubuntu if: inputs.install-lsl == 'true' && runner.os == 'Linux' diff --git a/pyproject.toml b/pyproject.toml index 30188c8..9dd3fe3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,8 @@ readme = "README.md" requires-python = ">=3.10,<3.13" dependencies = [ "pylsl>=1.17.6", - "numpy>=1.26.4,<2.3.0" + "numpy>=1.26.4,<2.3.0", + "websockets>=14.0" ] [project.scripts] diff --git a/src/MoBI_View/web/__init__.py b/src/MoBI_View/web/__init__.py new file mode 100644 index 0000000..2881910 --- /dev/null +++ b/src/MoBI_View/web/__init__.py @@ -0,0 +1 @@ +"""This is the web submodule for MoBI_View.""" diff --git a/src/MoBI_View/web/broadcaster.py b/src/MoBI_View/web/broadcaster.py new file mode 100644 index 0000000..a9baf50 --- /dev/null +++ b/src/MoBI_View/web/broadcaster.py @@ -0,0 +1,191 @@ +"""WebSocket broadcaster for real-time data streaming. + +This module provides the Broadcaster class that reads data from the presenter +and broadcasts it as JSON frames to all connected WebSocket clients. +""" + +import asyncio +import json +import logging +import threading +import time +from typing import Any, Dict, List, Optional, Set + +from websockets.asyncio import server + +from MoBI_View.core import config +from MoBI_View.presenters import main_app_presenter + +logger = logging.getLogger("MoBI-View.web.broadcaster") + + +class Broadcaster: + """Broadcasts real-time data from presenter to WebSocket clients. + + The Broadcaster runs a background thread that continuously polls the presenter + for new data, formats it as JSON, and broadcasts to all connected clients. + + Attributes: + presenter: The MainAppPresenter instance providing data. + clients: Set of connected WebSocket clients. + broadcast_interval: Time between broadcasts in seconds. + """ + + CLIENT_SEND_TIMEOUT: float = 1.0 + + def __init__( + self, + presenter: main_app_presenter.MainAppPresenter, + broadcast_interval: Optional[float] = None, + ) -> None: + """Initializes the Broadcaster with a presenter and interval. + + Args: + presenter: The MainAppPresenter instance to poll for data. + broadcast_interval: Time between broadcasts in seconds. Defaults to + Config.TIMER_INTERVAL converted to seconds. + """ + self.presenter = presenter + self.clients: Set[server.ServerConnection] = set() + self._clients_lock = threading.Lock() + self._running = False + self._thread: Optional[threading.Thread] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + self.broadcast_interval = ( + broadcast_interval or config.Config.TIMER_INTERVAL / 1000 + ) + + def start(self) -> None: + """Starts the broadcast loop in a background thread. + + Creates a new thread running the broadcast loop. If already running, + this method does nothing. + """ + if self._running: + logger.warning("Broadcaster already running") + return + + self._running = True + self._thread = threading.Thread(target=lambda: None, daemon=True) + self._thread.start() + logger.info("Broadcaster started") + + def stop(self) -> None: + """Stops the broadcast loop and waits for thread termination. + + Signals the broadcast loop to stop and waits for the thread to finish. + If not running, this method does nothing. + """ + if not self._running: + logger.warning("Broadcaster not running") + return + + self._running = False + if self._thread is None: + return + + with self._clients_lock: + client_count = len(self.clients) + timeout = ( + self.broadcast_interval + + (client_count * self.CLIENT_SEND_TIMEOUT) + + self.CLIENT_SEND_TIMEOUT + ) + self._thread.join(timeout=timeout) + self._thread = None + self._loop = None + logger.info("Broadcaster stopped") + + def add_client(self, client: server.ServerConnection) -> None: + """Adds a WebSocket client to the broadcast set. + + Args: + client: The WebSocket connection to add. + """ + with self._clients_lock: + self.clients.add(client) + logger.info("Client added, total clients: %d", len(self.clients)) + + def remove_client(self, client: server.ServerConnection) -> None: + """Removes a WebSocket client from the broadcast set. + + Args: + client: The WebSocket connection to remove. + """ + with self._clients_lock: + self.clients.discard(client) + logger.info("Client removed, total clients: %d", len(self.clients)) + + def format_frame(self, streams_data: List[Dict[str, Any]]) -> str: + """Formats stream data as a JSON frame for broadcasting. + + Creates a JSON structure containing timestamp and all stream data. + + Args: + streams_data: List of stream data dictionaries from presenter.poll_data(). + Each dictionary contains 'stream_name', 'data', and 'channel_labels'. + + Returns: + JSON string containing the formatted frame. + """ + frame = { + "streams": streams_data, + } + return json.dumps(frame) + + def _run(self) -> None: + """Main broadcast loop running in background thread. + + Continuously polls the presenter for data, formats it, and broadcasts + to all connected clients. Runs until stop() is called. + """ + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + logger.info("Broadcast loop started") + + while self._running: + try: + streams_data = self.presenter.poll_data() + + if streams_data: + frame = self.format_frame(streams_data) + self._broadcast_to_clients(frame) + + time.sleep(self.broadcast_interval) + + except Exception as err: + logger.error("Error in broadcast loop: %s", err) + time.sleep(self.broadcast_interval) + + self._loop.close() + logger.info("Broadcast loop ended") + + def _broadcast_to_clients(self, message: str) -> None: + """Sends a message to all connected clients. + + Iterates through all clients and sends the message asynchronously. + Disconnected clients are removed from the set. + + Args: + message: The JSON message string to broadcast. + """ + with self._clients_lock: + clients_snapshot = set(self.clients) + + disconnected: List[server.ServerConnection] = [] + + for client in clients_snapshot: + try: + if self._loop is not None: + future = asyncio.run_coroutine_threadsafe( + client.send(message), self._loop + ) + future.result(timeout=self.CLIENT_SEND_TIMEOUT) + except Exception as err: + logger.warning("Failed to send to client: %s", err) + disconnected.append(client) + + for client in disconnected: + self.remove_client(client) diff --git a/tests/unit/test_broadcaster.py b/tests/unit/test_broadcaster.py new file mode 100644 index 0000000..94ab438 --- /dev/null +++ b/tests/unit/test_broadcaster.py @@ -0,0 +1,478 @@ +"""Unit tests for the Broadcaster class.""" + +import asyncio +import json +import time +from unittest.mock import MagicMock, patch + +import pytest + +from MoBI_View.core import config +from MoBI_View.presenters import main_app_presenter +from MoBI_View.web import broadcaster + + +async def _async_noop(msg: str) -> None: + """Async no-op for mocking successful client.send().""" + + +async def _async_raise_connection_error(msg: str) -> None: + """Async function that raises ConnectionError for mocking failed send.""" + raise ConnectionError("Disconnected") + + +@pytest.fixture +def mock_presenter() -> MagicMock: + """Creates a mock MainAppPresenter.""" + mock = MagicMock(spec=main_app_presenter.MainAppPresenter) + mock.poll_data.return_value = [] + return mock + + +@pytest.fixture +def broadcaster_instance(mock_presenter: MagicMock) -> broadcaster.Broadcaster: + """Creates an unstarted Broadcaster instance.""" + return broadcaster.Broadcaster(presenter=mock_presenter, broadcast_interval=0.01) + + +def test_init_sets_presenter_and_defaults( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, +) -> None: + """Tests __init__ sets presenter and initializes default state.""" + assert broadcaster_instance.presenter is mock_presenter + assert broadcaster_instance.clients == set() + assert broadcaster_instance._running is False + assert broadcaster_instance._thread is None + assert broadcaster_instance._loop is None + + +def test_init_uses_default_broadcast_interval_from_config( + mock_presenter: MagicMock, +) -> None: + """Tests __init__ uses Config.TIMER_INTERVAL when no interval provided.""" + expected_interval = config.Config.TIMER_INTERVAL / 1000 + + bc = broadcaster.Broadcaster(presenter=mock_presenter) + + assert bc.broadcast_interval == expected_interval + + +def test_init_uses_custom_broadcast_interval_when_provided( + mock_presenter: MagicMock, +) -> None: + """Tests __init__ uses custom interval when provided.""" + custom_interval = 0.1 + + bc = broadcaster.Broadcaster( + presenter=mock_presenter, + broadcast_interval=custom_interval, + ) + + assert bc.broadcast_interval == custom_interval + + +def test_start_sets_running_and_creates_thread( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests start() sets _running=True and creates a daemon thread.""" + broadcaster_instance.start() + + assert broadcaster_instance._running is True + assert broadcaster_instance._thread is not None + assert broadcaster_instance._thread.daemon is True + + broadcaster_instance.stop() + + +def test_start_when_already_running_logs_warning_and_does_nothing( + broadcaster_instance: broadcaster.Broadcaster, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests start(), when already running, logs warning and keeps same thread.""" + broadcaster_instance.start() + original_thread = broadcaster_instance._thread + + broadcaster_instance.start() + + assert broadcaster_instance._thread is original_thread + assert "already running" in caplog.text + + broadcaster_instance.stop() + + +def test_stop_when_not_running_logs_warning_and_returns( + broadcaster_instance: broadcaster.Broadcaster, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests stop(), when _running=False, logs warning and returns early.""" + broadcaster_instance.stop() + + assert "not running" in caplog.text + + +def test_stop_when_thread_is_none_returns_early( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests stop() returns early when _thread is None.""" + broadcaster_instance._running = True + broadcaster_instance._thread = None + broadcaster_instance._loop = MagicMock() + + broadcaster_instance.stop() + + assert broadcaster_instance._running is False + assert broadcaster_instance._loop is not None + + +def test_stop_joins_thread_and_cleans_up( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests stop() joins thread and sets _thread and _loop to None.""" + broadcaster_instance.start() + time.sleep(0.02) + + broadcaster_instance.stop() + + assert broadcaster_instance._running is False + assert broadcaster_instance._thread is None + assert broadcaster_instance._loop is None + + +def test_stop_calculates_timeout_from_clients( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests stop() calculates join timeout based on client count.""" + broadcaster_instance.start() + time.sleep(0.02) + mock_client1 = MagicMock() + mock_client2 = MagicMock() + broadcaster_instance.add_client(mock_client1) + broadcaster_instance.add_client(mock_client2) + + broadcaster_instance.stop() + + assert broadcaster_instance._thread is None + + +def test_add_client_adds_to_clients_set( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests add_client() adds client to the clients set.""" + mock_client = MagicMock() + + broadcaster_instance.add_client(mock_client) + + assert mock_client in broadcaster_instance.clients + assert len(broadcaster_instance.clients) == 1 + + +def test_add_client_logs_client_count( + broadcaster_instance: broadcaster.Broadcaster, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests add_client() logs the total client count.""" + mock_client = MagicMock() + + with caplog.at_level("INFO"): + broadcaster_instance.add_client(mock_client) + + assert "total clients: 1" in caplog.text + + +def test_add_client_multiple_times_stores_once( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests adding the same client twice only stores it once.""" + mock_client = MagicMock() + + broadcaster_instance.add_client(mock_client) + broadcaster_instance.add_client(mock_client) + + assert len(broadcaster_instance.clients) == 1 + + +def test_add_client_multiple_different_clients( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests adding multiple different clients.""" + client1 = MagicMock() + client2 = MagicMock() + + broadcaster_instance.add_client(client1) + broadcaster_instance.add_client(client2) + + assert len(broadcaster_instance.clients) == 2 + assert client1 in broadcaster_instance.clients + assert client2 in broadcaster_instance.clients + + +def test_remove_client_removes_from_set_and_logs_count( + broadcaster_instance: broadcaster.Broadcaster, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests remove_client() removes client from set and logs total count.""" + mock_client = MagicMock() + broadcaster_instance.add_client(mock_client) + + with caplog.at_level("INFO"): + broadcaster_instance.remove_client(mock_client) + + assert mock_client not in broadcaster_instance.clients + assert len(broadcaster_instance.clients) == 0 + assert "total clients: 0" in caplog.text + + +def test_remove_client_nonexistent_does_not_raise( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests remove_client() with nonexistent client does not raise (discard).""" + mock_client = MagicMock() + + broadcaster_instance.remove_client(mock_client) + + assert len(broadcaster_instance.clients) == 0 + + +def test_format_frame_empty_streams_returns_valid_json( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests format_frame() with empty list returns valid JSON with streams key.""" + result = broadcaster_instance.format_frame([]) + parsed = json.loads(result) + + assert "streams" in parsed + assert parsed["streams"] == [] + + +def test_format_frame_single_stream( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests format_frame() with single stream data.""" + streams_data = [ + { + "stream_name": "EEG", + "data": [1.0, 2.0, 3.0], + "channel_labels": ["Fp1", "Fp2", "Fz"], + } + ] + + result = broadcaster_instance.format_frame(streams_data) + parsed = json.loads(result) + + assert len(parsed["streams"]) == 1 + assert parsed["streams"][0]["stream_name"] == "EEG" + assert parsed["streams"][0]["data"] == [1.0, 2.0, 3.0] + assert parsed["streams"][0]["channel_labels"] == ["Fp1", "Fp2", "Fz"] + + +def test_format_frame_multiple_streams( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests format_frame() with multiple streams.""" + streams_data = [ + { + "stream_name": "EEG", + "data": [1.0, 2.0, 3.0], + "channel_labels": ["Fp1", "Fp2", "Fz"], + }, + { + "stream_name": "Accelerometer", + "data": [0.1, 0.2, 9.8], + "channel_labels": ["X", "Y", "Z"], + }, + ] + + result = broadcaster_instance.format_frame(streams_data) + + parsed = json.loads(result) + assert len(parsed["streams"]) == 2 + assert parsed["streams"][0]["stream_name"] == "EEG" + assert parsed["streams"][1]["stream_name"] == "Accelerometer" + + +def test_run_creates_and_closes_event_loop( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests _run() creates event loop, logs start/end, and closes loop on exit.""" + broadcaster_instance._running = True + + def stop_after_one_iteration() -> None: + broadcaster_instance._running = False + + mock_presenter.poll_data.side_effect = stop_after_one_iteration + + with caplog.at_level("INFO"): + broadcaster_instance._run() + + assert "Broadcast loop started" in caplog.text + assert "Broadcast loop ended" in caplog.text + assert broadcaster_instance._loop is not None + assert broadcaster_instance._loop.is_closed() + + +def test_run_polls_presenter_for_data( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, +) -> None: + """Tests _run() calls presenter.poll_data() each iteration.""" + call_count = 0 + + def stop_after_three_iterations() -> list[object]: + nonlocal call_count + call_count += 1 + if call_count >= 3: + broadcaster_instance._running = False + return [] + + broadcaster_instance._running = True + mock_presenter.poll_data.side_effect = stop_after_three_iterations + + broadcaster_instance._run() + + assert mock_presenter.poll_data.call_count == 3 + + +def test_run_broadcasts_when_data_available( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, +) -> None: + """Tests _run() calls _broadcast_to_clients when poll_data returns data.""" + streams_data: list[object] = [ + {"stream_name": "EEG", "data": [1.0], "channel_labels": ["Fp1"]} + ] + + def return_data_then_stop() -> list[object]: + broadcaster_instance._running = False + return streams_data + + broadcaster_instance._running = True + mock_presenter.poll_data.side_effect = return_data_then_stop + + with patch.object(broadcaster_instance, "_broadcast_to_clients") as mock_broadcast: + broadcaster_instance._run() + + mock_broadcast.assert_called_once() + call_arg = mock_broadcast.call_args[0][0] + assert "EEG" in call_arg + + +def test_run_does_not_broadcast_when_no_data( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, +) -> None: + """Tests _run() does not call _broadcast_to_clients when poll_data is empty.""" + + def return_empty_then_stop() -> list[object]: + broadcaster_instance._running = False + return [] + + broadcaster_instance._running = True + mock_presenter.poll_data.side_effect = return_empty_then_stop + + with patch.object(broadcaster_instance, "_broadcast_to_clients") as mock_broadcast: + broadcaster_instance._run() + + mock_broadcast.assert_not_called() + + +def test_run_handles_exception_and_continues( + broadcaster_instance: broadcaster.Broadcaster, + mock_presenter: MagicMock, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests _run() logs error and continues when exception occurs.""" + call_count = 0 + + def raise_then_stop() -> list[object]: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("Test error") + broadcaster_instance._running = False + return [] + + broadcaster_instance._running = True + mock_presenter.poll_data.side_effect = raise_then_stop + + broadcaster_instance._run() + + assert "Error in broadcast loop" in caplog.text + assert call_count == 2 + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +def test_broadcast_to_clients_sends_to_all_clients( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests _broadcast_to_clients() sends message to all connected clients.""" + broadcaster_instance._loop = asyncio.new_event_loop() + mock_client1 = MagicMock() + mock_client2 = MagicMock() + mock_client1.send = MagicMock(side_effect=_async_noop) + mock_client2.send = MagicMock(side_effect=_async_noop) + broadcaster_instance.add_client(mock_client1) + broadcaster_instance.add_client(mock_client2) + message = '{"streams": []}' + + broadcaster_instance._broadcast_to_clients(message) + + mock_client1.send.assert_called_once_with(message) + mock_client2.send.assert_called_once_with(message) + broadcaster_instance._loop.close() + + +def test_broadcast_to_clients_does_nothing_when_no_clients( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests _broadcast_to_clients() does nothing when no clients connected.""" + broadcaster_instance._loop = asyncio.new_event_loop() + message = '{"streams": []}' + + broadcaster_instance._broadcast_to_clients(message) + + assert len(broadcaster_instance.clients) == 0 + broadcaster_instance._loop.close() + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +def test_broadcast_to_clients_removes_disconnected_clients( + broadcaster_instance: broadcaster.Broadcaster, + caplog: pytest.LogCaptureFixture, +) -> None: + """Tests _broadcast_to_clients() removes all clients that fail to receive.""" + broadcaster_instance._loop = asyncio.new_event_loop() + mock_client1 = MagicMock() + mock_client2 = MagicMock() + mock_client1.send = MagicMock(side_effect=_async_raise_connection_error) + mock_client2.send = MagicMock(side_effect=_async_raise_connection_error) + broadcaster_instance.add_client(mock_client1) + broadcaster_instance.add_client(mock_client2) + message = '{"streams": []}' + + broadcaster_instance._broadcast_to_clients(message) + + mock_client1.send.assert_called_once_with(message) + mock_client2.send.assert_called_once_with(message) + assert mock_client1 not in broadcaster_instance.clients + assert mock_client2 not in broadcaster_instance.clients + assert len(broadcaster_instance.clients) == 0 + assert "Failed to send to client" in caplog.text + broadcaster_instance._loop.close() + + +def test_broadcast_to_clients_does_nothing_when_loop_is_none( + broadcaster_instance: broadcaster.Broadcaster, +) -> None: + """Tests _broadcast_to_clients() does not send when _loop is None.""" + broadcaster_instance._loop = None + mock_client = MagicMock() + broadcaster_instance.add_client(mock_client) + message = '{"streams": []}' + + broadcaster_instance._broadcast_to_clients(message) + + mock_client.send.assert_not_called() + assert mock_client in broadcaster_instance.clients diff --git a/uv.lock b/uv.lock index d93d5b5..fa1935a 100644 --- a/uv.lock +++ b/uv.lock @@ -214,6 +214,7 @@ source = { editable = "." } dependencies = [ { name = "numpy" }, { name = "pylsl" }, + { name = "websockets" }, ] [package.dev-dependencies] @@ -234,6 +235,7 @@ docs = [ requires-dist = [ { name = "numpy", specifier = ">=1.26.4,<2.3.0" }, { name = "pylsl", specifier = ">=1.17.6" }, + { name = "websockets", specifier = ">=14.0" }, ] [package.metadata.requires-dev] @@ -593,3 +595,44 @@ sdist = { url = "https://files.pythonhosted.org/packages/56/2c/444f465fb2c65f40c wheels = [ { url = "https://files.pythonhosted.org/packages/f3/40/b1c265d4b2b62b58576588510fc4d1fe60a86319c8de99fd8e9fec617d2c/virtualenv-20.31.2-py3-none-any.whl", hash = "sha256:36efd0d9650ee985f0cad72065001e66d49a6f24eb44d98980f630686243cf11", size = 6057982 }, ] + +[[package]] +name = "websockets" +version = "16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/04/24/4b2031d72e840ce4c1ccb255f693b15c334757fc50023e4db9537080b8c4/websockets-16.0.tar.gz", hash = "sha256:5f6261a5e56e8d5c42a4497b364ea24d94d9563e8fbd44e78ac40879c60179b5", size = 179346 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/74/221f58decd852f4b59cc3354cccaf87e8ef695fede361d03dc9a7396573b/websockets-16.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:04cdd5d2d1dacbad0a7bf36ccbcd3ccd5a30ee188f2560b7a62a30d14107b31a", size = 177343 }, + { url = "https://files.pythonhosted.org/packages/19/0f/22ef6107ee52ab7f0b710d55d36f5a5d3ef19e8a205541a6d7ffa7994e5a/websockets-16.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8ff32bb86522a9e5e31439a58addbb0166f0204d64066fb955265c4e214160f0", size = 175021 }, + { url = "https://files.pythonhosted.org/packages/10/40/904a4cb30d9b61c0e278899bf36342e9b0208eb3c470324a9ecbaac2a30f/websockets-16.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:583b7c42688636f930688d712885cf1531326ee05effd982028212ccc13e5957", size = 175320 }, + { url = "https://files.pythonhosted.org/packages/9d/2f/4b3ca7e106bc608744b1cdae041e005e446124bebb037b18799c2d356864/websockets-16.0-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:7d837379b647c0c4c2355c2499723f82f1635fd2c26510e1f587d89bc2199e72", size = 183815 }, + { url = "https://files.pythonhosted.org/packages/86/26/d40eaa2a46d4302becec8d15b0fc5e45bdde05191e7628405a19cf491ccd/websockets-16.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:df57afc692e517a85e65b72e165356ed1df12386ecb879ad5693be08fac65dde", size = 185054 }, + { url = "https://files.pythonhosted.org/packages/b0/ba/6500a0efc94f7373ee8fefa8c271acdfd4dca8bd49a90d4be7ccabfc397e/websockets-16.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:2b9f1e0d69bc60a4a87349d50c09a037a2607918746f07de04df9e43252c77a3", size = 184565 }, + { url = "https://files.pythonhosted.org/packages/04/b4/96bf2cee7c8d8102389374a2616200574f5f01128d1082f44102140344cc/websockets-16.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:335c23addf3d5e6a8633f9f8eda77efad001671e80b95c491dd0924587ece0b3", size = 183848 }, + { url = "https://files.pythonhosted.org/packages/02/8e/81f40fb00fd125357814e8c3025738fc4ffc3da4b6b4a4472a82ba304b41/websockets-16.0-cp310-cp310-win32.whl", hash = "sha256:37b31c1623c6605e4c00d466c9d633f9b812ea430c11c8a278774a1fde1acfa9", size = 178249 }, + { url = "https://files.pythonhosted.org/packages/b4/5f/7e40efe8df57db9b91c88a43690ac66f7b7aa73a11aa6a66b927e44f26fa/websockets-16.0-cp310-cp310-win_amd64.whl", hash = "sha256:8e1dab317b6e77424356e11e99a432b7cb2f3ec8c5ab4dabbcee6add48f72b35", size = 178685 }, + { url = "https://files.pythonhosted.org/packages/f2/db/de907251b4ff46ae804ad0409809504153b3f30984daf82a1d84a9875830/websockets-16.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:31a52addea25187bde0797a97d6fc3d2f92b6f72a9370792d65a6e84615ac8a8", size = 177340 }, + { url = "https://files.pythonhosted.org/packages/f3/fa/abe89019d8d8815c8781e90d697dec52523fb8ebe308bf11664e8de1877e/websockets-16.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:417b28978cdccab24f46400586d128366313e8a96312e4b9362a4af504f3bbad", size = 175022 }, + { url = "https://files.pythonhosted.org/packages/58/5d/88ea17ed1ded2079358b40d31d48abe90a73c9e5819dbcde1606e991e2ad/websockets-16.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:af80d74d4edfa3cb9ed973a0a5ba2b2a549371f8a741e0800cb07becdd20f23d", size = 175319 }, + { url = "https://files.pythonhosted.org/packages/d2/ae/0ee92b33087a33632f37a635e11e1d99d429d3d323329675a6022312aac2/websockets-16.0-cp311-cp311-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:08d7af67b64d29823fed316505a89b86705f2b7981c07848fb5e3ea3020c1abe", size = 184631 }, + { url = "https://files.pythonhosted.org/packages/c8/c5/27178df583b6c5b31b29f526ba2da5e2f864ecc79c99dae630a85d68c304/websockets-16.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7be95cfb0a4dae143eaed2bcba8ac23f4892d8971311f1b06f3c6b78952ee70b", size = 185870 }, + { url = "https://files.pythonhosted.org/packages/87/05/536652aa84ddc1c018dbb7e2c4cbcd0db884580bf8e95aece7593fde526f/websockets-16.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d6297ce39ce5c2e6feb13c1a996a2ded3b6832155fcfc920265c76f24c7cceb5", size = 185361 }, + { url = "https://files.pythonhosted.org/packages/6d/e2/d5332c90da12b1e01f06fb1b85c50cfc489783076547415bf9f0a659ec19/websockets-16.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1c1b30e4f497b0b354057f3467f56244c603a79c0d1dafce1d16c283c25f6e64", size = 184615 }, + { url = "https://files.pythonhosted.org/packages/77/fb/d3f9576691cae9253b51555f841bc6600bf0a983a461c79500ace5a5b364/websockets-16.0-cp311-cp311-win32.whl", hash = "sha256:5f451484aeb5cafee1ccf789b1b66f535409d038c56966d6101740c1614b86c6", size = 178246 }, + { url = "https://files.pythonhosted.org/packages/54/67/eaff76b3dbaf18dcddabc3b8c1dba50b483761cccff67793897945b37408/websockets-16.0-cp311-cp311-win_amd64.whl", hash = "sha256:8d7f0659570eefb578dacde98e24fb60af35350193e4f56e11190787bee77dac", size = 178684 }, + { url = "https://files.pythonhosted.org/packages/84/7b/bac442e6b96c9d25092695578dda82403c77936104b5682307bd4deb1ad4/websockets-16.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:71c989cbf3254fbd5e84d3bff31e4da39c43f884e64f2551d14bb3c186230f00", size = 177365 }, + { url = "https://files.pythonhosted.org/packages/b0/fe/136ccece61bd690d9c1f715baaeefd953bb2360134de73519d5df19d29ca/websockets-16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b6e209ffee39ff1b6d0fa7bfef6de950c60dfb91b8fcead17da4ee539121a79", size = 175038 }, + { url = "https://files.pythonhosted.org/packages/40/1e/9771421ac2286eaab95b8575b0cb701ae3663abf8b5e1f64f1fd90d0a673/websockets-16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:86890e837d61574c92a97496d590968b23c2ef0aeb8a9bc9421d174cd378ae39", size = 175328 }, + { url = "https://files.pythonhosted.org/packages/18/29/71729b4671f21e1eaa5d6573031ab810ad2936c8175f03f97f3ff164c802/websockets-16.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9b5aca38b67492ef518a8ab76851862488a478602229112c4b0d58d63a7a4d5c", size = 184915 }, + { url = "https://files.pythonhosted.org/packages/97/bb/21c36b7dbbafc85d2d480cd65df02a1dc93bf76d97147605a8e27ff9409d/websockets-16.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e0334872c0a37b606418ac52f6ab9cfd17317ac26365f7f65e203e2d0d0d359f", size = 186152 }, + { url = "https://files.pythonhosted.org/packages/4a/34/9bf8df0c0cf88fa7bfe36678dc7b02970c9a7d5e065a3099292db87b1be2/websockets-16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a0b31e0b424cc6b5a04b8838bbaec1688834b2383256688cf47eb97412531da1", size = 185583 }, + { url = "https://files.pythonhosted.org/packages/47/88/4dd516068e1a3d6ab3c7c183288404cd424a9a02d585efbac226cb61ff2d/websockets-16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:485c49116d0af10ac698623c513c1cc01c9446c058a4e61e3bf6c19dff7335a2", size = 184880 }, + { url = "https://files.pythonhosted.org/packages/91/d6/7d4553ad4bf1c0421e1ebd4b18de5d9098383b5caa1d937b63df8d04b565/websockets-16.0-cp312-cp312-win32.whl", hash = "sha256:eaded469f5e5b7294e2bdca0ab06becb6756ea86894a47806456089298813c89", size = 178261 }, + { url = "https://files.pythonhosted.org/packages/c3/f0/f3a17365441ed1c27f850a80b2bc680a0fa9505d733fe152fdf5e98c1c0b/websockets-16.0-cp312-cp312-win_amd64.whl", hash = "sha256:5569417dc80977fc8c2d43a86f78e0a5a22fee17565d78621b6bb264a115d4ea", size = 178693 }, + { url = "https://files.pythonhosted.org/packages/72/07/c98a68571dcf256e74f1f816b8cc5eae6eb2d3d5cfa44d37f801619d9166/websockets-16.0-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:349f83cd6c9a415428ee1005cadb5c2c56f4389bc06a9af16103c3bc3dcc8b7d", size = 174947 }, + { url = "https://files.pythonhosted.org/packages/7e/52/93e166a81e0305b33fe416338be92ae863563fe7bce446b0f687b9df5aea/websockets-16.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:4a1aba3340a8dca8db6eb5a7986157f52eb9e436b74813764241981ca4888f03", size = 175260 }, + { url = "https://files.pythonhosted.org/packages/56/0c/2dbf513bafd24889d33de2ff0368190a0e69f37bcfa19009ef819fe4d507/websockets-16.0-pp311-pypy311_pp73-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:f4a32d1bd841d4bcbffdcb3d2ce50c09c3909fbead375ab28d0181af89fd04da", size = 176071 }, + { url = "https://files.pythonhosted.org/packages/a5/8f/aea9c71cc92bf9b6cc0f7f70df8f0b420636b6c96ef4feee1e16f80f75dd/websockets-16.0-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0298d07ee155e2e9fda5be8a9042200dd2e3bb0b8a38482156576f863a9d457c", size = 176968 }, + { url = "https://files.pythonhosted.org/packages/9a/3f/f70e03f40ffc9a30d817eef7da1be72ee4956ba8d7255c399a01b135902a/websockets-16.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:a653aea902e0324b52f1613332ddf50b00c06fdaf7e92624fbf8c77c78fa5767", size = 178735 }, + { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598 }, +]