-
Notifications
You must be signed in to change notification settings - Fork 1
feat(web): Implement WebSocket Broadcaster #98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
a640753
Adding Broadcaster with start/stop lifecycle and poll-broadcast loop.…
kimit0310 ea290ba
liblsl now requires CMake 3.28+, which broke Windows CI builds
kimit0310 9cc3f71
explicitly setting PYLSL_LIB to point to the built library.
kimit0310 680b7e4
fix LSL setup on macOS and Windows
kimit0310 18f4793
Addressing comments, adding unit tests upto format_frame
kimit0310 827c9da
forgot to remove _run and _broadcast_to_clients.
kimit0310 c82023f
removing unused import
kimit0310 e3d9ca7
merging the two almost identical tests
kimit0310 5f0ad91
adding _run and _broadcast_to_clients and their tests.
kimit0310 4c53b79
combining a couple of tests together, and fixing mypy issues.
kimit0310 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """This is the web submodule for MoBI_View.""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| """WebSocket broadcaster for real-time data streaming. | ||
|
|
||
| This module provides the Broadcaster class that reads data from the presenter | ||
| and broadcasts it as JSON frames to all connected WebSocket clients. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import json | ||
| import logging | ||
| import threading | ||
| import time | ||
| from typing import Any, Dict, List, Optional, Set | ||
|
|
||
| from websockets.asyncio import server | ||
|
|
||
| from MoBI_View.core import config | ||
| from MoBI_View.presenters import main_app_presenter | ||
|
|
||
| logger = logging.getLogger("MoBI-View.web.broadcaster") | ||
|
|
||
|
|
||
| class Broadcaster: | ||
| """Broadcasts real-time data from presenter to WebSocket clients. | ||
|
|
||
| The Broadcaster runs a background thread that continuously polls the presenter | ||
| for new data, formats it as JSON, and broadcasts to all connected clients. | ||
|
|
||
| Attributes: | ||
| presenter: The MainAppPresenter instance providing data. | ||
| clients: Set of connected WebSocket clients. | ||
| broadcast_interval: Time between broadcasts in seconds. | ||
| """ | ||
|
|
||
| CLIENT_SEND_TIMEOUT: float = 1.0 | ||
|
|
||
| def __init__( | ||
| self, | ||
| presenter: main_app_presenter.MainAppPresenter, | ||
| broadcast_interval: Optional[float] = None, | ||
| ) -> None: | ||
| """Initializes the Broadcaster with a presenter and interval. | ||
|
|
||
| Args: | ||
| presenter: The MainAppPresenter instance to poll for data. | ||
| broadcast_interval: Time between broadcasts in seconds. Defaults to | ||
| Config.TIMER_INTERVAL converted to seconds. | ||
| """ | ||
| self.presenter = presenter | ||
| self.clients: Set[server.ServerConnection] = set() | ||
| self._clients_lock = threading.Lock() | ||
| self._running = False | ||
| self._thread: Optional[threading.Thread] = None | ||
| self._loop: Optional[asyncio.AbstractEventLoop] = None | ||
|
|
||
| self.broadcast_interval = ( | ||
| broadcast_interval or config.Config.TIMER_INTERVAL / 1000 | ||
| ) | ||
|
|
||
| def start(self) -> None: | ||
| """Starts the broadcast loop in a background thread. | ||
|
|
||
| Creates a new thread running the broadcast loop. If already running, | ||
| this method does nothing. | ||
| """ | ||
| if self._running: | ||
| logger.warning("Broadcaster already running") | ||
| return | ||
|
|
||
| self._running = True | ||
| self._thread = threading.Thread(target=lambda: None, daemon=True) | ||
| self._thread.start() | ||
| logger.info("Broadcaster started") | ||
|
|
||
| def stop(self) -> None: | ||
| """Stops the broadcast loop and waits for thread termination. | ||
|
|
||
| Signals the broadcast loop to stop and waits for the thread to finish. | ||
| If not running, this method does nothing. | ||
| """ | ||
| if not self._running: | ||
| logger.warning("Broadcaster not running") | ||
| return | ||
|
|
||
| self._running = False | ||
| if self._thread is None: | ||
| return | ||
|
|
||
| with self._clients_lock: | ||
| client_count = len(self.clients) | ||
| timeout = ( | ||
| self.broadcast_interval | ||
| + (client_count * self.CLIENT_SEND_TIMEOUT) | ||
| + self.CLIENT_SEND_TIMEOUT | ||
| ) | ||
| self._thread.join(timeout=timeout) | ||
| self._thread = None | ||
| self._loop = None | ||
| logger.info("Broadcaster stopped") | ||
|
|
||
| def add_client(self, client: server.ServerConnection) -> None: | ||
| """Adds a WebSocket client to the broadcast set. | ||
|
|
||
| Args: | ||
| client: The WebSocket connection to add. | ||
| """ | ||
| with self._clients_lock: | ||
| self.clients.add(client) | ||
| logger.info("Client added, total clients: %d", len(self.clients)) | ||
|
|
||
| def remove_client(self, client: server.ServerConnection) -> None: | ||
| """Removes a WebSocket client from the broadcast set. | ||
|
|
||
| Args: | ||
| client: The WebSocket connection to remove. | ||
| """ | ||
| with self._clients_lock: | ||
| self.clients.discard(client) | ||
| logger.info("Client removed, total clients: %d", len(self.clients)) | ||
|
|
||
| def format_frame(self, streams_data: List[Dict[str, Any]]) -> str: | ||
| """Formats stream data as a JSON frame for broadcasting. | ||
|
|
||
| Creates a JSON structure containing timestamp and all stream data. | ||
|
|
||
| Args: | ||
| streams_data: List of stream data dictionaries from presenter.poll_data(). | ||
| Each dictionary contains 'stream_name', 'data', and 'channel_labels'. | ||
|
|
||
| Returns: | ||
| JSON string containing the formatted frame. | ||
| """ | ||
| frame = { | ||
| "streams": streams_data, | ||
| } | ||
| return json.dumps(frame) | ||
|
|
||
| def _run(self) -> None: | ||
| """Main broadcast loop running in background thread. | ||
|
|
||
| Continuously polls the presenter for data, formats it, and broadcasts | ||
| to all connected clients. Runs until stop() is called. | ||
| """ | ||
| self._loop = asyncio.new_event_loop() | ||
| asyncio.set_event_loop(self._loop) | ||
|
|
||
| logger.info("Broadcast loop started") | ||
|
|
||
| while self._running: | ||
| try: | ||
| streams_data = self.presenter.poll_data() | ||
|
|
||
| if streams_data: | ||
| frame = self.format_frame(streams_data) | ||
| self._broadcast_to_clients(frame) | ||
|
|
||
| time.sleep(self.broadcast_interval) | ||
|
|
||
| except Exception as err: | ||
| logger.error("Error in broadcast loop: %s", err) | ||
| time.sleep(self.broadcast_interval) | ||
|
|
||
| self._loop.close() | ||
| logger.info("Broadcast loop ended") | ||
|
|
||
| def _broadcast_to_clients(self, message: str) -> None: | ||
| """Sends a message to all connected clients. | ||
|
|
||
| Iterates through all clients and sends the message asynchronously. | ||
| Disconnected clients are removed from the set. | ||
|
|
||
| Args: | ||
| message: The JSON message string to broadcast. | ||
| """ | ||
| with self._clients_lock: | ||
| clients_snapshot = set(self.clients) | ||
|
|
||
| disconnected: List[server.ServerConnection] = [] | ||
|
|
||
| for client in clients_snapshot: | ||
| try: | ||
| if self._loop is not None: | ||
| future = asyncio.run_coroutine_threadsafe( | ||
| client.send(message), self._loop | ||
| ) | ||
| future.result(timeout=self.CLIENT_SEND_TIMEOUT) | ||
| except Exception as err: | ||
| logger.warning("Failed to send to client: %s", err) | ||
| disconnected.append(client) | ||
|
|
||
| for client in disconnected: | ||
| self.remove_client(client) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will move to config if we decide to keep it and if we want to make it tunable for users. Used in _broadcast_to_clients() and stop() timeout calculations, currently set as 1.0, which is pretty conservative.