Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"nicegui>=2.23.2",
"platformdirs>=4.5.0",
"python-dotenv<2.0.0,>=1.0.1",
"scapy>=2.6.1",
"seaborn>=0.13.2",
"synnax>=0.46,<0.47",
]
Expand Down
16 changes: 13 additions & 3 deletions src/fc_simulator/fc_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Board,
DeviceCommandAckMessage,
DeviceCommandMessage,
HeartbeatMessage,
TelemetryFramer,
TelemetryMessage,
ValveCommandMessage,
Expand Down Expand Up @@ -64,14 +65,20 @@ async def generate_telemetry_data(self, run_time: float) -> None:
]

values_sent = 0
loop_counter = 0
while True:
loop_start_time = asyncio.get_running_loop().time()
for board in boards:
values = [
i * random.uniform(0, 1) for i in range(board.num_values)
]

timestamp = sy.TimeStamp.now()
# Send a 0-timestamped telemetry message every 100 messages
if loop_counter % 100 == 0:
timestamp = loop_counter
else:
timestamp = sy.TimeStamp.now()

msg = TelemetryMessage(board, timestamp, values)
self.telemetry_framer.send_message(msg)

Expand All @@ -87,6 +94,8 @@ async def generate_telemetry_data(self, run_time: float) -> None:
)
await asyncio.sleep(max(0, 1 / DATA_RATE - loop_elapsed_time))

loop_counter += 1

actual_run_time = asyncio.get_running_loop().time() - start_time

print(
Expand Down Expand Up @@ -148,9 +157,10 @@ async def get_response_msg(
)

return response

case HeartbeatMessage.MSG_ID:
pass
case _:
raise ValueError("Received non-command message.")
print(f"Received non-command message (header 0x{msg_id:X}).")

async def handle_client(
self,
Expand Down
4 changes: 3 additions & 1 deletion src/hydrant/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


@click.command(context_settings={"help_option_names": ["--help", "-h"]})
@click.argument("fc_address", type=SocketAddress())
@click.argument(
"fc_address", type=SocketAddress(), default="141.212.192.170:5000"
)
def main(fc_address: tuple[str, int]):
print("! HYDRANT RUNNING !")

Expand Down
83 changes: 55 additions & 28 deletions src/limewire/limewire.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import platform
from asyncio.streams import StreamReader, StreamWriter
from contextlib import asynccontextmanager

import asyncudp
import synnax as sy
from loguru import logger
from scapy.error import Scapy_Exception

from lmp import (
HeartbeatMessage,
Expand All @@ -18,7 +20,14 @@
)
from lmp.framer import FramingError

from .util import get_write_time_channel_name, synnax_init
from .ntp_sync import send_ntp_sync
from .util import (
get_write_time_channel_name,
is_valve_command_channel,
synnax_init,
)

WINERROR_SEMAPHORE_TIMEOUT = 121


class Limewire:
Expand Down Expand Up @@ -68,13 +77,31 @@ async def lifespan():
f"Connecting to flight computer at {fc_addr[0]}:{fc_addr[1]}..."
)

tcp_reader, tcp_writer = await self._connect_fc(*fc_addr)
self.lmp_framer = LMPFramer(tcp_reader, tcp_writer)

self.connected = True
async with asyncio.timeout(1):
tcp_reader, tcp_writer = await self._connect_fc(
*fc_addr
)
self.lmp_framer = LMPFramer(tcp_reader, tcp_writer)
self.connected = True
except TimeoutError:
logger.warning("Connection attempt timed out.")
continue
except ConnectionRefusedError:
logger.warning("Connection refused.")
await asyncio.sleep(1)
continue
except OSError as err:
if (
platform.system() == "Windows"
and getattr(err, "winerr", None)
== WINERROR_SEMAPHORE_TIMEOUT
):
logger.warning(
f"Connection attempt timed out (Windows OSError: {str(err)})."
)
continue
else:
raise err

peername = tcp_writer.get_extra_info("peername")
logger.info(
Expand All @@ -100,9 +127,7 @@ async def lifespan():
f"Tasks failed with {len(eg.exceptions)} error(s)"
)
for exc in eg.exceptions:
logger.exception(
"Exception raised with type %s: %s", type(exc), exc
)
logger.opt(exception=exc).error("Traceback: ")
if reconnect:
continue
else:
Expand Down Expand Up @@ -219,6 +244,22 @@ async def _synnax_write(self) -> None:
f"Synnax validation error '{str(err)}', skipping frame"
)

