diff --git a/.gitignore b/.gitignore index 4f805c8..42c152e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # Ignore Limewire log files limewire*.json +# Ignore Hydrant log files +system-events*.log + # Created by https://www.toptal.com/developers/gitignore/api/python,macos,windows # Edit at https://www.toptal.com/developers/gitignore?templates=python,macos,windows diff --git a/pyproject.toml b/pyproject.toml index ac198f2..c9dae37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,13 +10,17 @@ requires-python = "<=3.13,>=3.12" dependencies = [ "asyncudp>=0.11.0", "click>=8.1.8", + "libsass>=0.23.0", "loguru>=0.7.3", "nicegui>=2.23.2", + "openpyxl>=3.1.5", + "pandas>=2.3.3", "platformdirs>=4.5.0", "python-dotenv<2.0.0,>=1.0.1", "scapy>=2.6.1", "seaborn>=0.13.2", "synnax>=0.46,<0.47", + "tftpy==0.8.5", ] name = "limewire" version = "0.1.0" diff --git a/src/hydrant/__init__.py b/src/hydrant/__init__.py index 91dcd13..6e710f8 100644 --- a/src/hydrant/__init__.py +++ b/src/hydrant/__init__.py @@ -1,5 +1,8 @@ +import os +import pathlib + import click -from nicegui import ui +from nicegui import app, ui from limewire.util import SocketAddress @@ -10,12 +13,32 @@ @click.argument( "fc_address", type=SocketAddress(), default="141.212.192.170:5000" ) -def main(fc_address: tuple[str, int]): +@click.option( + "--log-table", + default=None, + type=click.Path(exists=True, dir_okay=False, path_type=pathlib.Path), +) +def main(fc_address: tuple[str, int], log_table: pathlib.Path): print("! HYDRANT RUNNING !") - hydrant = Hydrant(fc_address) + script_dir = os.path.dirname(os.path.abspath(__file__)) + + app.add_static_file( + url_path="/favicon.ico", + local_file=os.path.join(script_dir, "resources/favicon.ico"), + ) + app.add_static_file( + url_path="/lebron.png", + local_file=os.path.join(script_dir, "resources/lebron.png"), + ) + app.add_static_file( + url_path="/lebron_shoot.jpg", + local_file=os.path.join(script_dir, "resources/lebron_shoot.jpg"), + ) + + hydrant = Hydrant(fc_address, log_table) - ui.run(hydrant.main_page, show=False, reload=False) + ui.run(hydrant.main_page, show=False, reload=False, favicon="favicon.ico") if __name__ == "__main__": diff --git a/src/hydrant/device_command_history.py b/src/hydrant/device_command_history.py index b04bcb8..eeb040d 100644 --- a/src/hydrant/device_command_history.py +++ b/src/hydrant/device_command_history.py @@ -29,7 +29,7 @@ def set_ack(self, ack_time: datetime, ack_msg: str): def to_gui_dict(self): date_format = "%b %d, %Y %I:%M:%S %p" return { - "Board": self.board.name, + "Board": self.board.pretty_name, "Command": self.command.name, "Send Time": self.send_time.strftime(date_format), "ACK?": self.ack, diff --git a/src/hydrant/eeprom_generate.py b/src/hydrant/eeprom_generate.py new file mode 100644 index 0000000..75cab5c --- /dev/null +++ b/src/hydrant/eeprom_generate.py @@ -0,0 +1,190 @@ +import ipaddress +import struct +import zlib +from enum import Enum +from io import BytesIO + +import tftpy + + +class ValveVoltage(Enum): + V12 = 0 # 12V + V24 = 1 # 24V + + +class TCGain(Enum): + X1 = 0 + X2 = 1 + X4 = 2 + X8 = 3 + X16 = 4 + X32 = 5 + X64 = 6 + X128 = 7 + + @classmethod + def from_int(cls, gain: int): + if gain == 1: + return TCGain.Gain_1x + elif gain == 2: + return TCGain.Gain_2x + elif gain == 4: + return TCGain.Gain_4x + elif gain == 8: + return TCGain.Gain_8x + elif gain == 16: + return TCGain.Gain_16x + elif gain == 32: + return TCGain.Gain_32x + elif gain == 64: + return TCGain.Gain_64x + elif gain == 128: + return TCGain.Gain_128x + else: + raise Exception("TC gain must be a power of 2 from 1 to 128") + + +class PT: + def __init__(self, range: float, offset: float, max: float): + self.range = range + self.offset = offset + self.max_voltage = max + + +class TC: + def __init__(self, gain: TCGain): + self.gain = gain + + +class VLV: + def __init__(self, voltage: ValveVoltage, enabled: bool): + self.voltage = voltage + self.enabled = enabled + + +def generate_bb_eeprom( + bb_num: int, + fc_ip: ipaddress.IPv4Address, + bb_ip: ipaddress.IPv4Address, + pts: list[PT], + tcs: list[TC], + vlvs: list[VLV], +) -> bytes: + if len(pts) != 10: + raise ValueError( + "Bay Board must be configured with exactly 10 PT channels" + ) + if len(tcs) != 6: + raise ValueError( + "Bay Board must be configured with exactly 6 TC channels" + ) + if len(vlvs) != 5: + raise ValueError( + "Bay Board must be configured with exactly 5 valve channels" + ) + if bb_num < 1 or bb_num > 3: + raise ValueError("Bay Board must be configured as BB1 through BB3") + raw_out = struct.pack(" bytes: + if len(pts) != 5: + raise ValueError( + "Flight Computer must be configured with exactly 5 PT channels" + ) + if len(tcs) != 3: + raise ValueError( + "Flight Computer must be configured with exactly 3 TC channels" + ) + if len(vlvs) != 3: + raise ValueError( + "Flight Computer must be configured with exactly 3 valve channels" + ) + raw_out = bytes() + for x in pts: + raw_out += ( + struct.pack(" + @keyframes flash-red { + 100% { background-color: black; } + 0% { background-color: #942626; } + } + + """) + # HEADER with ui.header().classes( "bg-black text-white px-6 py-4 border-b border-gray-700" @@ -100,71 +158,251 @@ def main_page(self): ) # MAIN PAGE CONTENT - with ui.column().classes( - "w-full p-6 gap-4 max-w-7xl mx-auto" - ) as main_page_content: + with ui.row().classes("w-full mx-auto no-wrap") as main_page_content: self.main_page_content = main_page_content - # DEVICE COMMANDS - with ui.card().classes( - "w-full bg-gray-900 border border-gray-700 p-6" - ): - ui.label("DEVICE COMMANDS").classes( - "text-xl font-bold text-white mb-4" - ) - with ui.column().classes("w-full gap-3"): - # BOARD - ui.label("BOARD").classes("text-lg font-bold text-white") - - # Board selector - self.board_select = ui.select( - label="Select a board", - options=list(self.boards_available.keys()), - ).classes("w-full") - - # Command - ui.label("COMMAND").classes("text-lg font-bold text-white") - self.command_select = ui.select( - label="Select a command", - options=list(self.commands_available.keys()), - ).classes("w-full") - - # Dialog that is used for popup - with ui.dialog() as dialog, ui.card(): - self.confirm_label = ui.label("") - with ui.row(): - ui.button( - "YES", - on_click=lambda: self.send_after_confirm( - dialog - ), - ) - ui.button("NO", on_click=lambda: dialog.close()) - + # DEVICE COMMANDS & SYSTEM CONFIGURATION + with ui.column().classes("w-full p-6 gap-4"): + with ui.row().classes("w-full no-wrap justify-center relative"): + main_page_toggle = ( + ui.toggle( + { + 1: "Device Commands", + 2: "System Configuration", + 3: "Event Log", + }, + value=1, + ) + .classes("self-center border-2 border-[#2f2d63]") + .props('toggle-color="purple"') + ) ui.button( - "SEND", on_click=lambda: self.send_command(dialog) - ).classes("w-half bg-blue-600 text-white hover:bg-blue-700") - - # COMMAND HISTORY CARD - self.command_history_table() - - # ERROR LOG CARD + "Reset", on_click=self.warn_restore_defaults + ).classes("absolute right-0").bind_visibility_from( + main_page_toggle, "value", backward=lambda v: v == 2 + ) + with ( + ui.tab_panels() + .classes("w-full bg-[#121212]") + .props('animated="false"') + .bind_value_from(main_page_toggle, "value") + ): + with ui.tab_panel(1).classes("p-0"): + # DEVICE COMMANDS + with ui.row().classes( + "w-full mx-auto no-wrap h-[27em] gap-0" + ): + with ui.column().classes("w-1/2 h-full pr-2"): + with ui.card().classes( + "w-full bg-gray-900 border border-gray-700 p-6 h-full" + ): + ui.label("DEVICE COMMANDS").classes( + "text-xl font-bold text-white mb-4" + ) + with ui.column().classes("w-full gap-3"): + # BOARD + ui.label("BOARD").classes( + "text-lg font-bold text-white" + ) + # Board selector + self.board_select = ui.select( + label="Select a board", + options=list( + self.boards_available.keys() + ), + ).classes("w-full") + # Command + ui.label("COMMAND").classes( + "text-lg font-bold text-white" + ) + self.command_select = ui.select( + label="Select a command", + options=list( + self.commands_available.keys() + ), + ).classes("w-full") + # Dialog that is used for popup + with ui.dialog() as dialog, ui.card(): + self.confirm_label = ui.label("") + with ui.row(): + ui.button( + "YES", + on_click=lambda: self.send_after_confirm( + dialog + ), + ) + ui.button( + "NO", + on_click=lambda: dialog.close(), + ) + ui.button( + "SEND", + on_click=lambda: self.send_command( + dialog + ), + ).classes( + "w-half bg-blue-600 text-white hover:bg-blue-700" + ) + with ui.column().classes("w-1/2 h-full pl-2"): + # ERROR LOG + with ui.column().classes("w-full gap-4 h-full"): + # ERROR LOG CARD + self.error_log.display() + with ui.row().classes("w-full mx-auto no-wrap"): + # COMMAND HISTORY CARD + self.command_history_table() + with ui.tab_panel(2).classes("p-0"): + # SYSTEM CONFIGURATION + with ui.row().classes( + "w-full mx-auto no-wrap h-[31em] gap-0" + ): + with ui.column().classes("w-1/2 pr-2 h-full"): + self.system_config = SystemConfigUI( + self, self.log_listener + ) + with ui.column().classes("w-1/2 pl-2 h-full"): + # ERROR LOG + with ui.column().classes("w-full gap-4 h-full"): + # ERROR LOG CARD + self.error_log.display() + with ui.row().classes("w-full mx-auto no-wrap"): + # BOARD SPECIFIC CONFIG + with ui.card().classes( + "w-full bg-gray-900 border border-gray-700 p-6" + ): + with ui.row().classes("w-full mx-auto no-wrap"): + ui.label("MANUAL BOARD CONFIG").classes( + "text-xl font-bold text-white mb-4" + ) + ui.space() + board_config_toggle = ( + ui.toggle( + { + 1: "Flight Computer", + 2: "Bay Board 1", + 3: "Bay Board 2", + 4: "Bay Board 3", + 5: "Flight Recorder", + }, + value=1, + ) + .classes("border-1 border-[#2f2d63]") + .props( + 'toggle-color="lime" toggle-text-color="black"' + ) + ) + with ( + ui.tab_panels() + .classes("bg-gray-900") + .props('animated="false"') + .bind_value_from( + board_config_toggle, "value" + ) + ): + with ui.tab_panel(1).classes("p-0"): + self.FC_TFTP_IP = IPAddressUI( + DEFAULT_FC_IP, "TFTP IP" + ) + with ui.tab_panel(2).classes("p-0"): + self.BB1_TFTP_IP = IPAddressUI( + DEFAULT_BB1_IP, "TFTP IP" + ) + with ui.tab_panel(3).classes("p-0"): + self.BB2_TFTP_IP = IPAddressUI( + DEFAULT_BB2_IP, "TFTP IP" + ) + with ui.tab_panel(4).classes("p-0"): + self.BB3_TFTP_IP = IPAddressUI( + DEFAULT_BB3_IP, "TFTP IP" + ) + with ui.tab_panel(5).classes("p-0"): + self.FR_TFTP_IP = IPAddressUI( + DEFAULT_FR_IP, "TFTP IP" + ) + ui.separator().classes("h-1") + with ui.row().classes("w-full mx-auto no-wrap"): + with ( + ui.tab_panels() + .classes("w-full bg-gray-900") + .props('animated="false"') + .bind_value_from( + board_config_toggle, "value" + ) + ): + with ui.tab_panel(1).classes("p-0"): + self.FC_config = FCConfigUI() + with ui.tab_panel(2).classes("p-0"): + self.BB1_config = BBConfigUI(1) + with ui.tab_panel(3).classes("p-0"): + self.BB2_config = BBConfigUI(2) + with ui.tab_panel(4).classes("p-0"): + self.BB3_config = BBConfigUI(3) + with ui.tab_panel(5).classes("p-0"): + self.FR_config = FRConfigUI() + # FC CONNECTION DIV + with ( + ui.element("div") + .style( + "position: fixed; right: 1.5rem; bottom: 1.5rem; z-index: 1000; box-shadow: 0 0 0.5em #7f9fbf35; background-color: black;" + ) + .classes("rounded-sm") as fc_conn_stat + ): + self.fc_connection_status = fc_conn_stat + fc_conn_stat.set_visibility(not self.start_fc_connection_status) with ui.card().classes( - "w-full bg-gray-900 border border-gray-700 p-6" + "bg-transparent text-white p-6 pl-4 shadow-lg" ): - ui.label("Error Log").classes( - "text-xl font-bold text-red-400 mb-4" - ) - - error_column = ui.column().classes("w-full overflow-y-auto") - with error_column: - ui.label("Errors will appear here").classes( - "text-gray-500 italic" - ) + with ui.row().classes("no-wrap"): + ui.icon("error", color="yellow") + with ui.column(): + ui.label("Flight Computer disconnected.").classes( + "text-bold" + ) + ui.label("Trying to reconnect...") + ui.space().classes("h-32") + + self.log_listener.attach_ui(self.error_log) + + def warn_restore_defaults(self): + with ( + ui.dialog() as dialog, + ui.card().classes( + "w-100 h-30 flex flex-col justify-center items-center" + ), + ): + ui.button(icon="close", on_click=lambda e: dialog.close()).classes( + "absolute right-0 top-0 bg-transparent" + ).props('flat color="white" size="lg"') + ui.label("Confirm reset to defaults").classes("text-xl") + ui.button( + "Confirm", + on_click=lambda e: (self.restore_defaults(), dialog.close()), + ) + dialog.open() + + def restore_defaults(self): + self.system_config.ICD_config = None + self.FC_TFTP_IP.set_ip(DEFAULT_FC_IP) + self.BB1_TFTP_IP.set_ip(DEFAULT_BB1_IP) + self.BB2_TFTP_IP.set_ip(DEFAULT_BB2_IP) + self.BB3_TFTP_IP.set_ip(DEFAULT_BB3_IP) + self.FR_TFTP_IP.set_ip(DEFAULT_FR_IP) + + self.FC_config.restore_defaults() + self.BB1_config.restore_defaults() + self.BB2_config.restore_defaults() + self.BB3_config.restore_defaults() + self.FR_config.restore_defaults() + + self.system_config.reset_progress_indicators() + self.system_config.ICD_file.reset() + self.system_config.ICD_config = None def send_command(self, dialog): """Initialize send command process on button press""" + if self.board_select.value is None or self.command_select.value is None: + return self.selected_board_name = self.board_select.value self.selected_command_name = self.command_select.value @@ -183,23 +421,35 @@ async def send_after_confirm(self, dialog): self.board = self.boards_available[self.selected_board_name] self.command = self.commands_available[self.selected_command_name] + if self.fc_writer: + msg = DeviceCommandMessage(self.board, self.command) + msg_bytes = bytes(msg) - msg = DeviceCommandMessage(self.board, self.command) - msg_bytes = bytes(msg) - - print( - f"Sending {self.selected_command_name} to board {self.selected_board_name}" - ) - self.fc_writer.write(len(msg_bytes).to_bytes(1) + msg_bytes) - await self.fc_writer.drain() + print( + f"Sending {self.selected_command_name} to board {self.selected_board_name}" + ) + self.fc_writer.write(len(msg_bytes).to_bytes(1) + msg_bytes) + await self.fc_writer.drain() - new_entry = DeviceCommandHistoryEntry( - board=msg.board, - command=msg.command, - send_time=datetime.now(), - ) - self.device_command_history.append(new_entry) - self.device_command_recency[(msg.board, msg.command)] = new_entry + new_entry = DeviceCommandHistoryEntry( + board=msg.board, + command=msg.command, + send_time=datetime.now(), + ) + self.device_command_history.append(new_entry) + self.device_command_recency[(msg.board, msg.command)] = new_entry + else: + self.fc_connection_status.classes( + add="animate-[flash-red_1s_ease-in-out_1]" + ) + ui.timer( + 1, + lambda: self.fc_connection_status.classes( + remove="animate-[flash-red_1s_ease-in-out_1]" + ), + active=True, + once=True, + ) self.refresh_history_table() def refresh_history_table(self): @@ -219,8 +469,17 @@ def command_history_table(self): column_defs = [] for field in DeviceCommandHistoryEntry.fields(): col_def = {"field": field} + col_def["tooltipField"] = field if field == "Send Time": col_def["sort"] = "desc" + + if field == "ACK?": + col_def["width"] = 100 + col_def["maxWidth"] = 100 + elif field == "ACK Message": + pass + else: + col_def["maxWidth"] = 210 column_defs.append(col_def) with history_column: diff --git a/src/hydrant/hydrant_error_ui.py b/src/hydrant/hydrant_error_ui.py new file mode 100644 index 0000000..83ecbbe --- /dev/null +++ b/src/hydrant/hydrant_error_ui.py @@ -0,0 +1,389 @@ +import asyncio +import ipaddress +import logging +import os +import pathlib +from collections import deque +from datetime import datetime, timezone + +import pandas as pd +from nicegui import ui + +from lmp.firmware_log import FirmwareLog + +EVENT_LOG_PORT = 1234 + +EVENT_LOG_FILE = "" + +LOG_TABLE_SHEET = "Lookup Table" + +EVENT = 15 + +EEPROM_CODES = [ + 219, + 220, + 221, + 222, + 705, +] # Event codes indicating something changed with the eeprom config + + +class LogTable: + def __init__(self, table_path: pathlib.Path): + self.table_path = table_path + table_df = pd.read_csv(table_path, na_filter=False) + self.lookup_table = {} + for _, row in table_df.iterrows(): + self.lookup_table[int(row["Number"])] = { + "type": str(row["Type"]), + "name": str(row["Name"]), + "function": str(row["Function"]), + "severity": str(row["Severity"]) + if not pd.isna(row["Severity"]) + else "None", + "details": str(row["Note (optional)"]), + } + + def get_type(self, code: int): + err = self.lookup_table.get(code) + if err is not None: + return err["type"] + else: + return None + + def get_name(self, code: int): + err = self.lookup_table.get(code) + if err is not None: + return err["name"] + else: + return None + + def get_function(self, code: int): + err = self.lookup_table.get(code) + if err is not None: + return err["function"] + else: + return None + + def get_severity(self, code: int): + err = self.lookup_table.get(code) + if err is not None: + return err["severity"] + else: + return None + + def get_details(self, code: int): + err = self.lookup_table.get(code) + if err is not None: + return err["details"] + else: + return None + + +class EventLogUI: + def __init__(self, lookup_table: LogTable = None): + self.tables: list[ui.table] = [] + self.listener: EventLogListener = None + self.cur_id = 0 + self.lookup_table = lookup_table + + def display(self): + ui.add_sass(""" + .sticky-table + height: 100% + .q-table__top, + .q-table__bottom, + thead tr:first-child th + background-color: #2b2b2b + + th + font-size: 14px + thead tr th + position: sticky + z-index: 10 + thead tr:first-child th + top: 0 + tbody + scroll-margin-top: 48px + """) + ui.add_sass(""" + .fullscreen + height: 100vw !important + """) + with ui.card().classes( + "w-full bg-gray-900 border border-gray-700 p-6 pt-4 gap-2 h-full" + ): + with ui.row().classes("w-full no-wrap"): + ui.label("Event Log").classes("text-xl font-bold text-red-400") + ui.space() + ui.button("Clear", on_click=self.clear_log).props( + "outline" + ).classes("self-center") + ui.button( + icon="fullscreen", + on_click=lambda e: log_table.run_method("toggleFullscreen"), + ).props("flat round dense size='20px'") + columns = [ + { + "name": "msg", + "label": "Message", + "field": "msg", + "required": False, + "align": "left", + "sortable": False, + "classes": "min-w-80", + "headerClasses": "min-w-80", + "style": "overflow: hidden;overflow-wrap: break-word;white-space: normal;", + }, + { + "name": "board", + "label": "Board", + "field": "board", + "required": False, + "sortable": True, + "align": "left", + }, + { + "name": "timestamp", + "label": "Timestamp", + "field": "timestamp", + "required": False, + "sortable": True, + "align": "left", + "sortOrder": "da", + }, + { + "name": "code", + "label": "Code", + "field": "code", + "required": False, + "sortable": True, + "align": "left", + }, + { + "name": "ip", + "label": "IP Address", + "field": "ip", + "required": False, + "sortable": True, + "align": "left", + }, + { + "name": "id", + "label": "id", + "field": "id", + "required": True, + "classes": "hidden", + "headerClasses": "hidden", + }, + { + "name": "tooltip", + "label": "tooltip", + "field": "tooltip", + "required": False, + "classes": "hidden", + "headerClasses": "hidden", + }, + ] + log_table = ( + ui.table(columns=columns, rows=[]) + .classes("w-full overflow-y-auto sticky-table") + .props( + "no-data-label hide-no-data dense table-header-class='size-xl'" + ) + ) + log_table.pagination = { + "sortBy": "timestamp", + "rowsPerPage": 0, + "descending": True, + } + log_table.add_slot( + "body-cell-msg", + """ + + + {{ props.value }} + + """, + ) + log_table.add_slot( + "top-right", + """ + + """, + ) + self.tables.append(log_table) + + def clear_log(self, e=None): + if self.listener is not None: + self.listener.log_buffer.clear() + + for x in self.tables: + x.rows = [] + + def add_log( + self, + log: FirmwareLog, + addr: ipaddress.IPv4Address = None, + localtime: bool = True, + ): + time_str = None + if log.timestamp is not None: + timestamp = log.timestamp + if localtime: + local_zone = datetime.now(timezone.utc).astimezone().tzinfo + timestamp = timestamp.astimezone(local_zone) + time_str = f"{timestamp.strftime('%b %d, %Y %I:%M:%S.')}{timestamp.microsecond // 1000} {timestamp.strftime('%p')} {timestamp.strftime('%Z')}" + + for x in self.tables: + x.add_row( + { + "msg": log.message, + "board": log.board.pretty_name + if log.board is not None + else None, + "timestamp": time_str, + "code": log.status_code, + "ip": addr, + "id": self.cur_id, + "tooltip": self.generate_tooltip(log.status_code), + } + ) + self.cur_id += 1 + + def attach_listener(self, listener): + self.listener = listener + + def generate_tooltip(self, code: int): + if code is None: + return None + if self.lookup_table is None: + return None + error_name = self.lookup_table.get_name(code) + error_type = self.lookup_table.get_type(code) + error_severity = self.lookup_table.get_severity(code) + error_details = self.lookup_table.get_details(code) + if error_name is None: + return None + tooltip_msg = f"Error {error_name}: Type - {error_type}, Severity - {error_severity}

