Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Ignore Limewire log files
limewire*.json

# Ignore Hydrant log files
system-events*.log

# Created by https://www.toptal.com/developers/gitignore/api/python,macos,windows
# Edit at https://www.toptal.com/developers/gitignore?templates=python,macos,windows

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ requires-python = "<=3.13,>=3.12"
dependencies = [
"asyncudp>=0.11.0",
"click>=8.1.8",
"libsass>=0.23.0",
"loguru>=0.7.3",
"nicegui>=2.23.2",
"openpyxl>=3.1.5",
"pandas>=2.3.3",
"platformdirs>=4.5.0",
"python-dotenv<2.0.0,>=1.0.1",
"scapy>=2.6.1",
"seaborn>=0.13.2",
"synnax>=0.46,<0.47",
"tftpy==0.8.5",
]
name = "limewire"
version = "0.1.0"
Expand Down
31 changes: 27 additions & 4 deletions src/hydrant/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
import pathlib

import click
from nicegui import ui
from nicegui import app, ui

from limewire.util import SocketAddress

Expand All @@ -10,12 +13,32 @@
@click.argument(
"fc_address", type=SocketAddress(), default="141.212.192.170:5000"
)
def main(fc_address: tuple[str, int]):
@click.option(
"--log-table",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=pathlib.Path),
)
def main(fc_address: tuple[str, int], log_table: pathlib.Path):
print("! HYDRANT RUNNING !")

hydrant = Hydrant(fc_address)
script_dir = os.path.dirname(os.path.abspath(__file__))

app.add_static_file(
url_path="/favicon.ico",
local_file=os.path.join(script_dir, "resources/favicon.ico"),
)
app.add_static_file(
url_path="/lebron.png",
local_file=os.path.join(script_dir, "resources/lebron.png"),
)
app.add_static_file(
url_path="/lebron_shoot.jpg",
local_file=os.path.join(script_dir, "resources/lebron_shoot.jpg"),
)

hydrant = Hydrant(fc_address, log_table)