try:
self.synnax_writer.close()
except sy.ValidationError:
# Why oh why must you be this way Synnax :(
pass

# Writer will get re-initialzed during next loop iteration
self.synnax_writer = None

logger.info("Sending NTP sync.")

try:
send_ntp_sync()
except Scapy_Exception:
logger.warning("NTP sync failed.")

self.queue.task_done()

def _build_synnax_frame(
Expand Down Expand Up @@ -282,21 +323,11 @@ async def _open_synnax_writer(self, timestamp: int) -> sy.Writer:
for ch in data_channels:
writer_channels.append(ch)

authorities = []
for channel in writer_channels:
# Schematic and/or autosequences should maintain control
# of command channels, and Limewire should have absolute
# authority of all other channels.
if "cmd" in channel:
authorities.append(0)
else:
authorities.append(255)

writer = self.synnax_client.open_writer(
start=timestamp,
channels=writer_channels,
enable_auto_commit=True,
authorities=authorities,
authorities=0, # Limewire should never control command channels,
)

return writer
Expand All @@ -305,15 +336,11 @@ async def _relay_valve_cmds(self):
"""Relay valve commands from Synnax to the flight computer."""

# Create a list of all valve command channels
cmd_channels = []
for data_channels in self.channels.values():
for channel in data_channels:
if (
"cmd" in channel
and "timestamp" not in channel
and "limewire" not in channel
):
cmd_channels.append(channel)
cmd_channels: list[str] = []
for data_channel_names in self.channels.values():
for channel_name in data_channel_names:
if is_valve_command_channel(channel_name):
cmd_channels.append(channel_name)

async with await self.synnax_client.open_async_streamer(
cmd_channels
Expand Down
8 changes: 8 additions & 0 deletions src/limewire/ntp_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from scapy.all import IP, UDP, send
from scapy.layers.ntp import NTPHeader


def send_ntp_sync():
ntp_packet = NTPHeader(mode=5)
packet = IP(dst="141.212.192.255") / UDP(dport=123) / ntp_packet
send(packet)
22 changes: 21 additions & 1 deletion src/limewire/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,31 @@ def synnax_init() -> tuple[sy.Synnax, dict[str, list[str]]]:
return client, channels


def is_valve_command_channel(channel_name: str) -> bool:
name_split = channel_name.split("_")

if len(name_split) != 3:
return False

return name_split[1] == "vlv"


def is_valve_state_channel(channel_name: str) -> bool:
name_split = channel_name.split("_")

if len(name_split) != 3:
return False

return name_split[1] == "state"


def get_data_type(channel_name: str) -> sy.DataType:
"""Return the DataType associated with the channel."""
if "limewire" in channel_name:
return sy.DataType.TIMESTAMP
if "state" in channel_name or "cmd" in channel_name:
if is_valve_command_channel(channel_name) or is_valve_state_channel(
channel_name
):
return sy.DataType.UINT8
return sy.DataType.FLOAT32

Expand Down
4 changes: 2 additions & 2 deletions src/lmp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ def from_channel_name(cls, name: str):
ValueError: The channel name passed in is not a valve channel.
"""

if "vlv" not in name:
if "vlv" not in name and "state" not in name:
raise ValueError(f"Invalid valve channel {name}")

components = name.split("_")
board_name = components[0]
num = int(components[1][-1])
num = int(components[2])

return cls(Board[board_name.upper()], num)

Expand Down
11 changes: 11 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading