Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ authors = [
requires-python = ">=3.11"
dependencies = [
"crc8>=0.2.1",
"pydantic>=2.12.3",
]
classifiers = [
"Development Status :: 2 - Pre-Alpha",
Expand Down
33 changes: 33 additions & 0 deletions src/parsley/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,46 @@
from . import fields
from . import message_definitions
from . import message_types
from .parsley_message import ParsleyObject, ParsleyError
from .parse_to_object import (
ParsleyParser,
USBDebugParser,
LiveTelemetryParser,
LoggerParser,
BitstringParser,
)
from .parsley import (
parse_fields, parse, parse_board_type_id, parse_board_inst_id,

Check warning on line 14 in src/parsley/__init__.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Type of "parse_board_type_id" is partially unknown   Type of "parse_board_type_id" is "(encoded_board_type_id: bytes) -> dict[Unknown, Unknown]" (reportUnknownVariableType)

Check warning on line 14 in src/parsley/__init__.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Type of "parse" is partially unknown   Type of "parse" is "(msg_sid: bytes, msg_data: bytes) -> dict[Unknown, Unknown]" (reportUnknownVariableType)
parse_bitstring,
parse_live_telemetry,
parse_usb_debug,
parse_logger,
format_line,

Check warning on line 19 in src/parsley/__init__.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Type of "format_line" is partially unknown   Type of "format_line" is "(parsed_data: dict[Unknown, Unknown]) -> str" (reportUnknownVariableType)
encode_data,

Check warning on line 20 in src/parsley/__init__.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Type of "encode_data" is partially unknown   Type of "encode_data" is "(parsed_data: dict[Unknown, Unknown]) -> tuple[int, list[int]]" (reportUnknownVariableType)
calculate_msg_bit_len

Check warning on line 21 in src/parsley/__init__.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Type of "calculate_msg_bit_len" is partially unknown   Type of "calculate_msg_bit_len" is "(can_message: Unknown) -> (Unknown | Literal[0])" (reportUnknownVariableType)
)

__all__ = [
"BitString",
"fields",
"message_definitions",
"message_types",
"ParsleyObject",
"ParsleyError",
"ParsleyParser",
"USBDebugParser",
"LiveTelemetryParser",
"LoggerParser",
"BitstringParser",
"parse_fields",
"parse",
"parse_board_type_id",
"parse_board_inst_id",
"parse_bitstring",
"parse_live_telemetry",
"parse_usb_debug",
"parse_logger",
"format_line",
"encode_data",
"calculate_msg_bit_len",
]
264 changes: 264 additions & 0 deletions src/parsley/parse_to_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
'''
Contains the new static class implementation of Parsley.py
'''
from typing import Any
from parsley.parsley_message import ParsleyObject, ParsleyError
from parsley.bitstring import BitString
from parsley.message_definitions import CAN_MESSAGE, MESSAGE_PRIO, MESSAGE_TYPE, BOARD_TYPE_ID, BOARD_INST_ID, MESSAGE_SID
import parsley.parse_utils as pu
from parsley.fields import Field, Switch, Bitfield
from abc import ABC, abstractmethod
import struct
import crc8

Check failure on line 12 in src/parsley/parse_to_object.py

View workflow job for this annotation

GitHub Actions / Run Basedpyright

Import "crc8" could not be resolved (reportMissingImports)
import parsley.message_types as mt

#Used for formatting lines
MSG_PRIO_LEN = max([len(msg_prio) for msg_prio in mt.msg_prio])
MSG_TYPE_LEN = max([len(msg_type) for msg_type in mt.msg_type])
BOARD_TYPE_ID_LEN = max([len(board_type_id) for board_type_id in mt.board_type_id])
BOARD_INST_ID_LEN = max([len(board_inst_id) for board_inst_id in mt.board_inst_id])

class _ParsleyParseInternal:
def __init__(self):
raise NotImplementedError("This class is static only do not instantiate it")

@staticmethod
def format_line(parsed_data: dict) -> str:
msg_prio = parsed_data['msg_prio']
msg_type = parsed_data['msg_type']
board_type_id = parsed_data['board_type_id']
board_inst_id = parsed_data['board_inst_id']
data = parsed_data['data']
res = f'[ {msg_prio:<{MSG_PRIO_LEN}} {msg_type:<{MSG_TYPE_LEN}} {board_type_id:<{BOARD_TYPE_ID_LEN}} {board_inst_id:<{BOARD_INST_ID_LEN}} ]'
for k, v in data.items():
formatted_value = f"{v:.3f}" if isinstance(v, float) else v
res += f' {k}: {formatted_value}'
return res

@staticmethod
def calculate_msg_bit_len(can_message):
bit_len = 0
for field in can_message:
bit_len += field.length
return bit_len

@staticmethod
def encode_data(parsed_data: dict) -> tuple[int, list[int]]:
msg_prio = parsed_data['msg_prio']
msg_type = parsed_data['msg_type']
board_type_id = parsed_data['board_type_id']
board_inst_id = parsed_data['board_inst_id']

bit_str = BitString()
bit_str.push(*MESSAGE_PRIO.encode(msg_prio))
bit_str.push(*MESSAGE_TYPE.encode(msg_type))
bit_str.push(bytes([0, 0]), 2)
bit_str.push(*BOARD_TYPE_ID.encode(board_type_id))
bit_str.push(*BOARD_INST_ID.encode(board_inst_id))
msg_sid = int.from_bytes(bit_str.pop(bit_str.length), byteorder='big')

# skip the first field (board_id) since thats parsed separately
for field in CAN_MESSAGE.get_fields(msg_type)[3:]:
bit_str.push(*field.encode(parsed_data[field.name]))
msg_data = [byte for byte in bit_str.pop(bit_str.length)]
return msg_sid, msg_data

@staticmethod
def parse_fields(bit_str: BitString, fields: list[Field]) -> dict[str, Any]:
"""
Parses binary data stored in a BitString and decodes the data
based on each field's decode() implementation. Returns a dictionary
of each field's name to its decoded python value.
"""
res: dict[str, Any] = {}
for field in fields:
data = bit_str.pop(field.length, field.variable_length)
res[field.name] = field.decode(data)
if isinstance(field, Switch):
nested_fields = field.get_fields(res[field.name])
res.update(_ParsleyParseInternal.parse_fields(bit_str, nested_fields))
if isinstance(field, Bitfield):
res[field.name] = field.decode(data)

return res

@staticmethod
def format_can_message(msg_sid: int, msg_data: list[int]) -> tuple[bytes, bytes]:
msg_sid_length = (msg_sid.bit_length() + 7) // 8
formatted_msg_sid = msg_sid.to_bytes(msg_sid_length, byteorder='big')
formatted_msg_data = bytes(msg_data)
return formatted_msg_sid, formatted_msg_data

@staticmethod
def parse_board_type_id(encoded_board_type_id: bytes) -> str:
board_type_id = None
try:
board_type_id = BOARD_TYPE_ID.decode(encoded_board_type_id)
except ValueError:
board_type_id = pu.hexify(encoded_board_type_id)
return board_type_id

@staticmethod
def parse_board_inst_id(encoded_board_inst_id: bytes) -> str:
board_inst_id = None
try:
board_inst_id = BOARD_INST_ID.decode(encoded_board_inst_id)
except ValueError:
board_inst_id = pu.hexify(encoded_board_inst_id)
return board_inst_id

@staticmethod
def parse_to_object(msg_sid: bytes, msg_data: bytes) -> ParsleyObject | ParsleyError:
"""
Extracts the message_type and board_id from msg_sid to construct a Parsley Object along with message_data.
Upon reading poorly formatted data, the error is caught and returned in a ParsleyError object.
"""
# Allow callers to pass integer SID
if isinstance(msg_sid, int):
sid_bytes, data_bytes = _ParsleyParseInternal.format_can_message(msg_sid,list(msg_data))
msg_sid = sid_bytes
msg_data = data_bytes

# begin parsing
bit_str_msg_sid = BitString(msg_sid, MESSAGE_SID.length)
encoded_msg_prio = bit_str_msg_sid.pop(MESSAGE_PRIO.length)
encoded_msg_type = bit_str_msg_sid.pop(MESSAGE_TYPE.length)
bit_str_msg_sid.pop(2) # reserved field
encoded_board_type_id = bit_str_msg_sid.pop(BOARD_TYPE_ID.length)
encoded_board_inst_id = bit_str_msg_sid.pop(BOARD_INST_ID.length)

board_type_id = _ParsleyParseInternal.parse_board_type_id(encoded_board_type_id)
board_inst_id = _ParsleyParseInternal.parse_board_inst_id(encoded_board_inst_id)

msg_prio = None
msg_type = None
data: dict[str, Any] = {}

try:
msg_prio = MESSAGE_PRIO.decode(encoded_msg_prio)
msg_type = MESSAGE_TYPE.decode(encoded_msg_type)
# we splice the first element since we've already manually parsed BOARD_ID
# if BOARD_ID threw an error, we want to try and parse the rest of the CAN message
fields = CAN_MESSAGE.get_fields(msg_type)[3:]
data = _ParsleyParseInternal.parse_fields(BitString(msg_data), fields)
except (ValueError, IndexError, KeyError) as error:
# convert the 6-bit msg_type into its canlib 12-bit form and include an error object
return ParsleyError(
board_type_id=board_type_id,
board_inst_id=board_inst_id,
msg_type=pu.hexify(encoded_msg_type, is_msg_type=True),
msg_data =pu.hexify(msg_data),
error=f"error: {error}"
)

return ParsleyObject(
msg_prio=msg_prio,
msg_type=msg_type,
board_type_id=board_type_id,
board_inst_id=board_inst_id,
data=data,
)

class ParsleyParser(ABC):
""" Abstract base for different input-format parsers """

@abstractmethod
def parse(self, *args, **kwargs):
raise NotImplementedError("This class is an abstract class")

class USBDebugParser(ParsleyParser):
""" Parse ASCII USB-debug lines """

def parse(self, line: str) -> ParsleyObject | ParsleyError:
line = line.strip(' \0\r\n')
if len(line) == 0 or line[0] != '$':
raise ValueError('Incorrect line format')
line = line[1:]

if ':' in line:
msg_sid, msg_data = line.split(':')
msg_sid_int = int(msg_sid, 16)
msg_data_list = [int(byte, 16) for byte in msg_data.split(',')]
else:
msg_sid_int = int(line, 16)
msg_data_list = []

return _ParsleyParseInternal.parse_to_object(msg_sid_int, msg_data_list)

class LiveTelemetryParser(ParsleyParser):
""" Parse binary live-telemetry """

def parse(self, frame: bytes) -> ParsleyObject | ParsleyError:
if len(frame) < 7:
raise ValueError('Incorrect frame length')
if frame[0] != 0x02:
raise ValueError('Incorrect frame header')

frame_len = frame[1]
msg_sid = int.from_bytes(bytes([frame[2] & 0x1F]) + frame[3:6], byteorder='big')
msg_data = frame[6:frame_len-1]
exp_crc = frame[frame_len-1]
msg_crc = crc8.crc8(frame[:frame_len-1]).digest()[0]

if msg_crc != exp_crc:
raise ValueError(f'Bad checksum, expected {exp_crc:02X} but got {msg_crc:02X}')

return _ParsleyParseInternal.parse_to_object(msg_sid, list(msg_data))

class LoggerParser(ParsleyParser):
""" Parses logger pages and yields `ParsleyObject` items """

"""
Parse one logger record.

Layout (little-endian unless stated):
0 - 2 : ASCII 'L','O','G'
3 : page number (uint8)
4 - 12 : SID (uint32 LE) | timestamp (uint32 LE) | DLC (uint8)
13 - .. : up to 8 bytes CAN payload
-- ff-padding may follow, removed before parsing --

Returns whatever `format_can_message()` returns.
Raises ValueError on any structural problem.
"""

LOG_MAGIC = b'LOG' # ASCII “LOG” = 0x4c4f47
HEADER_FMT = '<IIB' # SID(uint32 LE), timestamp(uint32 LE), DLC(uint8)
HEADER_LEN = struct.calcsize(HEADER_FMT) # == 9
PARSE_LOGGER_PAGE_SIZE = 4096

def parse(self, buf: bytes, page_number: int) -> ParsleyObject | ParsleyError:
# Strip the buffer to 4096 bytes, as required by the logger.
if len(buf) != self.PARSE_LOGGER_PAGE_SIZE:
raise ValueError('Logger message must be exactly 4096 bytes')

if not buf.startswith(self.LOG_MAGIC):
raise ValueError("Missing 'LOG' signature")

if buf[3] != page_number % 256:
raise ValueError(f'Page number mismatch: expected {page_number % 256}, got {buf[3]}')

offset = 4 # start of the header

while (self.PARSE_LOGGER_PAGE_SIZE - offset > self.HEADER_LEN): # at least one message
sid, _, dlc = struct.unpack_from(self.HEADER_FMT, buf, offset)

if sid & 0xE000_0000:
break

if not 0 <= dlc <= 8:
raise ValueError(f'DLC out of range (0-8), got {dlc}')

offset += self.HEADER_LEN
data_list: list[int] = list(buf[offset: offset + dlc])
offset += dlc

yield _ParsleyParseInternal.parse_to_object(sid, data_list)

class BitstringParser(ParsleyParser):
''' Parse BitString objects '''

def parse(self, bit_str: BitString) -> ParsleyObject | ParsleyError:
msg_sid = int.from_bytes(bit_str.pop(MESSAGE_SID.length), byteorder='big')
msg_data = [byte for byte in bit_str.pop(bit_str.length)]
return _ParsleyParseInternal.parse_to_object(msg_sid, msg_data)
Loading
Loading