Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/limewire/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
"fc_address", type=SocketAddress(), default="141.212.192.170:5000"
)
@click.option("--debug", is_flag=True)
def main(fc_address: tuple[str, int], debug: bool):
@click.option("--overwrite-timestamps", is_flag=True)
def main(fc_address: tuple[str, int], debug: bool, overwrite_timestamps: bool):
"""Run Limewire."""

set_up_logging(debug)

limewire = Limewire()
limewire = Limewire(overwrite_timestamps)

try:
asyncio.run(limewire.start(fc_address)) # pyright: ignore[reportPrivateLocalImportUsage]
Expand Down
55 changes: 41 additions & 14 deletions src/limewire/limewire.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import asyncudp
import synnax as sy
from loguru import logger
from scapy.error import Scapy_Exception

from lmp import (
HeartbeatMessage,
Expand All @@ -33,12 +32,16 @@
class Limewire:
"""A class to manage Limewire's resources."""

def __init__(self) -> None:
logger.info("Limewire started.")
def __init__(self, overwrite_timestamps: bool = False) -> None:
logger.info(
f"Limewire started (overwrite_timestamps={overwrite_timestamps})."
)

self.synnax_client, self.channels = synnax_init()
self.synnax_writer = None
self.lmp_framer = None
self.queue: asyncio.Queue[LMPMessage] = asyncio.Queue()
self.overwrite_timestamps = overwrite_timestamps

async def start(self, fc_addr: tuple[str, int]) -> None:
"""Open a connection to the flight computer and start Limewire.
Expand Down Expand Up @@ -72,12 +75,16 @@ async def lifespan():

self.connected = False
while True:
# Send NTP sync before connecting to ensure correct
# telemetry message timestamps.
send_ntp_sync(logger)

try:
logger.info(
f"Connecting to flight computer at {fc_addr[0]}:{fc_addr[1]}..."
)

async with asyncio.timeout(1):
async with asyncio.timeout(5):
tcp_reader, tcp_writer = await self._connect_fc(
*fc_addr
)
Expand Down Expand Up @@ -122,6 +129,19 @@ async def lifespan():
except* ConnectionResetError:
logger.error("Connection to flight computer lost.")
reconnect = True
except* OSError as eg:
for err in eg.exceptions:
if (
platform.system() == "Windows"
and getattr(err, "winerr", None)
== WINERROR_SEMAPHORE_TIMEOUT
):
logger.warning(
f"Connection attempt timed out (Windows OSError: {str(err)})."
)
reconnect = True
else:
raise eg
except* Exception as eg:
logger.error(
f"Tasks failed with {len(eg.exceptions)} error(s)"
Expand All @@ -136,7 +156,8 @@ async def lifespan():
async def stop(self):
"""Run shutdown code."""

await self.lmp_framer.close()
if self.lmp_framer is not None:
await self.lmp_framer.close()

if self.synnax_writer is not None:
try:
Expand Down Expand Up @@ -211,6 +232,10 @@ async def _fc_telemetry_listen(self):
"""Listen for telemetry messages."""
while True:
message = await self.telemetry_framer.receive_message()

if self.overwrite_timestamps:
message.timestamp = sy.TimeStamp.now()

await self.queue.put(message)

async def _synnax_write(self) -> None:
Expand All @@ -233,9 +258,16 @@ async def _synnax_write(self) -> None:
continue

if self.synnax_writer is None:
self.synnax_writer = await self._open_synnax_writer(
message.timestamp
)
try:
self.synnax_writer = await self._open_synnax_writer(
message.timestamp
)
except sy.ValidationError as err:
logger.warning(
f"Synnax validation error '{str(err)}', skipping frame"
)
send_ntp_sync(logger)
continue

try:
self.synnax_writer.write(frame)
Expand All @@ -253,12 +285,7 @@ async def _synnax_write(self) -> None:
# Writer will get re-initialzed during next loop iteration
self.synnax_writer = None

logger.info("Sending NTP sync.")

try:
send_ntp_sync()
except Scapy_Exception:
logger.warning("NTP sync failed.")
send_ntp_sync(logger)

self.queue.task_done()

Expand Down
13 changes: 11 additions & 2 deletions src/limewire/ntp_sync.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from scapy.all import IP, UDP, send
from scapy.error import Scapy_Exception
from scapy.layers.ntp import NTPHeader


def send_ntp_sync():
def send_ntp_sync(logger=None):
if logger is not None:
logger.info("Sending NTP sync.")

ntp_packet = NTPHeader(mode=5)
packet = IP(dst="141.212.192.255") / UDP(dport=123) / ntp_packet
send(packet)

try:
send(packet)
except Scapy_Exception:
if logger is not None:
logger.warning("NTP sync failed.")
Loading