|
| 1 | +''' |
| 2 | +Contains the new static class implementation of Parsley.py |
| 3 | +''' |
| 4 | +from typing import Any |
| 5 | +from parsley.parsley_message import ParsleyObject, ParsleyError |
| 6 | +from parsley.bitstring import BitString |
| 7 | +from parsley.message_definitions import CAN_MESSAGE, MESSAGE_PRIO, MESSAGE_TYPE, BOARD_TYPE_ID, BOARD_INST_ID, MESSAGE_SID |
| 8 | +import parsley.parse_utils as pu |
| 9 | +from parsley.fields import Field, Switch, Bitfield |
| 10 | +from abc import ABC, abstractmethod |
| 11 | +import struct |
| 12 | +import crc8 |
| 13 | +import parsley.message_types as mt |
| 14 | + |
| 15 | +#Used for formatting lines |
| 16 | +MSG_PRIO_LEN = max([len(msg_prio) for msg_prio in mt.msg_prio]) |
| 17 | +MSG_TYPE_LEN = max([len(msg_type) for msg_type in mt.msg_type]) |
| 18 | +BOARD_TYPE_ID_LEN = max([len(board_type_id) for board_type_id in mt.board_type_id]) |
| 19 | +BOARD_INST_ID_LEN = max([len(board_inst_id) for board_inst_id in mt.board_inst_id]) |
| 20 | + |
| 21 | +class _ParsleyParseInternal: |
| 22 | + def __init__(self): |
| 23 | + raise NotImplementedError("This class is static only do not instantiate it") |
| 24 | + |
| 25 | + @staticmethod |
| 26 | + def format_line(parsed_data: dict) -> str: |
| 27 | + msg_prio = parsed_data['msg_prio'] |
| 28 | + msg_type = parsed_data['msg_type'] |
| 29 | + board_type_id = parsed_data['board_type_id'] |
| 30 | + board_inst_id = parsed_data['board_inst_id'] |
| 31 | + data = parsed_data['data'] |
| 32 | + res = f'[ {msg_prio:<{MSG_PRIO_LEN}} {msg_type:<{MSG_TYPE_LEN}} {board_type_id:<{BOARD_TYPE_ID_LEN}} {board_inst_id:<{BOARD_INST_ID_LEN}} ]' |
| 33 | + for k, v in data.items(): |
| 34 | + formatted_value = f"{v:.3f}" if isinstance(v, float) else v |
| 35 | + res += f' {k}: {formatted_value}' |
| 36 | + return res |
| 37 | + |
| 38 | + @staticmethod |
| 39 | + def calculate_msg_bit_len(can_message): |
| 40 | + bit_len = 0 |
| 41 | + for field in can_message: |
| 42 | + bit_len += field.length |
| 43 | + return bit_len |
| 44 | + |
| 45 | + @staticmethod |
| 46 | + def encode_data(parsed_data: dict) -> tuple[int, list[int]]: |
| 47 | + msg_prio = parsed_data['msg_prio'] |
| 48 | + msg_type = parsed_data['msg_type'] |
| 49 | + board_type_id = parsed_data['board_type_id'] |
| 50 | + board_inst_id = parsed_data['board_inst_id'] |
| 51 | + |
| 52 | + bit_str = BitString() |
| 53 | + bit_str.push(*MESSAGE_PRIO.encode(msg_prio)) |
| 54 | + bit_str.push(*MESSAGE_TYPE.encode(msg_type)) |
| 55 | + bit_str.push(bytes([0, 0]), 2) |
| 56 | + bit_str.push(*BOARD_TYPE_ID.encode(board_type_id)) |
| 57 | + bit_str.push(*BOARD_INST_ID.encode(board_inst_id)) |
| 58 | + msg_sid = int.from_bytes(bit_str.pop(bit_str.length), byteorder='big') |
| 59 | + |
| 60 | + # skip the first field (board_id) since thats parsed separately |
| 61 | + for field in CAN_MESSAGE.get_fields(msg_type)[3:]: |
| 62 | + bit_str.push(*field.encode(parsed_data[field.name])) |
| 63 | + msg_data = [byte for byte in bit_str.pop(bit_str.length)] |
| 64 | + return msg_sid, msg_data |
| 65 | + |
| 66 | + @staticmethod |
| 67 | + def parse_fields(bit_str: BitString, fields: list[Field]) -> dict[str, Any]: |
| 68 | + """ |
| 69 | + Parses binary data stored in a BitString and decodes the data |
| 70 | + based on each field's decode() implementation. Returns a dictionary |
| 71 | + of each field's name to its decoded python value. |
| 72 | + """ |
| 73 | + res: dict[str, Any] = {} |
| 74 | + for field in fields: |
| 75 | + data = bit_str.pop(field.length, field.variable_length) |
| 76 | + res[field.name] = field.decode(data) |
| 77 | + if isinstance(field, Switch): |
| 78 | + nested_fields = field.get_fields(res[field.name]) |
| 79 | + res.update(_ParsleyParseInternal.parse_fields(bit_str, nested_fields)) |
| 80 | + if isinstance(field, Bitfield): |
| 81 | + res[field.name] = field.decode(data) |
| 82 | + |
| 83 | + return res |
| 84 | + |
| 85 | + @staticmethod |
| 86 | + def format_can_message(msg_sid: int, msg_data: list[int]) -> tuple[bytes, bytes] | None: |
| 87 | + msg_sid_length = (msg_sid.bit_length() + 7) // 8 |
| 88 | + formatted_msg_sid = msg_sid.to_bytes(msg_sid_length, byteorder='big') |
| 89 | + formatted_msg_data = bytes(msg_data) |
| 90 | + return formatted_msg_sid, formatted_msg_data |
| 91 | + |
| 92 | + @staticmethod |
| 93 | + def parse_board_type_id(encoded_board_type_id: bytes) -> str: |
| 94 | + board_type_id = None |
| 95 | + try: |
| 96 | + board_type_id = BOARD_TYPE_ID.decode(encoded_board_type_id) |
| 97 | + except ValueError: |
| 98 | + board_type_id = pu.hexify(encoded_board_type_id) |
| 99 | + return board_type_id |
| 100 | + |
| 101 | + @staticmethod |
| 102 | + def parse_board_inst_id(encoded_board_inst_id: bytes) -> str: |
| 103 | + board_inst_id = None |
| 104 | + try: |
| 105 | + board_inst_id = BOARD_INST_ID.decode(encoded_board_inst_id) |
| 106 | + except ValueError: |
| 107 | + board_inst_id = pu.hexify(encoded_board_inst_id) |
| 108 | + return board_inst_id |
| 109 | + |
| 110 | + @staticmethod |
| 111 | + def parse_to_object(msg_sid: bytes, msg_data: bytes) -> ParsleyObject | ParsleyError: |
| 112 | + """ |
| 113 | + Extracts the message_type and board_id from msg_sid to construct a Parsley Object along with message_data. |
| 114 | + Upon reading poorly formatted data, the error is caught and returned in a ParsleyError object. |
| 115 | + """ |
| 116 | + # Allow callers to pass integer SID |
| 117 | + if isinstance(msg_sid, int): |
| 118 | + sid_bytes, data_bytes = _ParsleyParseInternal.format_can_message(msg_sid,list(msg_data)) |
| 119 | + msg_sid = sid_bytes |
| 120 | + msg_data = data_bytes |
| 121 | + |
| 122 | + # begin parsing |
| 123 | + bit_str_msg_sid = BitString(msg_sid, MESSAGE_SID.length) |
| 124 | + encoded_msg_prio = bit_str_msg_sid.pop(MESSAGE_PRIO.length) |
| 125 | + encoded_msg_type = bit_str_msg_sid.pop(MESSAGE_TYPE.length) |
| 126 | + bit_str_msg_sid.pop(2) # reserved field |
| 127 | + encoded_board_type_id = bit_str_msg_sid.pop(BOARD_TYPE_ID.length) |
| 128 | + encoded_board_inst_id = bit_str_msg_sid.pop(BOARD_INST_ID.length) |
| 129 | + |
| 130 | + board_type_id = _ParsleyParseInternal.parse_board_type_id(encoded_board_type_id) |
| 131 | + board_inst_id = _ParsleyParseInternal.parse_board_inst_id(encoded_board_inst_id) |
| 132 | + |
| 133 | + msg_prio = None |
| 134 | + msg_type = None |
| 135 | + data: dict[str, Any] = {} |
| 136 | + |
| 137 | + try: |
| 138 | + msg_prio = MESSAGE_PRIO.decode(encoded_msg_prio) |
| 139 | + msg_type = MESSAGE_TYPE.decode(encoded_msg_type) |
| 140 | + # we splice the first element since we've already manually parsed BOARD_ID |
| 141 | + # if BOARD_ID threw an error, we want to try and parse the rest of the CAN message |
| 142 | + fields = CAN_MESSAGE.get_fields(msg_type)[3:] |
| 143 | + data = _ParsleyParseInternal.parse_fields(BitString(msg_data), fields) |
| 144 | + except (ValueError, IndexError, KeyError) as error: |
| 145 | + # convert the 6-bit msg_type into its canlib 12-bit form and include an error object |
| 146 | + return ParsleyError( |
| 147 | + board_type_id=board_type_id, |
| 148 | + board_inst_id=board_inst_id, |
| 149 | + msg_type=pu.hexify(encoded_msg_type, is_msg_type=True), |
| 150 | + msg_data =pu.hexify(msg_data), |
| 151 | + error=f"error: {error}" |
| 152 | + ) |
| 153 | + |
| 154 | + return ParsleyObject( |
| 155 | + msg_prio=msg_prio, |
| 156 | + msg_type=msg_type, |
| 157 | + board_type_id=board_type_id, |
| 158 | + board_inst_id=board_inst_id, |
| 159 | + data=data, |
| 160 | + ) |
| 161 | + |
| 162 | +class ParsleyParser(ABC): |
| 163 | + """ Abstract base for different input-format parsers """ |
| 164 | + |
| 165 | + @abstractmethod |
| 166 | + def parse(self, *args, **kwargs): |
| 167 | + raise NotImplementedError("This class is an abstract class") |
| 168 | + |
| 169 | +class USBDebugParser(ParsleyParser): |
| 170 | + """ Parse ASCII USB-debug lines """ |
| 171 | + |
| 172 | + def parse(self, line: str) -> ParsleyObject | ParsleyError: |
| 173 | + line = line.strip(' \0\r\n') |
| 174 | + if len(line) == 0 or line[0] != '$': |
| 175 | + raise ValueError('Incorrect line format') |
| 176 | + line = line[1:] |
| 177 | + |
| 178 | + if ':' in line: |
| 179 | + msg_sid, msg_data = line.split(':') |
| 180 | + msg_sid_int = int(msg_sid, 16) |
| 181 | + msg_data_list = [int(byte, 16) for byte in msg_data.split(',')] |
| 182 | + else: |
| 183 | + msg_sid_int = int(line, 16) |
| 184 | + msg_data_list = [] |
| 185 | + |
| 186 | + return _ParsleyParseInternal.parse_to_object(msg_sid_int, msg_data_list) |
| 187 | + |
| 188 | +class LiveTelemetryParser(ParsleyParser): |
| 189 | + """ Parse binary live-telemetry """ |
| 190 | + |
| 191 | + def parse(self, frame: bytes) -> ParsleyObject | ParsleyError: |
| 192 | + if len(frame) < 7: |
| 193 | + raise ValueError('Incorrect frame length') |
| 194 | + if frame[0] != 0x02: |
| 195 | + raise ValueError('Incorrect frame header') |
| 196 | + |
| 197 | + frame_len = frame[1] |
| 198 | + msg_sid = int.from_bytes(bytes([frame[2] & 0x1F]) + frame[3:6], byteorder='big') |
| 199 | + msg_data = frame[6:frame_len-1] |
| 200 | + exp_crc = frame[frame_len-1] |
| 201 | + msg_crc = crc8.crc8(frame[:frame_len-1]).digest()[0] |
| 202 | + |
| 203 | + if msg_crc != exp_crc: |
| 204 | + raise ValueError(f'Bad checksum, expected {exp_crc:02X} but got {msg_crc:02X}') |
| 205 | + |
| 206 | + return _ParsleyParseInternal.parse_to_object(msg_sid, list(msg_data)) |
| 207 | + |
| 208 | +class LoggerParser(ParsleyParser): |
| 209 | + """ Parses logger pages and yields `ParsleyObject` items """ |
| 210 | + |
| 211 | + """ |
| 212 | + Parse one logger record. |
| 213 | +
|
| 214 | + Layout (little-endian unless stated): |
| 215 | + 0 – 2 : ASCII 'L','O','G' |
| 216 | + 3 : page number (uint8) |
| 217 | + 4 – 12 : SID (uint32 LE) | timestamp (uint32 LE) | DLC (uint8) |
| 218 | + 13 – .. : up to 8 bytes CAN payload |
| 219 | + -- ff-padding may follow, removed before parsing -- |
| 220 | +
|
| 221 | + Returns whatever `format_can_message()` returns. |
| 222 | + Raises ValueError on any structural problem. |
| 223 | + """ |
| 224 | + |
| 225 | + LOG_MAGIC = b'LOG' # ASCII “LOG” = 0x4c4f47 |
| 226 | + HEADER_FMT = '<IIB' # SID(uint32 LE), timestamp(uint32 LE), DLC(uint8) |
| 227 | + HEADER_LEN = struct.calcsize(HEADER_FMT) # == 9 |
| 228 | + PARSE_LOGGER_PAGE_SIZE = 4096 |
| 229 | + |
| 230 | + def parse(self, buf: bytes, page_number: int) -> ParsleyObject | ParsleyError: |
| 231 | + # Strip the buffer to 4096 bytes, as required by the logger. |
| 232 | + if len(buf) != self.PARSE_LOGGER_PAGE_SIZE: |
| 233 | + raise ValueError('Logger message must be exactly 4096 bytes') |
| 234 | + |
| 235 | + if not buf.startswith(self.LOG_MAGIC): |
| 236 | + raise ValueError("Missing 'LOG' signature") |
| 237 | + |
| 238 | + if buf[3] != page_number % 256: |
| 239 | + raise ValueError(f'Page number mismatch: expected {page_number % 256}, got {buf[3]}') |
| 240 | + |
| 241 | + offset = 4 # start of the header |
| 242 | + |
| 243 | + while (self.PARSE_LOGGER_PAGE_SIZE - offset > self.HEADER_LEN): # at least one message |
| 244 | + sid, _, dlc = struct.unpack_from(self.HEADER_FMT, buf, offset) |
| 245 | + |
| 246 | + if sid & 0xE000_0000: |
| 247 | + break |
| 248 | + |
| 249 | + if not 0 <= dlc <= 8: |
| 250 | + raise ValueError(f'DLC out of range (0-8), got {dlc}') |
| 251 | + |
| 252 | + offset += self.HEADER_LEN |
| 253 | + data_list: list[int] = list(buf[offset: offset + dlc]) |
| 254 | + offset += dlc |
| 255 | + |
| 256 | + yield _ParsleyParseInternal.parse_to_object(sid, data_list) |
| 257 | + |
| 258 | +class BitstringParser(ParsleyParser): |
| 259 | + ''' Parse BitString objects ''' |
| 260 | + |
| 261 | + def parse(self, bit_str: BitString) -> ParsleyObject | ParsleyError: |
| 262 | + msg_sid = int.from_bytes(bit_str.pop(MESSAGE_SID.length), byteorder='big') |
| 263 | + msg_data = [byte for byte in bit_str.pop(bit_str.length)] |
| 264 | + return _ParsleyParseInternal.parse_to_object(msg_sid, msg_data) |
0 commit comments