{error_details}" + return tooltip_msg + + +class EventLogListener: + def __init__(self): + self.eeprom_response: asyncio.Future = None + self.log_UIs: list[EventLogUI] = [] + self.transport = None + self.log_buffer = deque(maxlen=100) + log_setup = logging.getLogger("events") + logdir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "logs" + ) + os.makedirs(logdir, exist_ok=True) + log_file = os.path.join( + logdir, "system-events.log" + ) # TODO figure out logging location + formatter = logging.Formatter( + "%(levelname)s: %(asctime)s %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p -", + ) + filehandler = logging.FileHandler(log_file, mode="a") + filehandler.setFormatter(formatter) + log_setup.setLevel(logging.INFO) + log_setup.addHandler(filehandler) + + def attach_ui(self, ui: EventLogUI): + self.log_UIs.append(ui) + ui.attach_listener(self) + for x in self.log_buffer: + ui.add_log(x[0], x[1]) + + async def open_listener(self): + while True: + try: + loop = asyncio.get_event_loop() + ( + self.transport, + self.handler, + ) = await loop.create_datagram_endpoint( + self.create_protocol, + ("0.0.0.0", EVENT_LOG_PORT), + reuse_address=False, + reuse_port=False, + ) + except Exception as err: + print(f"Error opening log listener: {str(err)}") + await asyncio.sleep(1) + continue + + try: + if self.handler is not None: + await self.handler.wait_for_close() + except asyncio.CancelledError: + print("Log listener cancelled.") + break + except Exception as e: + print(f"Got exception: {e}") + continue + + def create_protocol(self): + return EventLogProtocol(self) + + def log_to_UIs(self, log: FirmwareLog, addr: ipaddress.IPv4Address): + self.log_buffer.append((log, addr)) + for x in self.log_UIs: + x.add_log(log, addr=addr, localtime=True) + + async def setup_future(self) -> asyncio.Future: + if self.eeprom_response is not None and not self.eeprom_response.done(): + self.eeprom_response.cancel() + await asyncio.sleep( + 0 + ) # yield event loop and trigger cancelled response + self.eeprom_response = asyncio.get_event_loop().create_future() + return self.eeprom_response + + def trigger_eeprom_response(self, log: FirmwareLog): + if self.eeprom_response is not None and not self.eeprom_response.done(): + self.eeprom_response.set_result(log) + + +class EventLogProtocol(asyncio.DatagramProtocol): + def __init__(self, listener): + super().__init__() + self.listener: EventLogListener = listener + self.open = False + + def connection_made(self, transport): + self.transport = transport + self.open = True + + def datagram_received(self, data, addr): + log = FirmwareLog.from_bytes(data) + self.listener.log_to_UIs(log, addr[0]) + if ( + log.status_code is not None + and (log.status_code % 1000) in EEPROM_CODES + ): + try: + self.listener.trigger_eeprom_response(log) + except Exception as err: + print("Error triggering eeprom config response " + str(err)) + logging.getLogger("events").info( + log.to_log() + f", IP: {addr[0]}" + ) # TODO change to a custom log level after merging with hydrant-reconnection + + def connection_lost(self, exc): + self.open = False + + async def wait_for_close(self): + while self.open: + await asyncio.sleep(0.5) diff --git a/src/hydrant/hydrant_system_config.py b/src/hydrant/hydrant_system_config.py new file mode 100644 index 0000000..3da6190 --- /dev/null +++ b/src/hydrant/hydrant_system_config.py @@ -0,0 +1,675 @@ +import ipaddress +from io import BytesIO + +import pandas as pd +import synnax as sy +from synnax.hardware import ni + +ICD_SHEET = "AVI Mappings 25-26" + +DEFAULT_GSE_IP = ipaddress.IPv4Address("141.212.192.160") +DEFAULT_FC_IP = ipaddress.IPv4Address("141.212.192.170") +DEFAULT_BB1_IP = ipaddress.IPv4Address("141.212.192.180") +DEFAULT_BB2_IP = ipaddress.IPv4Address("141.212.192.190") +DEFAULT_BB3_IP = ipaddress.IPv4Address("141.212.192.200") +DEFAULT_FR_IP = ipaddress.IPv4Address("141.212.192.210") +DEFAULT_PT_RANGE = 1000 +DEFAULT_PT_OFFSET = 0.5 +DEFAULT_PT_MAX = 4.5 +DEFAULT_TC_GAIN = 1 +DEFAULT_VALVE_VOLTAGE = 12 +DEFAULT_VALVE_ENABLED = False + +NUM_FC_PTs = 5 +NUM_BB_PTs = 10 + +NUM_FC_TCs = 3 +NUM_BB_TCs = 6 + +NUM_FC_VLVs = 3 +NUM_BB_VLVs = 5 + +analog_task_name: str = "Sensors" +analog_card_model: str = "PCI-6225" + +digital_task_name: str = "Valves" +digital_card_model: str = "PCI-6514" + +tc_calibrations = { + "1": {"slope": 1.721453951, "offset": -7.888645383}, + "2": {"slope": 1.717979575, "offset": -7.887206187}, + "3": {"slope": 1.749114263, "offset": -8.059569538}, + "4": {"slope": 1.746326017, "offset": -7.988352324}, + "5": {"slope": 1.758960807, "offset": -8.000751167}, + "6": {"slope": 1.723974665, "offset": -7.891630334}, + "7": {"slope": 1.703447212, "offset": -7.961173615}, + "8": {"slope": 1.725947472, "offset": -7.928342723}, + "9": {"slope": 1.223907933, "offset": -3.041473799}, + "10": {"slope": 1.163575088, "offset": -3.001507707}, + "11": {"slope": 1.183121251, "offset": -2.962919485}, + "12": {"slope": 1.255762908, "offset": -2.436113303}, + "13": {"slope": 1.209157541, "offset": -3.018604306}, + "14": {"slope": 1.154169121, "offset": -2.924291025}, +} + + +class ICD: + def __init__(self, content: bytes, name: str): + self.name = name + ICD_df = pd.read_excel( + BytesIO(content), + header=1, + sheet_name=ICD_SHEET, + keep_default_na=False, + na_values="", + ) + try: + ICD_df["Type"] = ICD_df["Type"].ffill(axis=0) + ICD_df["Connection Location"] = ICD_df["Connection Location"].ffill( + axis=0 + ) + except KeyError as err: + raise KeyError("Missing critical column: " + str(err)) + self.ebox_channels = [] + self.fc_channels = [] + self.bb1_channels = [] + self.bb2_channels = [] + self.bb3_channels = [] + self.ips = [] + setup_thermistor = False + + for row_num, row in ICD_df.iterrows(): + try: + if pd.isna(row["Name"]): + continue + if row["Connection Location"] == "EBOX": + if row["Name"] == "": + continue + try: + if "Margin" in row["Name"]: + continue + if "Broken Channel" in row["Name"]: + continue + except Exception: + continue + channel_num = int(row["Channel"]) + if row["Type"] == "PTs": + if ( + pd.isna(row["Max Pressure"]) + or pd.isna(row["Calibration Offset (V)"]) + or pd.isna(row["Max Output Voltage"]) + ): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "PT", + "channel": channel_num, + "port": channel_num - 1, + "max": int(row["Max Pressure"]), + "min": 0, + "offset": float(row["Calibration Offset (V)"]), + "max_voltage": float(row["Max Output Voltage"]), + } + self.ebox_channels.append(channel) + elif row["Type"] == "VLVs": + if channel_num >= 17: + port = 6 + elif channel_num >= 9: + port = 5 + elif channel_num >= 0: + port = 4 + else: + raise ICDValueException( + "Invalid channel number in row", row.to_dict() + ) + channel = { + "name": row["Name"], + "type": "VLV", + "channel": channel_num, + "port": port, + "line": (channel_num - 1) % 8, + } + self.ebox_channels.append(channel) + elif row["Type"] == "TCs": + if not setup_thermistor: + channel = { + "type": "Thermistor", + "name": "Thermistor", + "signal": 78, + "supply": 79, + "max": 8, + "min": -8, + } + setup_thermistor = True + self.ebox_channels.append(channel) + channel = { + "name": row["Name"], + "type": "TC", + "channel": channel_num, + "port": channel_num - 1 + 64, + "max": 8, + "min": -8, + } + self.ebox_channels.append(channel) + else: + raise ICDValueException( + "Unknown peripheral type", row.to_dict() + ) + elif row["Connection Location"] == "Press Bay Board": + if row["Name"] == "": + continue + + if row["Type"] == "PTs": + if ( + pd.isna(row["Max Pressure"]) + or pd.isna(row["Calibration Offset (V)"]) + or pd.isna(row["Max Output Voltage"]) + ): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "PT", + "channel": int(row["Channel"]), + "range": int(row["Max Pressure"]), + "offset": float(row["Calibration Offset (V)"]), + "max_voltage": float(row["Max Output Voltage"]), + } + self.bb1_channels.append(channel) + elif row["Type"] == "TCs": + channel = { + "name": row["Name"], + "type": "TC", + "channel": int(row["Channel"]), + "gain": int(calculate_tc_gain(row["TC Range"])), + } + self.bb1_channels.append(channel) + elif row["Type"] == "VLVs": + if pd.isna(row["Supply Voltage (V)"]): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "VLV", + "channel": int(row["Channel"]), + "voltage": int(row["Supply Voltage (V)"]), + } + self.bb1_channels.append(channel) + else: + raise ICDValueException( + "Unknown peripheral type", row.to_dict() + ) + elif row["Connection Location"] == "Intertank Bay Board": + if row["Name"] == "": + continue + + if row["Type"] == "PTs": + if ( + pd.isna(row["Max Pressure"]) + or pd.isna(row["Calibration Offset (V)"]) + or pd.isna(row["Max Output Voltage"]) + ): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "PT", + "channel": int(row["Channel"]), + "range": int(row["Max Pressure"]), + "offset": float(row["Calibration Offset (V)"]), + "max_voltage": float(row["Max Output Voltage"]), + } + self.bb2_channels.append(channel) + elif row["Type"] == "TCs": + channel = { + "name": row["Name"], + "type": "TC", + "channel": int(row["Channel"]), + "gain": int(calculate_tc_gain(row["TC Range"])), + } + self.bb2_channels.append(channel) + elif row["Type"] == "VLVs": + if pd.isna(row["Supply Voltage (V)"]): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "VLV", + "channel": int(row["Channel"]), + "voltage": int(row["Supply Voltage (V)"]), + } + self.bb2_channels.append(channel) + else: + raise ICDValueException( + "Unknown peripheral type", row.to_dict() + ) + elif row["Connection Location"] == "Engine Bay Board": + if row["Name"] == "": + continue + + if row["Type"] == "PTs": + if ( + pd.isna(row["Max Pressure"]) + or pd.isna(row["Calibration Offset (V)"]) + or pd.isna(row["Max Output Voltage"]) + ): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "PT", + "channel": int(row["Channel"]), + "range": int(row["Max Pressure"]), + "offset": float(row["Calibration Offset (V)"]), + "max_voltage": float(row["Max Output Voltage"]), + } + self.bb3_channels.append(channel) + elif row["Type"] == "TCs": + channel = { + "name": row["Name"], + "type": "TC", + "channel": int(row["Channel"]), + "gain": int(calculate_tc_gain(row["TC Range"])), + } + self.bb3_channels.append(channel) + elif row["Type"] == "VLVs": + if pd.isna(row["Supply Voltage (V)"]): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "VLV", + "channel": int(row["Channel"]), + "voltage": int(row["Supply Voltage (V)"]), + } + self.bb3_channels.append(channel) + else: + raise ICDValueException( + "Unknown peripheral type", row.to_dict() + ) + elif row["Connection Location"] == "Flight Computer": + if row["Name"] == "": + continue + + if row["Type"] == "PTs": + if ( + str(row["Max Pressure"]) == "N/A" + or str(row["Supply Voltage (V)"]) == "N/A" + ): + print("Skipping Flight Computer Fluctus channel") + continue # Special case for Fluctus channel + if ( + pd.isna(row["Max Pressure"]) + or pd.isna(row["Calibration Offset (V)"]) + or pd.isna(row["Max Output Voltage"]) + ): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "PT", + "channel": int(row["Channel"]), + "range": int(row["Max Pressure"]), + "offset": float(row["Calibration Offset (V)"]), + "max_voltage": float(row["Max Output Voltage"]), + } + self.fc_channels.append(channel) + elif row["Type"] == "TCs": + channel = { + "name": row["Name"], + "type": "TC", + "channel": int(row["Channel"]), + "gain": int(calculate_tc_gain(row["TC Range"])), + } + self.fc_channels.append(channel) + elif row["Type"] == "VLVs": + if pd.isna(row["Supply Voltage (V)"]): + raise ICDValueException( + "Missing value while processing ICD", + row.to_dict(), + ) + channel = { + "name": row["Name"], + "type": "VLV", + "channel": int(row["Channel"]), + "voltage": int(row["Supply Voltage (V)"]), + } + self.fc_channels.append(channel) + else: + raise ICDValueException( + "Unknown peripheral type", row.to_dict() + ) + elif row["Type"] == "IPs": + if pd.isna(row["Connection Location"]): + continue + addr = { + "device": row["Name"], + "ip": str(row["Connection Location"]), + } + self.ips.append(addr) + else: + continue + except ValueError: + raise ICDException("ICD processing error", row.to_dict()) + except KeyError as err: + raise KeyError("Missing critical column: " + str(err)) + + +def calculate_tc_gain(range): + return 8 # lol we only use Type-T + + +def configure_ebox(channels) -> tuple[bool, str]: + try: + synnax_client = synnax_login( + "127.0.0.1" + ) # this should be running on the same machine as synnax + analog_task, digital_task, analog_card = create_tasks(synnax_client, 50) + setup_channels( + synnax_client, channels, analog_task, digital_task, analog_card + ) + configure_tasks(synnax_client, analog_task, digital_task) + return (True, "") + except Exception as e: + return (False, str(e)) + + +def setup_channels( + client: sy.Synnax, channels, analog_task, digital_task, analog_card +): + print("Creating channels in Synnax") + # yes_to_all = False # create new synnax channels for all items in the sheet? + + for channel in channels: + if channel["type"] == "PT": + print(f" > Creating PT: {channel['name']}") + setup_pt(client, channel, analog_task, analog_card) + elif channel["type"] == "VLV": + print(f" > Creating VLV: {channel['name']}") + setup_vlv(client, channel, digital_task) + elif channel["type"] == "TC": + print(f" > Creating TC: {channel['name']}") + setup_tc(client, channel, analog_task, analog_card) + elif channel["type"] == "Thermistor": + print(" > Creating Thermistor") + setup_thermistor(client, channel, analog_task, analog_card) + else: + raise Exception( + f"Sensor type {channel['type']} in channels dict not recognized (issue with the script)" + ) + print(" > Successfully created channels in Synnax") + + +def setup_pt(client: sy.Synnax, channel, analog_task, analog_card): + time_channel = client.channels.create( + name="gse_sensor_time", + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, + is_index=True, + ) + + pt_channel = client.channels.create( + name=f"gse_pt_{channel['channel']}", + data_type=sy.DataType.FLOAT32, + index=time_channel.key, + retrieve_if_name_exists=True, + ) + + analog_task.config.channels.append( + ni.AIVoltageChan( + channel=pt_channel.key, + device=analog_card.key, + port=channel["port"], + custom_scale=ni.LinScale( + slope=( + channel["max"] + / ( + channel["max_voltage"] - channel["offset"] + ) # slope is max output in psi over output range (4 volts for our PTs) + ), + y_intercept=( + -( + channel["max"] + / (channel["max_voltage"] - channel["offset"]) + ) + * channel[ + "offset" + ] # y intercept is negative slope at 0.5 volts + ), + pre_scaled_units="Volts", + scaled_units="PoundsPerSquareInch", + ), + terminal_config="RSE", + max_val=channel["max"], + min_val=channel["min"], + ) + ) + + +def setup_vlv(client: sy.Synnax, channel, digital_task): + gse_state_time = client.channels.create( + name="gse_state_time", + is_index=True, + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, + ) + + state_chan = client.channels.create( + name=f"gse_state_{channel['channel']}", + data_type=sy.DataType.UINT8, + retrieve_if_name_exists=True, + index=gse_state_time.key, + ) + + cmd_chan = client.channels.create( + name=f"gse_vlv_{channel['channel']}", + data_type=sy.DataType.UINT8, + retrieve_if_name_exists=True, + virtual=True, + ) + + digital_task.config.channels.append( + ni.DOChan( + cmd_channel=cmd_chan.key, + state_channel=state_chan.key, + port=channel["port"], + line=channel["line"], + ) + ) + + +def setup_thermistor(client: sy.Synnax, channel, analog_task, analog_card): + time_channel = client.channels.create( + name="gse_sensor_time", + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, + is_index=True, + ) + + therm_supply = client.channels.create( + name="gse_thermistor_supply", + data_type=sy.DataType.FLOAT32, + index=time_channel.key, + retrieve_if_name_exists=True, + ) + + therm_signal = client.channels.create( + name="gse_thermistor_signal", + data_type=sy.DataType.FLOAT32, + index=time_channel.key, + retrieve_if_name_exists=True, + ) + + analog_task.config.channels.append( + ni.AIVoltageChan( + channel=therm_supply.key, + device=analog_card.key, + port=channel["supply"], + min_val=channel["min"], + max_val=channel["max"], + terminal_config="RSE", + ), + ) + + analog_task.config.channels.append( + ni.AIVoltageChan( + channel=therm_signal.key, + device=analog_card.key, + port=channel["signal"], + min_val=channel["min"], + max_val=channel["max"], + terminal_config="RSE", + ), + ) + + +def setup_tc(client: sy.Synnax, channel, analog_task, analog_card): + time_channel = client.channels.create( + name="gse_sensor_time", + data_type=sy.DataType.TIMESTAMP, + retrieve_if_name_exists=True, + is_index=True, + ) + + tc_channel = client.channels.create( + name=f"gse_tc_{channel['channel']}_raw", # TCs without any CJC, CJC happens in a seperate script in real-time + data_type=sy.DataType.FLOAT32, + index=time_channel.key, + retrieve_if_name_exists=True, + ) + + analog_task.config.channels.append( + ni.AIVoltageChan( + channel=tc_channel.key, + device=analog_card.key, + port=channel["port"], + custom_scale=ni.LinScale( + slope=tc_calibrations[str(channel["channel"])]["slope"], + y_intercept=tc_calibrations[str(channel["channel"])]["offset"], + pre_scaled_units="Volts", + scaled_units="Volts", + ), + terminal_config="RSE", + max_val=channel["max"], + ) + ) + + +def synnax_login(cluster: str) -> sy.Synnax: + try: + client = sy.Synnax( + host=cluster, + port=9090, + username="synnax", + password="seldon", + ) + except Exception as e: + raise Exception( + f"Could not connect to Synnax at {cluster}, check to make sure Synnax is running. Original error: {str(e)}" + ) + return client + + +def create_tasks(client: sy.Synnax, frequency: int): + print("Creating tasks...") + print(" > Scanning for cards...") + try: + analog_card = client.hardware.devices.retrieve(model=analog_card_model) + print( + " > Analog card '" + + analog_card.make + + " " + + analog_card.model + + "' found!" + ) + except Exception: + raise Exception( + "Analog card '" + + analog_card_model + + "' not found, are you sure it's connected? Maybe try re-enabling the NI Device Scanner." + ) + + try: + digital_card = client.hardware.devices.retrieve( + model=digital_card_model + ) + print( + " > Digital card '" + + digital_card.make + + " " + + digital_card.model + + "' found!" + ) + except Exception: + raise Exception( + "Digital card '" + + digital_card_model + + "' not found, are you sure it's connected? Maybe try re-enabling the NI Device Scanner." + ) + + analog_task = ni.AnalogReadTask( + name=analog_task_name, + sample_rate=sy.Rate.HZ * frequency, + stream_rate=sy.Rate.HZ * frequency / 2, + data_saving=True, + channels=[], + ) + + digital_task = ni.DigitalWriteTask( + name=digital_task_name, + device=digital_card.key, + state_rate=sy.Rate.HZ * frequency, + data_saving=True, + channels=[], + ) + + return analog_task, digital_task, analog_card + + +def configure_tasks(client: sy.Synnax, analog_task, digital_task): + print("Configuring tasks... (this may take a while)") + + if ( + analog_task.config.channels != [] + ): # only configure if there are channels + print(" > Attempting to configure analog task") + client.hardware.tasks.configure( + task=analog_task, timeout=6000 + ) # long timeout cause our NI hardware is dumb + print(" > Successfully configured analog task!") + if digital_task.config.channels != []: + print(" > Attempting to configure digital task") + client.hardware.tasks.configure(task=digital_task, timeout=500) + print(" > Successfully configured digital task!") + print(" > All tasks have been successfully created!") + + +class ICDException(Exception): + def __init__(self, *args): + super().__init__(*args) + self.type = args[0] + self.row = args[1] + for k, v in self.row.items(): + if pd.isna(v): + self.row[k] = "-" + + +class ICDValueException(ICDException): + pass diff --git a/src/hydrant/hydrant_ui.py b/src/hydrant/hydrant_ui.py new file mode 100644 index 0000000..a9e8e21 --- /dev/null +++ b/src/hydrant/hydrant_ui.py @@ -0,0 +1,1429 @@ +import asyncio +import copy +import datetime +import ipaddress +import traceback + +from nicegui import run, ui + +from lmp.firmware_log import FirmwareLog +from lmp.util import Board + +from .eeprom_generate import ( + PT, + TC, + VLV, + TCGain, + ValveVoltage, + configure_bb, + configure_fc, + configure_fr, +) +from .hydrant_error_ui import EventLogListener +from .hydrant_system_config import ( + DEFAULT_BB1_IP, + DEFAULT_BB2_IP, + DEFAULT_BB3_IP, + DEFAULT_FC_IP, + DEFAULT_FR_IP, + DEFAULT_GSE_IP, + DEFAULT_PT_MAX, + DEFAULT_PT_OFFSET, + DEFAULT_PT_RANGE, + DEFAULT_TC_GAIN, + DEFAULT_VALVE_ENABLED, + DEFAULT_VALVE_VOLTAGE, + ICD, + ICDException, + NUM_BB_PTs, + NUM_BB_TCs, + NUM_BB_VLVs, + NUM_FC_PTs, + NUM_FC_TCs, + NUM_FC_VLVs, + configure_ebox, +) + +progress_lookup = { + -1: {"name": "close", "color": "red"}, + 0: {"name": "", "color": "black"}, + 1: {"name": "check", "color": "green"}, +} + +EEPROM_RESPONSE_TIMEOUT = 1 # seconds + + +class IPAddressUI: + def __init__(self, initial: ipaddress.IPv4Address, name: str): + self.ip = initial + with ui.row().classes("no-wrap items-end gap-1"): + ui.label(name + ": ").classes("self-center") + oct0 = ( + ui.input( + value=self.get_octet(0), + on_change=IPUIOctetHandle(self, 0), + ) + .props("dense outlined") + .classes("min-w-[4em] w-[4em]") + ) + ui.label(".") + oct1 = ( + ui.input( + value=self.get_octet(1), + on_change=IPUIOctetHandle(self, 1), + ) + .props("dense outlined") + .classes("min-w-[4em] w-[4em]") + ) + ui.label(".") + oct2 = ( + ui.input( + value=self.get_octet(2), + on_change=IPUIOctetHandle(self, 2), + ) + .props("dense outlined") + .classes("min-w-[4em] w-[4em]") + ) + ui.label(".") + oct3 = ( + ui.input( + value=self.get_octet(3), + on_change=IPUIOctetHandle(self, 3), + ) + .props("dense outlined") + .classes("min-w-[4em] w-[4em]") + ) + + self.octs = [oct0, oct1, oct2, oct3] + + def set_octet(self, octet_index: int, new_octet: int): + if new_octet > 255 or new_octet < 0: + raise ValueError("IP octet " + str(new_octet) + " not valid") + ip_int = int(self.ip) + ip_int &= ~(0xFF << (8 * (3 - octet_index))) + ip_int |= new_octet << (8 * (3 - octet_index)) + self.ip = ipaddress.IPv4Address(ip_int) + + def get_octet(self, octet_index: int): + ip_int = int(self.ip) + return (ip_int >> (8 * (3 - octet_index))) & 0x000000FF + + def reset_octet(self, octet_index: int): + self.octs[octet_index].value = self.get_octet(octet_index) + + def set_ip(self, ipaddr: ipaddress.IPv4Address): + self.ip = ipaddr + for x in range(4): + self.octs[x].set_value(self.get_octet(x)) + + +class IPUIOctetHandle: + parent: IPAddressUI + + def __init__(self, base: IPAddressUI, octet_index: int): + self.oct = octet_index + self.parent = base + + def sanitize_octet(self, octet: str): + if octet == "" or octet.isdigit(): + if int(octet) < 256 and int(octet) >= 0: + return True + return False + + def __call__(self, e): + if e.value is int: + return + + new_val = str(e.value) + + if new_val == "": + ui.timer(0, lambda: e.sender.set_value(0), active=True, once=True) + self.parent.set_octet(self.oct, 0) + return + elif new_val.isdigit(): + if int(new_val) < 256 and int(new_val) >= 0: + self.parent.set_octet(self.oct, int(new_val)) + ui.timer( + 0, + lambda: e.sender.set_value(self.parent.get_octet(self.oct)), + active=True, + once=True, + ) + return + + ui.timer( + 0, + lambda: e.sender.set_value(e.previous_value), + active=True, + once=True, + ) + + +class PTUI: + def __init__(self, range: float, offset: float, max: float, name: str): + self.range = range + self.offset = offset + self.max = max + with ui.column().classes("w-full gap-1"): + ui.label(name).classes("pb-1 pl-2") + ui.separator() + ui.number(label="Range", value=range).props( + "filled dense" + ).bind_value(self, "range") + ui.number(label="Offset", value=offset).props( + "filled dense" + ).bind_value(self, "offset") + ui.number(label="Max", value=max).props("filled dense").bind_value( + self, "max" + ) + + def to_PT(self): + return PT(self.range, self.offset, self.max) + + +class TCUI: + def __init__(self, gain: int, name: str): + self.gain = gain + with ui.row().classes("w-full mx-auto no-wrap gap-1 items-center"): + ui.label(name).classes("min-w-10") + ui.select( + [1, 2, 4, 8, 16, 32, 64, 128], value=gain, label="Gain" + ).props("filled dense").classes("min-w-25").bind_value(self, "gain") + + def to_TC(self): + return TC(TCGain.from_int(self.gain)) + + +class ValveUI: + def __init__(self, enabled: bool, voltage: int, name: str): + self.voltage = voltage + self.enabled = enabled + with ui.column().classes("w-full gap-1"): + ui.label(name).classes("pb-1 pl-2") + ui.separator() + # ui.checkbox("Enabled") + ui.switch("Enabled", value=enabled).bind_value(self, "enabled") + ui.select([12, 24], value=voltage, label="Voltage").props( + "filled dense" + ).classes("min-w-25").bind_value(self, "voltage") + + def to_VLV(self): + if self.voltage == 12: + return VLV(ValveVoltage.V12, self.enabled) + elif self.voltage == 24: + return VLV(ValveVoltage.V24, self.enabled) + else: + raise Exception("Voltage must be either 12v or 24v") + + +class FCConfigUI: + def __init__(self): + self.PTs: list[PTUI] = [] + self.TCs: list[TCUI] = [] + self.valves: list[ValveUI] = [] + with ui.column().classes("h-full w-full"): + with ui.row().classes( + "w-full mx-auto no-wrap flex items-stretch h-full" + ): + # PTs + with ui.column().classes( + "border-1 p-2 border-gray-500 flex-1 basis-auto" + ): + with ui.row().classes("w-full mx-auto no-wrap gap-3"): + for x in range(NUM_FC_PTs): + self.PTs.append( + PTUI( + DEFAULT_PT_RANGE, + DEFAULT_PT_OFFSET, + DEFAULT_PT_MAX, + "PT " + str(x + 1), + ) + ) + # TCs + with ui.column().classes( + "border-1 p-2 border-gray-500 gap-3 flex-1 basis-auto justify-center" + ): + with ui.column().classes("w-full gap-3"): + for x in range(NUM_FC_TCs): + self.TCs.append( + TCUI(DEFAULT_TC_GAIN, "TC " + str(x + 1) + " ") + ) + # Valves + with ui.column().classes( + "border-1 p-2 border-gray-500 flex-1 basis-auto" + ): + with ui.row().classes("w-full mx-auto no-wrap gap-3"): + for x in range(NUM_FC_VLVs): + self.valves.append( + ValveUI( + DEFAULT_VALVE_ENABLED, + DEFAULT_VALVE_VOLTAGE, + "Valve " + str(x + 1), + ) + ) + with ui.row().classes( + "w-full mx-auto no-wrap flex items-stretch h-full border-1 p-2 border-gray-500" + ): + # IP Addresses + with ui.column().classes("w-full"): + with ui.column().classes("w-fit items-end"): + self.limewireIP = IPAddressUI( + DEFAULT_GSE_IP, "Limewire IP" + ) + self.FCIP = IPAddressUI( + DEFAULT_FC_IP, "Flight Computer IP" + ) + with ui.column().classes("w-full"): + with ui.column().classes("w-fit items-end"): + self.BB1IP = IPAddressUI( + DEFAULT_BB1_IP, "Bay Board 1 IP" + ) + self.BB2IP = IPAddressUI( + DEFAULT_BB2_IP, "Bay Board 2 IP" + ) + with ui.column().classes("w-full"): + with ui.column().classes("w-fit items-end"): + self.BB3IP = IPAddressUI( + DEFAULT_BB3_IP, "Bay Board 3 IP" + ) + self.FRIP = IPAddressUI( + DEFAULT_FR_IP, "Flight Recorder IP" + ) + + def restore_defaults(self): + for x in self.PTs: + x.range = DEFAULT_PT_RANGE + x.offset = DEFAULT_PT_OFFSET + x.max = DEFAULT_PT_MAX + for x in self.TCs: + x.gain = DEFAULT_TC_GAIN + for x in self.valves: + x.enabled = DEFAULT_VALVE_ENABLED + x.voltage = DEFAULT_VALVE_VOLTAGE + self.limewireIP.set_ip(DEFAULT_GSE_IP) + self.FCIP.set_ip(DEFAULT_FC_IP) + self.BB1IP.set_ip(DEFAULT_BB1_IP) + self.BB2IP.set_ip(DEFAULT_BB2_IP) + self.BB3IP.set_ip(DEFAULT_BB3_IP) + self.FRIP.set_ip(DEFAULT_FR_IP) + + +class BBConfigUI: + def __init__(self, num: int): + self.bb_num = num + self.PTs: list[PTUI] = [] + self.TCs: list[TCUI] = [] + self.valves: list[ValveUI] = [] + with ui.column().classes("h-full w-full"): + # PTs + with ui.row().classes( + "w-full mx-auto no-wrap flex items-stretch h-full" + ): + with ui.column().classes( + "border-1 p-2 border-gray-500 flex-1 basis-auto" + ): + with ui.row().classes("w-full mx-auto no-wrap gap-3"): + for x in range(NUM_BB_PTs): + self.PTs.append( + PTUI( + DEFAULT_PT_RANGE, + DEFAULT_PT_OFFSET, + DEFAULT_PT_MAX, + "PT " + str(x + 1), + ) + ) + with ui.row().classes( + "w-full mx-auto no-wrap flex items-stretch h-full" + ): + # TCs + with ui.column().classes( + "border-1 p-2 border-gray-500 gap-3 flex-1 basis-auto justify-center" + ): + with ui.row().classes("w-full mx-auto no-wrap"): + with ui.column().classes("w-full gap-3"): + for x in range(int(NUM_BB_TCs / 2)): + self.TCs.append( + TCUI( + DEFAULT_TC_GAIN, + "TC " + str(x + 1) + " ", + ) + ) + with ui.column().classes("w-full gap-3"): + for x in range(int(NUM_BB_TCs / 2), NUM_BB_TCs): + self.TCs.append( + TCUI( + DEFAULT_TC_GAIN, + "TC " + str(x + 1) + " ", + ) + ) + # Valves + with ui.column().classes( + "border-1 p-2 border-gray-500 flex-1 basis-auto" + ): + with ui.row().classes("w-full mx-auto no-wrap gap-3"): + for x in range(NUM_BB_VLVs): + self.valves.append( + ValveUI( + DEFAULT_VALVE_ENABLED, + DEFAULT_VALVE_VOLTAGE, + "Valve " + str(x + 1), + ) + ) + with ui.row().classes( + "w-full no-wrap justify-center h-full border-1 p-2 border-gray-500" + ): + # IP Addresses + ui.space() + if self.bb_num == 1: + self.BBIP = IPAddressUI(DEFAULT_BB1_IP, "Bay Board IP") + elif self.bb_num == 2: + self.BBIP = IPAddressUI(DEFAULT_BB2_IP, "Bay Board IP") + elif self.bb_num == 3: + self.BBIP = IPAddressUI(DEFAULT_BB3_IP, "Bay Board IP") + ui.space() + self.FCIP = IPAddressUI(DEFAULT_FC_IP, "Flight Computer IP") + ui.space() + + def restore_defaults(self): + for x in self.PTs: + x.range = DEFAULT_PT_RANGE + x.offset = DEFAULT_PT_OFFSET + x.max = DEFAULT_PT_MAX + for x in self.TCs: + x.gain = DEFAULT_TC_GAIN + for x in self.valves: + x.enabled = DEFAULT_VALVE_ENABLED + x.voltage = DEFAULT_VALVE_VOLTAGE + self.FCIP.set_ip(DEFAULT_FC_IP) + if self.bb_num == 1: + self.BBIP.set_ip(DEFAULT_BB1_IP) + elif self.bb_num == 2: + self.BBIP.set_ip(DEFAULT_BB2_IP) + elif self.bb_num == 3: + self.BBIP.set_ip(DEFAULT_BB3_IP) + + +class FRConfigUI: + def __init__(self): + with ui.column().classes("h-full w-full"): + with ui.row().classes( + "w-full no-wrap justify-center h-full border-1 p-2 border-gray-500" + ): + # IP Addresses + ui.space() + self.FRIP = IPAddressUI(DEFAULT_FR_IP, "Flight Recorder IP") + ui.space() + self.FCIP = IPAddressUI(DEFAULT_FC_IP, "Flight Computer IP") + ui.space() + + def restore_defaults(self): + self.FCIP.set_ip(DEFAULT_FC_IP) + self.FRIP.set_ip(DEFAULT_FR_IP) + + +class SystemConfigUI: + def __init__(self, parentUI, log_listener: EventLogListener): + self.log_listener = log_listener + self.base = parentUI + self.configure_ebox = False + self.configure_fc = False + self.configure_bb1 = False + self.configure_bb2 = False + self.configure_bb3 = False + self.configure_fr = False + self.ICD_config = None + self.ebox_loading = False + self.fc_loading = False + self.bb1_loading = False + self.bb2_loading = False + self.bb3_loading = False + self.fr_loading = False + with ui.card().classes( + "w-full bg-gray-900 border border-gray-700 p-6 h-full" + ): + with ui.row().classes("w-full mx-auto no-wrap"): + with ui.column().classes(): + ui.label("SYSTEM CONFIG").classes( + "text-xl font-bold text-white mb-4" + ) + self.ICD_file = ui.upload( + label="Load from ICD", on_upload=self.handle_ICD + ).props( + "accept=.xlsx no-thumbnails no-icon auto__false color=lime text-color=black" + ) + self.config_button = ui.button( + "Write Configuration", + color="orange", + on_click=self.warn_write_config, + ).classes("text-base w-full") + with ui.column().classes("gap-0 pl-10"): + self.all_select = ( + ui.checkbox() + .classes("h-11") + .on("click", self.handle_all_check) + ) + self.ebox_select = ( + ui.checkbox("EBox") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_ebox") + .bind_enabled_from( + self, "ICD_config", backward=self.handle_ebox_select + ) + ) + self.fc_select = ( + ui.checkbox("Flight Computer") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_fc") + ) + self.bb1_select = ( + ui.checkbox("Bay Board 1 (Press)") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_bb1") + ) + self.bb2_select = ( + ui.checkbox("Bay Board 2 (Intertank)") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_bb2") + ) + self.bb3_select = ( + ui.checkbox("Bay Board 3 (Engine)") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_bb3") + ) + self.fr_select = ( + ui.checkbox("Flight Recorder") + .classes("h-8") + .on("click", self.handle_device_select) + .bind_value(self, "configure_fr") + .props("disable") + ) + ui.separator().classes("w-full h-1") + ui.label("Progress").classes("self-center text-lg") + with ui.row().classes( + "w-full mx-auto no-wrap gap-0 justify-between" + ): + with ui.column().classes("items-center gap-0") as ebox_prog: + with ui.tooltip() as ebox_tooltip: + self.ebox_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.ebox_prog_tooltip.set_visibility(False) + ebox_tooltip.bind_visibility_from( + self.ebox_prog_tooltip, "visible" + ) + self.ebox_progress_tool = ebox_prog + ui.label("EBox").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.ebox_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "ebox_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "ebox_loading") + with ui.column().classes("items-center gap-0") as fc_prog: + with ui.tooltip() as fc_tooltip: + self.fc_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.fc_prog_tooltip.set_visibility(False) + fc_tooltip.bind_visibility_from( + self.fc_prog_tooltip, "visible" + ) + self.fc_progress_tool = fc_prog + ui.label("Flight Computer").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.fc_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "fc_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "fc_loading") + with ui.column().classes("items-center gap-0") as bb1_prog: + with ui.tooltip() as bb1_tooltip: + self.bb1_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.bb1_prog_tooltip.set_visibility(False) + bb1_tooltip.bind_visibility_from( + self.bb1_prog_tooltip, "visible" + ) + self.bb1_progress_tool = bb1_prog + ui.label("Bay Board 1").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.bb1_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "bb1_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "bb1_loading") + with ui.column().classes("items-center gap-0") as bb2_prog: + with ui.tooltip() as bb2_tooltip: + self.bb2_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.bb2_prog_tooltip.set_visibility(False) + bb2_tooltip.bind_visibility_from( + self.bb2_prog_tooltip, "visible" + ) + self.bb2_progress_tool = bb2_prog + ui.label("Bay Board 2").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.bb2_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "bb2_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "bb2_loading") + with ui.column().classes("items-center gap-0") as bb3_prog: + with ui.tooltip() as bb3_tooltip: + self.bb3_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.bb3_prog_tooltip.set_visibility(False) + bb3_tooltip.bind_visibility_from( + self.bb3_prog_tooltip, "visible" + ) + self.bb3_progress_tool = bb3_prog + ui.label("Bay Board 3").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.bb3_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "bb3_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "bb3_loading") + with ui.column().classes("items-center gap-0") as fr_prog: + with ui.tooltip() as fr_tooltip: + self.fr_prog_tooltip = ( + ui.html(sanitize=False) + .classes("text-[12px]") + .style( + """display:inline-block;width:max-content;white-space:normal;max-width:100em;""" + ) + ) + self.fr_prog_tooltip.set_visibility(False) + fr_tooltip.bind_visibility_from( + self.fr_prog_tooltip, "visible" + ) + self.fr_progress_tool = fr_prog + ui.label("Flight Recorder").classes("text-sm") + with ui.element().classes("relative w-8 h-8 mt-2"): + self.fr_progress = ( + ui.icon("", size="2em") + .classes("absolute inset-0 flex m-auto") + .bind_visibility_from( + self, "fr_loading", backward=lambda v: not v + ) + ) + ui.spinner(color="white", size="2em").classes( + "absolute inset-0 m-auto" + ).bind_visibility_from(self, "fr_loading") + + def update_progress_icon(self, icon: ui.icon, v: int): + icon.props(f'color="{progress_lookup[v]["color"]}"') + icon.name = progress_lookup[v]["name"] + + def handle_all_check(self, e): + if self.ICD_config is not None: + self.ebox_select.set_value(e.sender.value) + self.fc_select.set_value(e.sender.value) + self.bb1_select.set_value(e.sender.value) + self.bb2_select.set_value(e.sender.value) + self.bb3_select.set_value(e.sender.value) + # self.fr_select.set_value(e.sender.value) + + def handle_device_select(self, e): + if ( + self.ebox_select.value + and self.fc_select.value + and self.bb1_select.value + and self.bb2_select.value + and self.bb3_select.value + and self.fr_select.value + ): + self.all_select.set_value(True) + else: + self.all_select.set_value(False) + + async def handle_ICD(self, e): + self.ICD_config = None + try: + self.ICD_config = ICD(await e.file.read(), e.file.name) + except (ValueError, KeyError) as e: + print("Error while processing ICD") + traceback.print_exception(type(e), e, e.__traceback__) + with ( + ui.dialog().props("persistent") as dialog, + ui.card().classes( + "bg-[#990000] w-250 h-40 flex flex-col justify-center items-center" + ), + ): + ui.button( + icon="close", on_click=lambda e: dialog.close() + ).classes("absolute right-0 top-0 bg-transparent").props( + 'flat color="white" size="lg"' + ) + ui.label("ICD processing error:").classes("text-xl") + ui.space().classes("h-1 w-full") + ui.label(str(e)).classes("text-base") + dialog.open() + return + except ICDException as e: + print("Value error while processing ICD") + traceback.print_exception(type(e), e, e.__traceback__) + with ( + ui.dialog().props("persistent").classes("") as dialog, + ui.card().classes( + "bg-[#990000] max-w-[90vw] h-60 flex flex-col justify-center items-center" + ), + ): + with ui.column().classes("items-center"): + ui.button( + icon="close", on_click=lambda e: dialog.close() + ).classes("absolute right-0 top-0 bg-transparent").props( + 'flat color="white" size="lg"' + ) + ui.label(f"{e.type}:").classes("text-xl") + ui.space().classes("h-10 w-full") + with ui.scroll_area().classes("max-w-[80vw] w-[80vw] h-25"): + with ui.row().classes( + "w-full no-wrap overflow-x-auto overflow-scroll" + ): + for k, v in e.row.items(): + with ui.column().classes( + "whitespace-nowrap overflow-x-auto overflow-y-hidden items-center" + ): + ui.label(str(k)).classes( + "bold whitespace-nowrap overflow-x-auto overflow-y-hidden" + ) + ui.label(str(v)).classes( + "whitespace-nowrap overflow-x-auto overflow-y-hidden" + ) + dialog.open() + return + self.process_board_channels() + + def process_board_channels(self): + fc_board_ui: FCConfigUI = self.base.FC_config + bb1_board_ui: BBConfigUI = self.base.BB1_config + bb2_board_ui: BBConfigUI = self.base.BB2_config + bb3_board_ui: BBConfigUI = self.base.BB3_config + fr_board_ui: FRConfigUI = self.base.FR_config + + fc_vlvs_configured = [] + bb1_vlvs_configured = [] + bb2_vlvs_configured = [] + bb3_vlvs_configured = [] + + for x in self.ICD_config.fc_channels: + if x["type"] == "PT": + fc_board_ui.PTs[x["channel"] - 1].range = x["range"] + fc_board_ui.PTs[x["channel"] - 1].offset = x["offset"] + fc_board_ui.PTs[x["channel"] - 1].max = x["max_voltage"] + elif x["type"] == "TC": + fc_board_ui.TCs[x["channel"] - 1].gain = x["gain"] + elif x["type"] == "VLV": + fc_board_ui.valves[x["channel"] - 1].voltage = x["voltage"] + fc_board_ui.valves[x["channel"] - 1].enabled = True + fc_vlvs_configured.append(x["channel"] - 1) + for x in range(NUM_FC_VLVs): + if x not in fc_vlvs_configured: + fc_board_ui.valves[x].enabled = False + + for x in self.ICD_config.bb1_channels: + if x["type"] == "PT": + bb1_board_ui.PTs[x["channel"] - 1].range = x["range"] + bb1_board_ui.PTs[x["channel"] - 1].offset = x["offset"] + bb1_board_ui.PTs[x["channel"] - 1].max = x["max_voltage"] + elif x["type"] == "TC": + bb1_board_ui.TCs[x["channel"] - 1].gain = x["gain"] + elif x["type"] == "VLV": + bb1_board_ui.valves[x["channel"] - 1].voltage = x["voltage"] + bb1_board_ui.valves[x["channel"] - 1].enabled = True + bb1_vlvs_configured.append(x["channel"] - 1) + for x in range(NUM_BB_VLVs): + if x not in bb1_vlvs_configured: + bb1_board_ui.valves[x].enabled = False + + for x in self.ICD_config.bb2_channels: + if x["type"] == "PT": + bb2_board_ui.PTs[x["channel"] - 1].range = x["range"] + bb2_board_ui.PTs[x["channel"] - 1].offset = x["offset"] + bb2_board_ui.PTs[x["channel"] - 1].max = x["max_voltage"] + elif x["type"] == "TC": + bb2_board_ui.TCs[x["channel"] - 1].gain = x["gain"] + elif x["type"] == "VLV": + bb2_board_ui.valves[x["channel"] - 1].voltage = x["voltage"] + bb2_board_ui.valves[x["channel"] - 1].enabled = True + bb2_vlvs_configured.append(x["channel"] - 1) + for x in range(NUM_BB_VLVs): + if x not in bb2_vlvs_configured: + bb2_board_ui.valves[x].enabled = False + + for x in self.ICD_config.bb3_channels: + if x["type"] == "PT": + bb3_board_ui.PTs[x["channel"] - 1].range = x["range"] + bb3_board_ui.PTs[x["channel"] - 1].offset = x["offset"] + bb3_board_ui.PTs[x["channel"] - 1].max = x["max_voltage"] + elif x["type"] == "TC": + bb3_board_ui.TCs[x["channel"] - 1].gain = x["gain"] + elif x["type"] == "VLV": + bb3_board_ui.valves[x["channel"] - 1].voltage = x["voltage"] + bb3_board_ui.valves[x["channel"] - 1].enabled = True + bb3_vlvs_configured.append(x["channel"] - 1) + for x in range(NUM_BB_VLVs): + if x not in bb3_vlvs_configured: + bb3_board_ui.valves[x].enabled = False + + for x in self.ICD_config.ips: + if x["device"] == "DAQ PC": + fc_board_ui.limewireIP.set_ip(ipaddress.IPv4Address(x["ip"])) + elif x["device"] == "Flight Computer": + fc_board_ui.FCIP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb1_board_ui.FCIP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb2_board_ui.FCIP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb3_board_ui.FCIP.set_ip(ipaddress.IPv4Address(x["ip"])) + fr_board_ui.FCIP.set_ip(ipaddress.IPv4Address(x["ip"])) + elif x["device"] == "Press Bay Board": + fc_board_ui.BB1IP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb1_board_ui.BBIP.set_ip(ipaddress.IPv4Address(x["ip"])) + elif x["device"] == "Intertank Bay Board": + fc_board_ui.BB2IP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb2_board_ui.BBIP.set_ip(ipaddress.IPv4Address(x["ip"])) + elif x["device"] == "Engine Bay Board": + fc_board_ui.BB3IP.set_ip(ipaddress.IPv4Address(x["ip"])) + bb3_board_ui.BBIP.set_ip(ipaddress.IPv4Address(x["ip"])) + elif x["device"] == "Flight Recorder": + fc_board_ui.FRIP.set_ip(ipaddress.IPv4Address(x["ip"])) + fr_board_ui.FRIP.set_ip(ipaddress.IPv4Address(x["ip"])) + + def reset_progress_indicators(self): + self.update_progress_icon(self.ebox_progress, 0) + self.update_progress_icon(self.fc_progress, 0) + self.update_progress_icon(self.bb1_progress, 0) + self.update_progress_icon(self.bb2_progress, 0) + self.update_progress_icon(self.bb3_progress, 0) + self.update_progress_icon(self.fr_progress, 0) + + self.ebox_prog_tooltip.set_visibility(False) + self.fc_prog_tooltip.set_visibility(False) + self.bb1_prog_tooltip.set_visibility(False) + self.bb2_prog_tooltip.set_visibility(False) + self.bb3_prog_tooltip.set_visibility(False) + self.fr_prog_tooltip.set_visibility(False) + + async def start_config_write(self): + print("Starting configuration") + + # Make a copy of everything since this will take a while and we don't want things + # changing mid configuration + config_ebox = self.configure_ebox + config_fc = self.configure_fc + config_bb1 = self.configure_bb1 + config_bb2 = self.configure_bb2 + config_bb3 = self.configure_bb3 + config_fr = self.configure_fr + + gse_channels = None + ICD_name = "" + if self.ICD_config is not None: + gse_channels = copy.deepcopy(self.ICD_config.ebox_channels) + ICD_name = self.ICD_config.name + + fc_board_ui: FCConfigUI = self.base.FC_config + bb1_board_ui: BBConfigUI = self.base.BB1_config + bb2_board_ui: BBConfigUI = self.base.BB2_config + bb3_board_ui: BBConfigUI = self.base.BB3_config + fr_board_ui: FRConfigUI = self.base.FR_config + + fc_tftp = self.base.FC_TFTP_IP.ip + bb1_tftp = self.base.BB1_TFTP_IP.ip + bb2_tftp = self.base.BB2_TFTP_IP.ip + bb3_tftp = self.base.BB3_TFTP_IP.ip + fr_tftp = self.base.FR_TFTP_IP.ip + + fc_PTs = [pt.to_PT() for pt in fc_board_ui.PTs] + fc_TCs = [tc.to_TC() for tc in fc_board_ui.TCs] + fc_VLVs = [vlv.to_VLV() for vlv in fc_board_ui.valves] + fc_GSEIP = fc_board_ui.limewireIP.ip + fc_FCIP = fc_board_ui.FCIP.ip + fc_BB1IP = fc_board_ui.BB1IP.ip + fc_BB2IP = fc_board_ui.BB2IP.ip + fc_BB3IP = fc_board_ui.BB3IP.ip + fc_FRIP = fc_board_ui.FRIP.ip + + bb1_PTs = [pt.to_PT() for pt in bb1_board_ui.PTs] + bb1_TCs = [tc.to_TC() for tc in bb1_board_ui.TCs] + bb1_VLVs = [vlv.to_VLV() for vlv in bb1_board_ui.valves] + bb1_BBIP = bb1_board_ui.BBIP.ip + bb1_FCIP = bb1_board_ui.FCIP.ip + + bb2_PTs = [pt.to_PT() for pt in bb2_board_ui.PTs] + bb2_TCs = [tc.to_TC() for tc in bb2_board_ui.TCs] + bb2_VLVs = [vlv.to_VLV() for vlv in bb2_board_ui.valves] + bb2_BBIP = bb2_board_ui.BBIP.ip + bb2_FCIP = bb2_board_ui.FCIP.ip + + bb3_PTs = [pt.to_PT() for pt in bb3_board_ui.PTs] + bb3_TCs = [tc.to_TC() for tc in bb3_board_ui.TCs] + bb3_VLVs = [vlv.to_VLV() for vlv in bb3_board_ui.valves] + bb3_BBIP = bb3_board_ui.BBIP.ip + bb3_FCIP = bb3_board_ui.FCIP.ip + + fr_FCIP = fr_board_ui.FCIP.ip + fr_FRIP = fr_board_ui.FRIP.ip + + # time to actually configure + self.reset_progress_indicators() + + if config_ebox: + print("Configuring EBox") + if gse_channels is None: + print("ICD not loaded, skipping EBox") + self.set_progress( + self.ebox_progress, + self.ebox_prog_tooltip, + "ICD not loaded", + False, + ) + else: + self.ebox_loading = True + self.set_in_progress( + self.ebox_prog_tooltip, + "Configuring, this may take a while...", + ) + (result, msg) = await run.cpu_bound( + configure_ebox, gse_channels + ) + tooltip_msg = f"Used ICD '{ICD_name}'" + ( + f"

{msg}" if not result else "" + ) + self.set_progress( + self.ebox_progress, + self.ebox_prog_tooltip, + tooltip_msg, + result, + ) + self.ebox_loading = False + print( + "EBox configured!" + if result + else f"Failed to configure EBox: {msg}" + ) + print("") + + if config_fc: + print("Configuring Flight Computer") + self.fc_loading = True + eeprom_future = None + if self.log_listener is not None: + eeprom_future = await self.log_listener.setup_future() + self.set_board_tftp_in_progress(self.fc_prog_tooltip) + try: + await run.cpu_bound( + configure_fc, + fc_PTs, + fc_TCs, + fc_VLVs, + fc_GSEIP, + fc_FCIP, + fc_BB1IP, + fc_BB2IP, + fc_BB3IP, + fc_FRIP, + fc_tftp, + ) + self.set_board_config_in_progress(self.fc_prog_tooltip) + print( + "Flight Computer config successfully sent, waiting for response." + ) + if eeprom_future is not None: + future_msg = "" + future_result = False + try: + log: FirmwareLog = await asyncio.wait_for( + eeprom_future, timeout=EEPROM_RESPONSE_TIMEOUT + ) + if log.status_code % 1000 == 705: + future_result = True + future_msg = log.message + if log.board != Board.FC: + future_result = False + future_msg = ( + "A successful response came from the wrong board: " + + log.board.pretty_name + ) + print( + "Successful eeprom came from the wrong board: " + + log.board.pretty_name + ) + else: + print( + "Successful response from the Flight Computer" + ) + else: + future_msg = log.message + print( + "Flight Computer config failed: " + log.message + ) + except asyncio.CancelledError as err: + future_msg = ( + "Error: configuration was started from a different source, " + + str(err) + ) + print("EEPROM config future was cancelled") + except TimeoutError: + future_msg = "Timed out waiting for response" + print( + "Flight Computer config timed out waiting for UDP response" + ) + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{future_msg}" + self.set_progress( + self.fc_progress, + self.fc_prog_tooltip, + tooltip_msg, + future_result, + ) + except Exception as err: + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{str(err)}" + self.set_progress( + self.fc_progress, self.fc_prog_tooltip, tooltip_msg, False + ) + print( + f"Failed to send Flight Computer EEPROM config over TFTP: {str(err)}" + ) + self.fc_loading = False + print("") + + if config_bb1: + print("Configuring Bay Board 1") + self.bb1_loading = True + eeprom_future = None + if self.log_listener is not None: + eeprom_future = await self.log_listener.setup_future() + self.set_board_tftp_in_progress(self.bb1_prog_tooltip) + try: + await run.cpu_bound( + configure_bb, + 1, + bb1_PTs, + bb1_TCs, + bb1_VLVs, + bb1_FCIP, + bb1_BBIP, + bb1_tftp, + ) + self.set_board_config_in_progress(self.bb1_prog_tooltip) + print( + "Bay Board 1 config successfully sent, waiting for response." + ) + if eeprom_future is not None: + future_msg = "" + future_result = False + try: + log: FirmwareLog = await asyncio.wait_for( + eeprom_future, timeout=EEPROM_RESPONSE_TIMEOUT + ) + if log.status_code % 1000 == 705: + future_result = True + future_msg = log.message + if log.board != Board.BB1: + future_result = False + future_msg = ( + "A successful response came from the wrong board: " + + log.board.pretty_name + ) + print( + "Successful eeprom came from the wrong board: " + + log.board.pretty_name + ) + else: + print("Successful response from Bay Board 1") + else: + future_msg = log.message + print("Bay Board 1 config failed: " + log.message) + except asyncio.CancelledError as err: + future_msg = ( + "Error: configuration was started from a different source, " + + str(err) + ) + print("EEPROM config future was cancelled") + except TimeoutError: + future_msg = "Timed out waiting for response" + print( + "Bay Board 1 config timed out waiting for UDP response" + ) + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{future_msg}" + self.set_progress( + self.bb1_progress, + self.bb1_prog_tooltip, + tooltip_msg, + future_result, + ) + except Exception as err: + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{str(err)}" + self.set_progress( + self.bb1_progress, self.bb1_prog_tooltip, tooltip_msg, False + ) + print( + f"Failed to send Bay Board 1 EEPROM config over TFTP: {str(err)}" + ) + self.bb1_loading = False + print("") + + if config_bb2: + print("Configuring Bay Board 2") + self.bb2_loading = True + eeprom_future = None + if self.log_listener is not None: + eeprom_future = await self.log_listener.setup_future() + self.set_board_tftp_in_progress(self.bb2_prog_tooltip) + try: + await run.cpu_bound( + configure_bb, + 2, + bb2_PTs, + bb2_TCs, + bb2_VLVs, + bb2_FCIP, + bb2_BBIP, + bb2_tftp, + ) + self.set_board_config_in_progress(self.bb2_prog_tooltip) + print( + "Bay Board 2 config successfully sent, waiting for response." + ) + if eeprom_future is not None: + future_msg = "" + future_result = False + try: + log: FirmwareLog = await asyncio.wait_for( + eeprom_future, timeout=EEPROM_RESPONSE_TIMEOUT + ) + if log.status_code % 1000 == 705: + future_result = True + future_msg = log.message + if log.board != Board.BB2: + future_result = False + future_msg = ( + "A successful response came from the wrong board: " + + log.board.pretty_name + ) + print( + "Successful eeprom came from the wrong board: " + + log.board.pretty_name + ) + else: + print("Successful response from Bay Board 2") + else: + future_msg = log.message + print("Bay Board 2 config failed: " + log.message) + except asyncio.CancelledError as err: + future_msg = ( + "Error: configuration was started from a different source, " + + str(err) + ) + print("EEPROM config future was cancelled") + except TimeoutError: + future_msg = "Timed out waiting for response" + print( + "Bay Board 2 config timed out waiting for UDP response" + ) + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{future_msg}" + self.set_progress( + self.bb2_progress, + self.bb2_prog_tooltip, + tooltip_msg, + future_result, + ) + except Exception as err: + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{str(err)}" + self.set_progress( + self.bb2_progress, self.bb2_prog_tooltip, tooltip_msg, False + ) + print( + f"Failed to send Bay Board 2 EEPROM config over TFTP: {str(err)}" + ) + self.bb2_loading = False + print("") + + if config_bb3: + print("Configuring Bay Board 3") + self.bb3_loading = True + eeprom_future = None + if self.log_listener is not None: + eeprom_future = await self.log_listener.setup_future() + self.set_board_tftp_in_progress(self.bb3_prog_tooltip) + try: + await run.cpu_bound( + configure_bb, + 3, + bb3_PTs, + bb3_TCs, + bb3_VLVs, + bb3_FCIP, + bb3_BBIP, + bb3_tftp, + ) + self.set_board_config_in_progress(self.bb3_prog_tooltip) + print( + "Bay Board 3 config successfully sent, waiting for response." + ) + if eeprom_future is not None: + future_msg = "" + future_result = False + try: + log: FirmwareLog = await asyncio.wait_for( + eeprom_future, timeout=EEPROM_RESPONSE_TIMEOUT + ) + if log.status_code % 1000 == 705: + future_result = True + future_msg = log.message + if log.board != Board.BB3: + future_result = False + future_msg = ( + "A successful response came from the wrong board: " + + log.board.pretty_name + ) + print( + "Successful eeprom came from the wrong board: " + + log.board.pretty_name + ) + else: + print("Successful response from Bay Board 3") + else: + future_msg = log.message + print("Bay Board 3 config failed: " + log.message) + except asyncio.CancelledError as err: + future_msg = ( + "Error: configuration was started from a different source, " + + str(err) + ) + print("EEPROM config future was cancelled") + except TimeoutError: + future_msg = "Timed out waiting for response" + print( + "Bay Board 3 config timed out waiting for UDP response" + ) + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{future_msg}" + self.set_progress( + self.bb3_progress, + self.bb3_prog_tooltip, + tooltip_msg, + future_result, + ) + except Exception as err: + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{str(err)}" + self.set_progress( + self.bb3_progress, self.bb3_prog_tooltip, tooltip_msg, False + ) + print( + f"Failed to send Bay Board 3 EEPROM config over TFTP: {str(err)}" + ) + self.bb3_loading = False + print("") + + if config_fr: + print("Configuring Flight Recorder") + self.fr_loading = True + eeprom_future = None + if self.log_listener is not None: + eeprom_future = await self.log_listener.setup_future() + self.set_board_tftp_in_progress(self.fr_prog_tooltip) + try: + await run.cpu_bound(configure_fr, fr_FCIP, fr_FRIP, fr_tftp) + self.set_board_config_in_progress(self.fr_prog_tooltip) + print( + "Flight Recorder config successfully sent, waiting for response." + ) + if eeprom_future is not None: + future_msg = "" + future_result = False + try: + log: FirmwareLog = await asyncio.wait_for( + eeprom_future, timeout=EEPROM_RESPONSE_TIMEOUT + ) + if log.status_code % 1000 == 705: + future_result = True + future_msg = log.message + if log.board != Board.FR: + future_result = False + future_msg = ( + "A successful response came from the wrong board: " + + log.board.pretty_name + ) + print( + "Successful eeprom came from the wrong board: " + + log.board.pretty_name + ) + else: + print( + "Successful response from the Flight Recorder" + ) + else: + future_msg = log.message + print( + "Flight Recorder config failed: " + log.message + ) + except asyncio.CancelledError as err: + future_msg = ( + "Error: configuration was started from a different source, " + + str(err) + ) + print("EEPROM config future was cancelled") + except TimeoutError: + future_msg = "Timed out waiting for response" + print( + "Flight Recorder config timed out waiting for UDP response" + ) + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{future_msg}" + self.set_progress( + self.fr_progress, + self.fr_prog_tooltip, + tooltip_msg, + future_result, + ) + except Exception as err: + tooltip_msg = "" + if gse_channels is not None: + tooltip_msg = f"Used ICD '{ICD_name}'
" + tooltip_msg += f"
{str(err)}" + self.set_progress( + self.fr_progress, self.fr_prog_tooltip, tooltip_msg, False + ) + print( + f"Failed to send Flight Recorder EEPROM config over TFTP: {str(err)}" + ) + self.fr_loading = False + print("") + + print("System configuration done!\n") + + def set_progress( + self, icon: ui.icon, tooltip: ui.html, text: str, result: bool + ): + if result: + self.update_progress_icon(icon, 1) + else: + self.update_progress_icon(icon, -1) + now = datetime.datetime.now() + tooltip.set_content( + f"{now.strftime('%b %d, %Y %I:%M:%S %p')}
{text}" + ) + tooltip.set_visibility(True) + + def set_board_config_in_progress(self, tooltip: ui.html): + now = datetime.datetime.now() + tooltip.set_content( + f"{now.strftime('%b %d, %Y %I:%M:%S %p')}
Config sent, waiting for response." + ) + tooltip.set_visibility(True) + + def set_board_tftp_in_progress(self, tooltip: ui.html): + now = datetime.datetime.now() + tooltip.set_content( + f"{now.strftime('%b %d, %Y %I:%M:%S %p')}
Sending config..." + ) + tooltip.set_visibility(True) + + def set_in_progress(self, tooltip: ui.html, msg: str): + now = datetime.datetime.now() + tooltip.set_content(f"{now.strftime('%b %d, %Y %I:%M:%S %p')}
{msg}") + tooltip.set_visibility(True) + + def handle_ebox_select(self, v): + if v is None: + self.configure_ebox = False + return v is not None + + def warn_write_config(self, e): + with ( + ui.dialog() as dialog, + ui.card().classes( + "w-100 h-30 flex flex-col justify-center items-center" + ), + ): + ui.button(icon="close", on_click=lambda e: dialog.close()).classes( + "absolute right-0 top-0 bg-transparent" + ).props('flat color="white" size="lg"') + ui.label("Confirm write configuration?").classes("text-xl") + ui.button("Confirm", on_click=lambda: self.on_config_warn(dialog)) + dialog.open() + + async def on_config_warn(self, dialog): + dialog.close() + self.config_button.set_enabled(False) + await self.start_config_write() + self.config_button.set_enabled(True) diff --git a/src/hydrant/resources/favicon.ico b/src/hydrant/resources/favicon.ico new file mode 100644 index 0000000..999d883 Binary files /dev/null and b/src/hydrant/resources/favicon.ico differ diff --git a/src/hydrant/resources/lebron.png b/src/hydrant/resources/lebron.png new file mode 100644 index 0000000..e97e8fe Binary files /dev/null and b/src/hydrant/resources/lebron.png differ diff --git a/src/hydrant/resources/lebron_shoot.jpg b/src/hydrant/resources/lebron_shoot.jpg new file mode 100644 index 0000000..6d5a945 Binary files /dev/null and b/src/hydrant/resources/lebron_shoot.jpg differ diff --git a/src/hydrant/resources/limelight-mp.ico b/src/hydrant/resources/limelight-mp.ico new file mode 100644 index 0000000..999d883 Binary files /dev/null and b/src/hydrant/resources/limelight-mp.ico differ diff --git a/src/hydrant/resources/limelight-mp.png b/src/hydrant/resources/limelight-mp.png new file mode 100644 index 0000000..1a61e35 Binary files /dev/null and b/src/hydrant/resources/limelight-mp.png differ diff --git a/src/hydrant/resources/limelight-mp32.ico b/src/hydrant/resources/limelight-mp32.ico new file mode 100644 index 0000000..916ac41 Binary files /dev/null and b/src/hydrant/resources/limelight-mp32.ico differ diff --git a/src/lmp/firmware_log.py b/src/lmp/firmware_log.py new file mode 100644 index 0000000..2488086 --- /dev/null +++ b/src/lmp/firmware_log.py @@ -0,0 +1,82 @@ +import ipaddress +from datetime import datetime, timezone + +from lmp import Board + + +class FirmwareLog: + """A firmware log message sent over UDP.""" + + timestamp: datetime # UTC timestamp + board: Board + status_code: int + message: str + ip: ipaddress.IPv4Address + + def __init__( + self, timestamp: datetime, board: Board, status_code: int, message: str + ): + if status_code is not None and status_code > 9999: + raise ValueError(f"Invalid status code {status_code}") + + if board is not None and status_code is not None: + board_matches_status_code = status_code // 1000 == board.value + if not board_matches_status_code: + raise ValueError( + f"Status code {status_code} isn't from board {board.name}" + ) + + self.timestamp = timestamp + self.board = board + self.status_code = status_code + self.message = message + + @classmethod + def from_bytes(cls, log_bytes: bytes): + """Construct a FirmwareLog by parsing log_bytes.""" + try: + log_str = log_bytes.decode().strip() + except UnicodeDecodeError: + raise ValueError("Message not ASCII-encoded.") + + timestamp = None # default if no timestamp is given + try: + timestamp_str = log_str[:24] + timestamp = datetime.strptime( + timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ" + ).replace(tzinfo=timezone.utc) + log_str = log_str[24:] + if log_str[0] == " ": + log_str = log_str[1:] + except Exception: + pass # It's ok to not have a timestamp, some logs will not + + code = None # default if no error code is included + try: + code_str = log_str[:4] + if not code_str.isdigit(): + raise ValueError() + code = int(code_str) + log_str = log_str[4:] + if log_str[0] == " ": + log_str = log_str[1:] + except Exception: + pass + + board = None + if code is not None: + board = Board(code // 1000) + + return cls(timestamp, board, code, log_str) + + def __str__(self): + return f"{{Timestamp: {self.timestamp}, Board: {self.board}, Code: {self.status_code}, Msg: '{self.message}'}}" + + def to_log(self): + time_str = None + if self.timestamp is not None: + timestamp = self.timestamp + local_zone = datetime.now(timezone.utc).astimezone().tzinfo + timestamp = timestamp.astimezone(local_zone) + time_str = timestamp.strftime("%b %d, %Y %I:%M:%S.%f %p %Z") + return f"'{self.message}', Timestamp: {time_str}, Code: {self.status_code}, Board: {self.board.pretty_name if self.board is not None else None}" diff --git a/src/lmp/util.py b/src/lmp/util.py index 62d038e..524d724 100644 --- a/src/lmp/util.py +++ b/src/lmp/util.py @@ -64,6 +64,18 @@ def index_channel(self) -> str: def __str__(self) -> str: return repr(self).removeprefix(f"{self.__class__.__name__}.") + @property + def pretty_name(self) -> str: + """The pretty name for this board, used for Hydrant UIs""" + P_NAMES = { + Board.FC: "Flight Computer", + Board.BB1: "Bay Board 1 (Press)", + Board.BB2: "Bay Board 2 (Intertank)", + Board.BB3: "Bay Board 3 (Engine)", + Board.FR: "Flight Recorder", + } + return P_NAMES[self] + class Valve: """A class to represent a valve on the rocket.""" diff --git a/uv.lock b/uv.lock index 4dc553a..6b4fda7 100644 --- a/uv.lock +++ b/uv.lock @@ -353,6 +353,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/11/a8/c6a4b901d17399c77cd81fb001ce8961e9f5e04d3daf27e8925cb012e163/docutils-0.22.3-py3-none-any.whl", hash = "sha256:bd772e4aca73aff037958d44f2be5229ded4c09927fcf8690c577b66234d6ceb", size = 633032, upload-time = "2025-11-06T02:35:52.391Z" }, ] +[[package]] +name = "et-xmlfile" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" }, +] + [[package]] name = "fastapi" version = "0.121.0" @@ -717,6 +726,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/8f/8f6f491d595a9e5912971f3f863d81baddccc8a4d0c3749d6a0dd9ffc9df/kiwisolver-1.4.9-cp313-cp313t-win_arm64.whl", hash = "sha256:0749fd8f4218ad2e851e11cc4dc05c7cbc0cbc4267bdfdb31782e65aace4ee9c", size = 68646, upload-time = "2025-08-10T21:27:00.52Z" }, ] +[[package]] +name = "libsass" +version = "0.23.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/79/b4/ab091585eaa77299558e3289ca206846aefc123fb320b5656ab2542c20ad/libsass-0.23.0.tar.gz", hash = "sha256:6f209955ede26684e76912caf329f4ccb57e4a043fd77fe0e7348dd9574f1880", size = 316068, upload-time = "2024-01-06T18:53:05.404Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/13/fc1bea1de880ca935137183727c7d4dd921c4128fc08b8ddc3698ba5a8a3/libsass-0.23.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:34cae047cbbfc4ffa832a61cbb110f3c95f5471c6170c842d3fed161e40814dc", size = 1086783, upload-time = "2024-01-06T19:02:38.903Z" }, + { url = "https://files.pythonhosted.org/packages/55/2f/6af938651ff3aec0a0b00742209df1172bc297fa73531f292801693b7315/libsass-0.23.0-cp38-abi3-macosx_14_0_arm64.whl", hash = "sha256:ea97d1b45cdc2fc3590cb9d7b60f1d8915d3ce17a98c1f2d4dd47ee0d9c68ce6", size = 982759, upload-time = "2024-01-06T19:02:41.331Z" }, + { url = "https://files.pythonhosted.org/packages/fd/5a/eb5b62641df0459a3291fc206cf5bd669c0feed7814dded8edef4ade8512/libsass-0.23.0-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:4a218406d605f325d234e4678bd57126a66a88841cb95bee2caeafdc6f138306", size = 9444543, upload-time = "2024-01-06T19:02:43.191Z" }, + { url = "https://files.pythonhosted.org/packages/e5/fc/275783f5120970d859ae37d04b6a60c13bdec2aa4294b9dfa8a37b5c2513/libsass-0.23.0-cp38-abi3-win32.whl", hash = "sha256:31e86d92a5c7a551df844b72d83fc2b5e50abc6fbbb31e296f7bebd6489ed1b4", size = 775481, upload-time = "2024-01-06T19:02:46.05Z" }, + { url = "https://files.pythonhosted.org/packages/ef/20/caf3c7cf2432d85263119798c45221ddf67bdd7dae8f626d14ff8db04040/libsass-0.23.0-cp38-abi3-win_amd64.whl", hash = "sha256:a2ec85d819f353cbe807432d7275d653710d12b08ec7ef61c124a580a8352f3c", size = 872914, upload-time = "2024-01-06T19:02:47.61Z" }, +] + [[package]] name = "limewire" version = "0.1.0" @@ -724,13 +746,17 @@ source = { editable = "." } dependencies = [ { name = "asyncudp" }, { name = "click" }, + { name = "libsass" }, { name = "loguru" }, { name = "nicegui" }, + { name = "openpyxl" }, + { name = "pandas" }, { name = "platformdirs" }, { name = "python-dotenv" }, { name = "scapy" }, { name = "seaborn" }, { name = "synnax" }, + { name = "tftpy" }, ] [package.dev-dependencies] @@ -742,13 +768,17 @@ dev = [ requires-dist = [ { name = "asyncudp", specifier = ">=0.11.0" }, { name = "click", specifier = ">=8.1.8" }, + { name = "libsass", specifier = ">=0.23.0" }, { name = "loguru", specifier = ">=0.7.3" }, { name = "nicegui", specifier = ">=2.23.2" }, + { name = "openpyxl", specifier = ">=3.1.5" }, + { name = "pandas", specifier = ">=2.3.3" }, { name = "platformdirs", specifier = ">=4.5.0" }, { name = "python-dotenv", specifier = ">=1.0.1,<2.0.0" }, { name = "scapy", specifier = ">=2.6.1" }, { name = "seaborn", specifier = ">=0.13.2" }, { name = "synnax", specifier = ">=0.46,<0.47" }, + { name = "tftpy", specifier = "==0.8.5" }, ] [package.metadata.requires-dev] @@ -1057,6 +1087,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/67/63/871fad5f0073fc00fbbdd7232962ea1ac40eeaae2bba66c76214f7954236/numpy-2.3.4-cp313-cp313t-win_arm64.whl", hash = "sha256:b6c231c9c2fadbae4011ca5e7e83e12dc4a5072f1a1d85a0a7b3ed754d145a40", size = 10266691, upload-time = "2025-10-15T16:17:00.048Z" }, ] +[[package]] +name = "openpyxl" +version = "3.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "et-xmlfile" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" }, +] + [[package]] name = "opentelemetry-api" version = "1.37.0" @@ -1749,6 +1791,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4a/c7/a33a06e7cf32bf9a5e4220fe9a9b5ed92ff492078ea7962fd18f26493897/synnax_freighter-0.46.0-py3-none-any.whl", hash = "sha256:fade3bd6ac5decb6a28d6b07f0bb4ae5b0d646a4b876aabf48706af02b6aa6ef", size = 17239, upload-time = "2025-09-29T14:53:27.347Z" }, ] +[[package]] +name = "tftpy" +version = "0.8.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f9/e4/fcb8ef9c25e4b9f5b15f4b5c5fce550de13bb0de7610691f848c959546ab/tftpy-0.8.5.tar.gz", hash = "sha256:dd38e3744530d0c30fa1c715d7fa454319bc8d399bb40c05839cc771f05d0e6c", size = 71339, upload-time = "2025-02-14T14:04:41.125Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/dd/5040132ab5dd3f32b8c916aef35ce101ffaaa9ec5a30679d94bd405fc17d/tftpy-0.8.5-py3-none-any.whl", hash = "sha256:435fdf5cc00a1439e698711b1390917b634c84addef581c63e6f743c7b370c28", size = 27372, upload-time = "2025-02-14T14:04:37.665Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0"