Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = [
]
requires-python = "<=3.13,>=3.12"
dependencies = [
"asyncudp>=0.11.0",
"click>=8.1.8",
"loguru>=0.7.3",
"nicegui>=2.23.2",
Expand Down
56 changes: 28 additions & 28 deletions src/fc_simulator/fc_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from functools import partial
from typing import Tuple

import asyncudp
import synnax as sy

from lmp import (
Board,
DeviceCommandAckMessage,
DeviceCommandMessage,
TelemetryFramer,
TelemetryMessage,
ValveCommandMessage,
ValveStateMessage,
Expand All @@ -33,18 +35,24 @@ def __init__(self, ip_addr: str, tcp_port: int, run_time: float):
self.tcp_port = tcp_port
self.run_time = run_time

self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.log_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.log_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.tcp_aborted = False

async def generate_telemetry_data(
self,
addr: Tuple[str, int],
writer: asyncio.StreamWriter,
run_time: float,
) -> None:
async def generate_telemetry_data(self, run_time: float) -> None:
"""Send randomly generated telemetry data to Limewire."""

# Set up telemetry socket
self.telemetry_socket = await asyncudp.create_socket(
remote_addr=("255.255.255.255", 6767)
)
self.telemetry_socket._transport.get_extra_info("socket").setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1
)
print("Sending telemetry at 255.255.255.255:6767")

self.telemetry_framer = TelemetryFramer(sock=self.telemetry_socket)

start_time = asyncio.get_running_loop().time()

boards = [
Expand All @@ -65,21 +73,9 @@ async def generate_telemetry_data(

timestamp = sy.TimeStamp.now()
msg = TelemetryMessage(board, timestamp, values)
msg_bytes = bytes(msg)

try:
writer.write(len(msg_bytes).to_bytes(1) + msg_bytes)
await writer.drain()
values_sent += len(msg.values)
except ConnectionAbortedError:
print(
f"Connection to client {format_socket_address(addr)} manually aborted"
)
self.tcp_aborted = True
break

if self.tcp_aborted:
break
self.telemetry_framer.send_message(msg)

values_sent += len(msg.values)

if asyncio.get_running_loop().time() - start_time > run_time:
break
Expand Down Expand Up @@ -167,12 +163,8 @@ async def handle_client(

self.tcp_aborted = False

telemetry_task = asyncio.create_task(
self.generate_telemetry_data(addr, writer, run_time)
)
valve_task = asyncio.create_task(self.handle_commands(reader, writer))

await telemetry_task
await valve_task

if not self.tcp_aborted:
Expand All @@ -187,6 +179,12 @@ async def run(self) -> None:
ip_addr: The IP address with which to start the TCP server.
port: The port with which to start the server.
"""

# Start telemetry task
telemetry_task = asyncio.create_task(
self.generate_telemetry_data(self.run_time)
)

# We have to pass a partial function because asyncio.start_server()
# expects a function with only two arguments. functools.partial()
# "fills in" the run_time argument for us and returns a new function
Expand All @@ -203,7 +201,9 @@ async def run(self) -> None:
async with server:
await server.serve_forever()

await telemetry_task

while True:
self.udp_socket.sendto(
self.log_socket.sendto(
b"Hello, world!\r\n", ("127.0.0.1", self.UDP_PORT)
)
6 changes: 4 additions & 2 deletions src/limewire/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@


@click.command(context_settings={"help_option_names": ["--help", "-h"]})
@click.argument("fc_address", type=SocketAddress())
@click.option("--debug")
@click.argument(
"fc_address", type=SocketAddress(), default="141.212.192.170:5000"
)
@click.option("--debug", is_flag=True)
def main(fc_address: tuple[str, int], debug: bool):
"""Run Limewire."""

Expand Down
165 changes: 90 additions & 75 deletions src/limewire/limewire.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
from asyncio.streams import StreamReader, StreamWriter
from contextlib import asynccontextmanager

import asyncudp
import synnax as sy
from loguru import logger

from lmp import (
HeartbeatMessage,
LMPFramer,
LMPMessage,
TelemetryFramer,
TelemetryMessage,
Valve,
ValveCommandMessage,
ValveStateMessage,
)
from lmp.framer import FramingError

from .util import get_write_time_channel_name, synnax_init

Expand All @@ -24,7 +29,7 @@ def __init__(self) -> None:

self.synnax_client, self.channels = synnax_init()
self.synnax_writer = None
self.queue: asyncio.Queue[bytes] = asyncio.Queue()
self.queue: asyncio.Queue[LMPMessage] = asyncio.Queue()

async def start(self, fc_addr: tuple[str, int]) -> None:
"""Open a connection to the flight computer and start Limewire.
Expand All @@ -50,22 +55,28 @@ async def lifespan():
)
await asyncio.sleep(0.5)

telemetry_socket = await asyncudp.create_socket(
local_addr=("0.0.0.0", 6767)
)
self.telemetry_framer = TelemetryFramer(telemetry_socket)
logger.info("Listening for telemetry on UDP port 6767")

self.connected = False
while True:
try:
logger.info(
f"Connecting to flight computer at {fc_addr[0]}:{fc_addr[1]}..."
)

self.tcp_reader, self.tcp_writer = await self._connect_fc(
*fc_addr
)
tcp_reader, tcp_writer = await self._connect_fc(*fc_addr)
self.lmp_framer = LMPFramer(tcp_reader, tcp_writer)

self.connected = True
except ConnectionRefusedError:
await asyncio.sleep(1)
continue

peername = self.tcp_writer.get_extra_info("peername")
peername = tcp_writer.get_extra_info("peername")
logger.info(
f"Connected to flight computer at {peername[0]}:{peername[1]}."
)
Expand All @@ -76,7 +87,8 @@ async def lifespan():
reconnect = False
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(self._tcp_read())
tg.create_task(self._fc_tcp_read())
tg.create_task(self._fc_telemetry_listen())
tg.create_task(self._synnax_write())
tg.create_task(self._relay_valve_cmds())
tg.create_task(self._send_heartbeat())
Expand All @@ -96,41 +108,32 @@ async def lifespan():
else:
break

async def stop(self):
"""Run shutdown code."""

await self.lmp_framer.close()

if self.synnax_writer is not None:
try:
self.synnax_writer.close()
except sy.ValidationError:
logger.warning(
"Ignoring Synnax writer internal validation error(s)."
)

logger.info("=" * 60)

async def _send_heartbeat(self):
HEARTBEAT_INTERVAL = 1
while True:
try:
msg = HeartbeatMessage()
msg_bytes = bytes(msg)

self.tcp_writer.write(msg_bytes)
await self.tcp_writer.drain()
await self.lmp_framer.send_message(HeartbeatMessage())
await asyncio.sleep(HEARTBEAT_INTERVAL)
logger.debug(f"Queue size: {self.queue.qsize()}")
except ConnectionResetError as err:
raise err
# print(err)

async def stop(self):
"""Run shutdown code."""

self.tcp_writer.close()
await self.tcp_writer.wait_closed()

if self.synnax_writer is not None:
self.synnax_writer.close()

# Print statistics
# print() # Add extra newline after Ctrl+C
runtime = asyncio.get_event_loop().time() - self.start_time
if self.values_processed == 0:
logger.warning("Unable to receive data from flight computer!")
else:
logger.info(
f"Processed {self.values_processed} values in {runtime:.2f} sec ({self.values_processed / runtime:.2f} values/sec)"
)

logger.info("=" * 60)

async def _connect_fc(
self, ip_addr: str, port: int
) -> tuple[StreamReader, StreamWriter]:
Expand All @@ -156,65 +159,82 @@ async def _connect_fc(
f"Unable to connect to flight computer at {ip_addr}:{port}."
)

async def _tcp_read(self):
async def _fc_tcp_read(self):
"""Handle incoming data from the TCP connection.

Returns:
The number of telemetry values processed.
"""
self.values_processed = 0
while True:
msg_length = await self.tcp_reader.read(1)
if not msg_length:
break
try:
message = await self.lmp_framer.receive_message()
except (FramingError, ValueError) as err:
logger.error(str(err))
logger.opt(exception=err).debug("Traceback: ", exc_info=True)
continue

msg_length = int.from_bytes(msg_length)
msg_bytes = await self.tcp_reader.readexactly(msg_length)
if not msg_bytes:
if message is None:
break

msg_id = int.from_bytes(msg_bytes[0:1])
match msg_id:
case TelemetryMessage.MSG_ID:
await self.queue.put(msg_bytes)
num_values = (len(msg_bytes) - 1 - 1 - 8) // 4
self.values_processed += num_values
case ValveStateMessage.MSG_ID:
await self.queue.put(msg_bytes)
self.values_processed += 1
case _:
raise ValueError(
f"Received invalid LMP message identifier: 0x{msg_id:X}"
)
if type(message) is ValveStateMessage:
await self.queue.put(message)
else:
pass
# TODO: log warning

async def _fc_telemetry_listen(self):
"""Listen for telemetry messages."""
while True:
message = await self.telemetry_framer.receive_message()
await self.queue.put(message)

async def _synnax_write(self) -> None:
"""Write telemetry data and valve state data to Synnax."""
while True:
# Parse message bytes into TelemetryMessage
msg_bytes = await self.queue.get()
msg_id = int.from_bytes(msg_bytes[0:1])
message = await self.queue.get()

if msg_id == TelemetryMessage.MSG_ID:
msg = TelemetryMessage.from_bytes(msg_bytes)
if not isinstance(message, TelemetryMessage) and not isinstance(
message, ValveStateMessage
):
logger.warning(
f"Invalid message type '{str(type(message))}' in queue."
)
self.queue.task_done()
continue

try:
frame = self._build_telemetry_frame(msg)
except KeyError as err:
logger.error(str(err), extra={"error_code": "0006"})
self.queue.task_done()
continue
else:
msg = ValveStateMessage.from_bytes(msg_bytes)
frame = self._build_valve_state_frame(msg)
frame = self._build_synnax_frame(message)
if frame is None:
self.queue.task_done()
continue

if self.synnax_writer is None:
self.synnax_writer = await self._open_synnax_writer(
msg.timestamp
message.timestamp
)

try:
self.synnax_writer.write(frame)
except sy.ValidationError as err:
logger.warning(
f"Synnax validation error '{str(err)}', skipping frame"
)
self.synnax_writer.write(frame)

self.queue.task_done()

def _build_synnax_frame(
self, msg: TelemetryMessage | ValveStateMessage
) -> dict | None:
if isinstance(msg, TelemetryMessage):
try:
frame = self._build_telemetry_frame(msg)
except KeyError as err:
logger.error(str(err), extra={"error_code": "0006"})
return None
else:
frame = self._build_valve_state_frame(msg)

return frame

def _build_telemetry_frame(self, msg: TelemetryMessage) -> dict:
"""Construct a frame to write to Synnax from a telemetry message.

Expand Down Expand Up @@ -304,9 +324,4 @@ async def _relay_valve_cmds(self):
# For now, let's assume that if multiple values are in the
# frame, we only care about the most recent one
msg = ValveCommandMessage(valve, bool(series[-1]))
msg_bytes = bytes(msg)

self.tcp_writer.write(
len(msg_bytes).to_bytes(1) + msg_bytes
)
await self.tcp_writer.drain()
await self.lmp_framer.send_message(msg)
Loading
Loading