ui.run(hydrant.main_page, show=False, reload=False)
ui.run(hydrant.main_page, show=False, reload=False, favicon="favicon.ico")


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/hydrant/device_command_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def set_ack(self, ack_time: datetime, ack_msg: str):
def to_gui_dict(self):
date_format = "%b %d, %Y %I:%M:%S %p"
return {
"Board": self.board.name,
"Board": self.board.pretty_name,
"Command": self.command.name,
"Send Time": self.send_time.strftime(date_format),
"ACK?": self.ack,
Expand Down
190 changes: 190 additions & 0 deletions src/hydrant/eeprom_generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import ipaddress
import struct
import zlib
from enum import Enum
from io import BytesIO

import tftpy


class ValveVoltage(Enum):
V12 = 0 # 12V
V24 = 1 # 24V


class TCGain(Enum):
X1 = 0
X2 = 1
X4 = 2
X8 = 3
X16 = 4
X32 = 5
X64 = 6
X128 = 7

@classmethod
def from_int(cls, gain: int):
if gain == 1:
return TCGain.Gain_1x
elif gain == 2:
return TCGain.Gain_2x
elif gain == 4:
return TCGain.Gain_4x
elif gain == 8:
return TCGain.Gain_8x
elif gain == 16:
return TCGain.Gain_16x
elif gain == 32:
return TCGain.Gain_32x
elif gain == 64:
return TCGain.Gain_64x
elif gain == 128:
return TCGain.Gain_128x
else:
raise Exception("TC gain must be a power of 2 from 1 to 128")


class PT:
def __init__(self, range: float, offset: float, max: float):
self.range = range
self.offset = offset
self.max_voltage = max


class TC:
def __init__(self, gain: TCGain):
self.gain = gain


class VLV:
def __init__(self, voltage: ValveVoltage, enabled: bool):
self.voltage = voltage
self.enabled = enabled


def generate_bb_eeprom(
bb_num: int,
fc_ip: ipaddress.IPv4Address,
bb_ip: ipaddress.IPv4Address,
pts: list[PT],
tcs: list[TC],
vlvs: list[VLV],
) -> bytes:
if len(pts) != 10:
raise ValueError(
"Bay Board must be configured with exactly 10 PT channels"
)
if len(tcs) != 6:
raise ValueError(
"Bay Board must be configured with exactly 6 TC channels"
)
if len(vlvs) != 5:
raise ValueError(
"Bay Board must be configured with exactly 5 valve channels"
)
if bb_num < 1 or bb_num > 3:
raise ValueError("Bay Board must be configured as BB1 through BB3")
raw_out = struct.pack("<B", bb_num)
for x in pts:
raw_out += (
struct.pack("<f", x.offset)
+ struct.pack("<f", x.range)
+ struct.pack("<f", x.max_voltage)
)
for x in tcs:
raw_out += struct.pack("<B", x.gain.value)
for x in vlvs:
raw_out += struct.pack("<B", x.voltage.value) + struct.pack(
"<B", int(x.enabled)
)
raw_out += fc_ip.packed + bb_ip.packed

crc = zlib.crc32(raw_out)

raw_out += struct.pack("<I", crc)
return raw_out


def generate_fr_eeprom(
fc_ip: ipaddress.IPv4Address, fr_ip: ipaddress.IPv4Address
):
print("Flight Recorder doesn't exist yet :(")
raise NotImplementedError(
"Can't do this man, actually this shouldn't be possible"
)


def generate_fc_eeprom(
pts: list[PT],
tcs: list[TC],
vlvs: list[VLV],
limewire_IP: ipaddress.IPv4Address,
fc_ip: ipaddress.IPv4Address,
bb1_ip: ipaddress.IPv4Address,
bb2_ip: ipaddress.IPv4Address,
bb3_ip: ipaddress.IPv4Address,
fr_ip: ipaddress.IPv4Address,
) -> bytes:
if len(pts) != 5:
raise ValueError(
"Flight Computer must be configured with exactly 5 PT channels"
)
if len(tcs) != 3:
raise ValueError(
"Flight Computer must be configured with exactly 3 TC channels"
)
if len(vlvs) != 3:
raise ValueError(
"Flight Computer must be configured with exactly 3 valve channels"
)
raw_out = bytes()
for x in pts:
raw_out += (
struct.pack("<f", x.offset)
+ struct.pack("<f", x.range)
+ struct.pack("<f", x.max_voltage)
)
for x in tcs:
raw_out += struct.pack("<B", x.gain.value)
for x in vlvs:
raw_out += struct.pack("<B", x.voltage.value) + struct.pack(
"<B", int(x.enabled)
)
raw_out += (
limewire_IP.packed
+ fc_ip.packed
+ bb1_ip.packed
+ bb2_ip.packed
+ bb3_ip.packed
+ fr_ip.packed
)

crc = zlib.crc32(raw_out)

raw_out += struct.pack("<I", crc)
return raw_out


def send_eeprom_tftp(board: ipaddress.IPv4Address, content: bytes):
print("Sending eeprom config over TFTP to " + str(board))
tftp_client = tftpy.TftpClient(str(board))
tftp_client.upload("eeprom.bin", BytesIO(content))


def configure_fc(
pts, tcs, vlvs, gseip, fcip, bb1ip, bb2ip, bb3ip, frip, tftpip
):
eeprom_content = generate_fc_eeprom(
pts, tcs, vlvs, gseip, fcip, bb1ip, bb2ip, bb3ip, frip
)
send_eeprom_tftp(tftpip, eeprom_content)


def configure_bb(bb_num, pts, tcs, vlvs, fcip, bbip, tftpip):
eeprom_content = generate_bb_eeprom(bb_num, fcip, bbip, pts, tcs, vlvs)
send_eeprom_tftp(tftpip, eeprom_content)


def configure_fr(fcip, frip, tftpip):
eeprom_content = generate_fr_eeprom(fcip, frip)
send_eeprom_tftp(tftpip, eeprom_content)
Loading
Loading