diff --git a/gs/backend/gs_cli/cli.py b/gs/backend/gs_cli/cli.py new file mode 100644 index 000000000..0232de9bf --- /dev/null +++ b/gs/backend/gs_cli/cli.py @@ -0,0 +1,323 @@ +import io +import sys +import threading +from collections.abc import Callable +from sys import argv +from typing import cast + +from serial import Serial, SerialException +from textual import on +from textual.app import App, ComposeResult +from textual.containers import HorizontalGroup, HorizontalScroll, ScrollableContainer, VerticalScroll +from textual.reactive import reactive +from textual.widget import Widget +from textual.widgets import Button, DataTable, Input, Label, Static + +from gs.backend.gs_cli.ground_station_cli import GroundStationShell + +if len(argv) == 2: + COM_PORT = argv[1] + shell = GroundStationShell(COM_PORT) + + +class CliPanel(ScrollableContainer): + """ + CLI panel class, handling command input and output displays + """ + + cli_output = reactive("") + + def __init__(self, *widgets: Widget, id: str | None = None) -> None: # noqa: A002 + """ + Initialize the CLI panel and set up output redirection + """ + super().__init__(*widgets, id=id) + self.shell = shell + self.cli_output_panel: Static | None = None + # Hold the original output stream + self.sys_stdout = sys.stdout + + # Buf creates a StringIO instance, storing what is printed from the cli through a redirection of stdout + self.buffer = io.StringIO() + + # Redirects the output stream (sys.stdout) to buffer, and anything printed will now be written to buffer + sys.stdout = self.buffer + + # Redirects the output stream of the gs shell to buffer, allowing help descriptions to be displayed + self.shell.stdout = sys.stdout + # Redirects the "error" output stream of the gs shell to buffer + sys.stderr = sys.stdout + + if self.shell.intro is not None: + print(self.shell.intro) + + def on_mount(self) -> None: + """ + Set up periodic CLI output refresh and initialize the output panel + """ + self.output_refresh = self.set_interval(1 / 120, self.update_cli) + self.cli_output_panel = self.query_one("#cli-output-panel", Static) + + # Buffer.getvalue() returns the contents of the string buffer as a str + self.cli_output = self.buffer.getvalue() + + def on_unmount(self) -> None: + """ + Restore original output streams when the CLI panel is unmounted + """ + # Upon exitting the cli, restore all original output streams + sys.stdout = self.sys_stdout + sys.stderr = self.sys_stdout + self.shell.stdout = self.sys_stdout + + def watch_cli_output(self, cli_output: str) -> None: + """ + Update the CLI output panel with the latest CLI output + """ + if self.cli_output_panel: + self.cli_output_panel.update(f"CLI - {COM_PORT}\n" + self.cli_output) + + def update_cli(self) -> None: + """ + Refresh the CLI output from the buffer + """ + self.cli_output = self.buffer.getvalue() + + # Use threads to ensure blocking commands don't block CLI + def run_cli_command_in_thread(self, cmd_function: Callable[[str], None], args: str) -> None: + """ + Run a CLI command in a separate thread to avoid blocking the UI + """ + thread = threading.Thread(target=cmd_function, args=(args,), daemon=True) + thread.start() + + @on(Input.Submitted) + def submit_command(self) -> None: + """ + Handle command submission from the input widget and dispatch to the shell + """ + cli_input = self.query_one(Input) + print(f"(UW Orbital): {cli_input.value}") + + # Splits the input into two parts: command name and args + command_parts = cli_input.value.strip().split(maxsplit=1) + try: + # Obtain function from gs shell instance based on inputted command + cmd_function = getattr(self.shell, f"do_{command_parts[0]}") + args = command_parts[1] if len(command_parts) > 1 and command_parts[1] is not None else "" + + # Make special exception for "exit" command + if command_parts[0] == "exit": + self.app.exit() + return + + # Change print_logs cmd button status to STOP if manually typed in + if command_parts[0] == "print_logs": + print("[yellow]Use print_logs button below CMDS to exit polling.[/yellow]") + # Query_one("#btn-print_logs") returns a widget, so type cast widget to button + btn = cast(Button, self.app.query_one("#btn-print_logs")) + btn.label = "STOP" + + # Disable send_conn_request cmd btn if manually typed in + if command_parts[0] == "send_conn_request": + btn = cast(Button, self.app.query_one("#btn-send_conn_request")) + btn.disabled = True + + self.run_cli_command_in_thread(cmd_function, args) + + except (AttributeError, IndexError): + print(f"*** Unknown syntax: {cli_input.value}") + + self.scroll_to_bottom() + self.query_one(Input).value = "" + + def scroll_to_bottom(self) -> None: + """ + Auto-scroll to bottom of cli panel + """ + self.scroll_end() + + def compose(self) -> ComposeResult: + """ + Compose the CLI panel widgets + """ + yield Static("", id="cli-output-panel") + yield Input(placeholder="Enter command here:", id="cli-input") + + +class CmdButton(HorizontalGroup): + """ + A horizontal group containing command buttons for the CLI + """ + + def __init__(self, cmdname: str) -> None: + """ + Initialize the command button with the given command name + """ + super().__init__() + self.cmdname = cmdname + self.shell = shell + + def on_button_pressed(self, event: Button.Pressed) -> None: + """ + Handle the event when the command button is pressed + """ + cmd_function = getattr(self.shell, f"do_{self.cmdname}") + args = "" + + thread = threading.Thread(target=cmd_function, args=(args,), daemon=True) + + if event.button.label != "STOP": + print(f"(UW Orbital): {self.cmdname}") + thread.start() + + if self.cmdname != "print_logs": + event.button.disabled = True + + if self.cmdname == "print_logs": + if event.button.label == "Run": + print("[yellow]Use print_logs button below CMDS to exit polling.[/yellow]") + event.button.label = "STOP" + else: + event.button.label = "Run" + self.shell.stop_printing = True + + def compose(self) -> ComposeResult: + """ + Compose the command button and its label + """ + yield Label(f"{self.cmdname}", id="button-label") + yield Button("Run", id=f"btn-{self.cmdname}") + + +class TimeTaggedLogs(HorizontalScroll): + """ + A horizontal scrollable widget displaying time-tagged command logs + """ + + def __init__(self, *widgets: Widget, id: str | None = None) -> None: # noqa: A002 + """ + Initialize the time-tagged logs table with sample data + """ + super().__init__(*widgets, id=id) + self.rows = [ + ("CMD", "Time", "Cate1", "Cate2"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ("ABC", "ABC", "ABC", "ABC"), + ] + + def compose(self) -> ComposeResult: + """ + Compose the time-tagged logs table and label + """ + yield Label("TIME TAGGED CMDS") + yield DataTable() + + def on_mount(self) -> None: + """ + Set up columns and rows for the time-tagged logs table + """ + table = self.query_one(DataTable) + table.add_columns(*self.rows[0]) + table.add_rows(self.rows[1:]) + + +class LogsPanel(Static): + """ + A static widget for displaying logs from a file + """ + + logs = reactive("") + + def on_mount(self) -> None: + """ + Set up periodic log refresh for the logs panel + """ + self.logs_refresh = self.set_interval(1 / 60, self.update_logs) + + def update_logs(self) -> None: + """ + Read and update logs from the log file + """ + try: + with open("gs/backend/logs.log") as logs: + self.logs = logs.read() + except FileNotFoundError: + self.update("[red]Logs file not found. Try running interface from root directory (OBC-Firmware)[/red]") + + def watch_logs(self, logs: str) -> None: + """ + Update logs panel with the latest logs + """ + self.update("LOGS\n\n" + self.logs) + self.scroll_to_bottom() + + def scroll_to_bottom(self) -> None: + """ + Auto-scroll to bottom of logs panel + """ + scrollable_container = self.app.query_one("#logs") + if scrollable_container: + scrollable_container.scroll_end() + + +class CLIWindow(App[None]): + """ + Main Textual application window for the CLI interface + """ + + CSS_PATH = "cli.tcss" + + def on_mount(self) -> None: + """ + Set the theme when application mounts + """ + self.theme = "dracula" + + def compose(self) -> ComposeResult: + """ + Compose the main application layout with all panels and widgets + """ + yield ScrollableContainer(LogsPanel("LOGS"), can_focus=True, id="logs") + yield CliPanel(id="cli") + yield VerticalScroll( + Static("CMDS"), + CmdButton("print_logs"), + CmdButton("send_conn_request"), + CmdButton("start_logging"), + id="cmd-panel", + ) + yield TimeTaggedLogs(id="timetagged") + + +def main() -> None: + """ + Entry point for the CLI application; opens the serial port at the com port and runs the ground station cli. + """ + if len(argv) != 2: + print("One argument needed: Com Port") + return + + try: + ser = Serial(COM_PORT) + print("Comm port set to: " + str(ser.name)) + ser.close() + + except SerialException as e: + print(f"An error occurred while opening the serial port: {e}") + + app = CLIWindow() + app.run() + + +if __name__ == "__main__": + main() diff --git a/gs/backend/gs_cli/cli.tcss b/gs/backend/gs_cli/cli.tcss new file mode 100644 index 000000000..0a084fb9f --- /dev/null +++ b/gs/backend/gs_cli/cli.tcss @@ -0,0 +1,128 @@ +Screen { + background: rgb(36, 36, 36); + overflow-y: auto; + layout: grid; + grid-size: 3 3; + grid-columns: 1fr 2fr 1fr; + grid-rows: 2fr auto 1fr; +} + +Static { + overflow-y: auto; + color: white; + text-style: bold; +} + +Button.-style-default:focus { + text-style: none; + background-tint: red 0%; +} + +#logs { + background: rgb(36, 36, 36); + row-span: 3; + scrollbar-size: 2 1; + scrollbar-color: rgb(85, 161, 232); + scrollbar-color-hover: rgb(64, 128, 188); + scrollbar-color-active: rgb(77, 161, 239); + border: round rgb(85, 161, 232) 60%; + height: 100%; + padding-left: 1; +} + +#cli { + row-span: 2; + background: rgb(36, 36, 36); + column-span: 1; + height: 1fr; + border: round rgb(242, 97, 97) 60%; + padding-left: 1; + scrollbar-size: 2 1; + scrollbar-color: rgb(242, 97, 97); + scrollbar-color-hover: rgb(196, 79, 79); + scrollbar-color-active: rgb(254, 115, 115); + overflow-y: auto; +} + +#cli-input { + dock: bottom; + border: blank rgb(28, 28, 28); + background: rgb(42, 42, 42); +} + +#cmd-panel { + column-span: 1; + row-span: 3; + scrollbar-size: 2 1; + background: rgb(36, 36, 36); + height: 1fr; + padding-left: 1; + border: round rgb(85, 161, 232) 60%; + scrollbar-color: rgb(85, 161, 232); + scrollbar-color-hover: rgb(64, 128, 188); + scrollbar-color-active: rgb(77, 161, 239); +} + +#cmd-panel > Static { + padding-bottom: 1; +} + +#cmd-panel > CmdButton { + layout: grid; + grid-columns: auto auto; + background: $boost; + color: auto; + margin-bottom: 1; + padding: 1; + padding-left: 3; + margin-right: 2; +} + +#cmd-panel > CmdButton > Button { + background: rgb(45,45,45); + align: right middle; + column-span: 1; +} + +#cmd-panel > CmdButton > Label { + column-span: 1; + padding-bottom: 1; +} + +#timetagged { + layout: grid; + color: white; + background: rgb(36, 36, 36); + height: 1fr; + border: round rgb(85, 161, 232) 60%; + grid-rows: auto; + padding-left: 1; + column-span: 1; +} + +#timetagged > Label { + padding-bottom: 1; +} + +#timetagged > DataTable { + height: 100%; + background: rgb(40,40,40); +} + +DataTable > .datatable--header{ + background: rgb(48, 48, 48); +} + +DataTable > .datatable--header-hover { + background: $block-hover-background 30%; +} + +DataTable:focus > .datatable--cursor { + background: dodgerblue; +} + +DataTable > .datatable--cursor { + background: $block-hover-background 10%; + color: $block-cursor-blurred-foreground; + text-style: $block-cursor-blurred-text-style; +} diff --git a/gs/backend/ground_station_cli.py b/gs/backend/gs_cli/ground_station_cli.py similarity index 91% rename from gs/backend/ground_station_cli.py rename to gs/backend/gs_cli/ground_station_cli.py index ed997664b..e712ec945 100644 --- a/gs/backend/ground_station_cli.py +++ b/gs/backend/gs_cli/ground_station_cli.py @@ -20,6 +20,7 @@ def __init__(self, com_port: str) -> None: self._com_port: str = com_port self._conn_request_sent: bool = False self.background_logging: Process | None = None + self.stop_printing = False # At the start of each command shell use, we clear the file and create a dated title with open(LOG_PATH, "w") as file: @@ -33,7 +34,7 @@ def __init__(self, com_port: str) -> None: ██║ ██║██║███╗██║ ██║ ██║██╔══██╗██╔══██╗██║ ██║ ██╔══██║██║ ╚██████╔╝╚███╔███╔╝ ╚██████╔╝██║ ██║██████╔╝██║ ██║ ██║ ██║███████╗ ╚═════╝ ╚══╝╚══╝ ╚═════╝ ╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚══════╝ - Welcome to the UW Orbital Command Line Inteface! Type help or ? to list commands.\n + Welcome to the UW Orbital Command Line Interface! Type help or ? to list commands.\n """ prompt = "(UW Orbital): " file = None @@ -72,6 +73,7 @@ def do_send_conn_request(self, line: str) -> None: finally: self._restart_logging() + print("Connection request successfully sent.") self._conn_request_sent = True def do_send_command(self, line: str) -> None: @@ -129,21 +131,26 @@ def do_start_logging(self, line: str) -> None: def do_print_logs(self, line: str) -> None: """ - Prints out logs and polls for log_pathlogs that are comming in. Use a Keyboard Interupt to exit (e.g. Ctrl + C) + Prints out logs and polls for log_pathlogs that are coming in. Use a Keyboard Interrupt to exit (e.g. Ctrl + C) """ + self.stop_printing = False + # Preliminary checks for the function to run # Write out the logs that we previously got with open(LOG_PATH) as file: - print(file.read()) + print("Printing " + file.read()) if self.background_logging is not None: self.background_logging.kill() # Here we run the function and catch an interrupt if it is executed by the user try: - poll(self._com_port, LOG_PATH, 1, True) + # We use lambda so that the poll function can acquire updated stop_printing values later + poll(self._com_port, LOG_PATH, 1, lambda: self.stop_printing) except KeyboardInterrupt: print("Exiting polling...") + finally: + print("Exiting polling...") self._restart_logging() diff --git a/gs/backend/obc_utils/command_utils.py b/gs/backend/obc_utils/command_utils.py index fba76e429..342a31e1e 100644 --- a/gs/backend/obc_utils/command_utils.py +++ b/gs/backend/obc_utils/command_utils.py @@ -318,13 +318,16 @@ def generate_command(args: str) -> tuple[CmdMsg | None, bool]: return command, is_timetagged -def poll(com_port: str, file_path: str | Path, timeout: int = 0, print_console: bool = False) -> None: +def poll( + com_port: str, + file_path: str | Path, + timeout: int = 0, + stop_flag: Callable[[], bool] | None = None, +) -> None: """ A function that is supposed to run in the background to keep receiving logs from the board :param com_port: The port that the board is connected to so it can poll - :param print_console: Whether the function should print to console or not. By default, this is set to False. This is - useful for the CLI where sometimes we want to print out the received logs from the board """ comms = CommsPipeline() @@ -340,6 +343,10 @@ def poll(com_port: str, file_path: str | Path, timeout: int = 0, print_console: open(file_path, "a") as file, ): while True: + # We use a stop flag here in order to break the loop without raising KeyboardInterrupt + if stop_flag and stop_flag(): + break + data = ser.read(100000) start_index = data.find(b"\x7e") end_index = data.rfind(b"\x7e") @@ -366,5 +373,3 @@ def poll(com_port: str, file_path: str | Path, timeout: int = 0, print_console: file.write(data_string) file.flush() - if print_console and len(data_string) != 0: - print(data_string) diff --git a/requirements.txt b/requirements.txt index b8fd7b2d8..29293e28d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ # Regular dependencies +textual==6.2.0 requests==2.31.0 pyserial==3.5 skyfield==1.48 @@ -32,5 +33,5 @@ pytest==7.4.0 pytest-cov==4.1.0 mypy==1.8.0 ruff==0.2.0 -psycopg[binary]==3.2.6 # Must be installed before `pytest-postgresql` +psycopg[binary] pytest-postgresql==7.0.1