|
| 1 | +from binascii import crc_hqx |
| 2 | + |
| 3 | +from ax25 import Address, Control, Frame, FrameType |
| 4 | +from pyStuffing import BitStuffing |
| 5 | + |
| 6 | + |
| 7 | +class AX25: |
| 8 | + """ |
| 9 | + A class with the encode and decode methods for the Ax25 Protocol |
| 10 | + """ |
| 11 | + |
| 12 | + _DEFAULT_SSID: int = 0 |
| 13 | + _CRC_LENGTH_BITS = 16 |
| 14 | + |
| 15 | + def __init__(self, src: str, dst: str) -> None: |
| 16 | + """ |
| 17 | + :param src: Source Address Callsign (Should be a maximum of 6 characters long) |
| 18 | + :param dst: Destination Address Callsign (Should be a maximum of 6 characters long) |
| 19 | + """ |
| 20 | + if Address.valid_call(src) and Address.valid_call(dst): |
| 21 | + self.src_callsign = src |
| 22 | + self.dst_callsign = dst |
| 23 | + elif not Address.valid_call(src) and not Address.valid_call(dst): |
| 24 | + raise ValueError("Both Addresses are not valid") |
| 25 | + elif not Address.valid_call(src): |
| 26 | + raise ValueError("Source Address is not valid") |
| 27 | + elif not Address.valid_call(dst): |
| 28 | + raise ValueError("Destination Address is not valid") |
| 29 | + |
| 30 | + def encode_frame( |
| 31 | + self, |
| 32 | + data_to_send: bytes, |
| 33 | + frame_type: FrameType, |
| 34 | + sequence_number: int = 0, |
| 35 | + ) -> bytes: |
| 36 | + """ |
| 37 | + Encodes and Information Frame with the requested data using the ax25 library |
| 38 | + Note: The source and destination call signs passed in the constructor of the class are used |
| 39 | + |
| 40 | + :param data_to_send: Data that needs to be sent in the frame |
| 41 | + :param ns: Send Sequence Number |
| 42 | + :return: Generated Frame |
| 43 | + """ |
| 44 | + |
| 45 | + # Generate Frame Object as per Library Specfications |
| 46 | + control_block = Control(frame_type, poll_final=False, send_seqno=sequence_number) |
| 47 | + src_address = Address(call=self.src_callsign, ssid=self._DEFAULT_SSID) |
| 48 | + dst_address = Address(call=self.dst_callsign, ssid=self._DEFAULT_SSID) |
| 49 | + frame_bytes = bytearray( |
| 50 | + Frame( |
| 51 | + dst=dst_address, |
| 52 | + src=src_address, |
| 53 | + via=None, |
| 54 | + control=control_block, |
| 55 | + pid=0, |
| 56 | + data=data_to_send, |
| 57 | + ).pack() |
| 58 | + ) |
| 59 | + |
| 60 | + # Calculate fcs using CRC 16 and then reverse it |
| 61 | + initial_crc_value = 0 |
| 62 | + binary = bin(crc_hqx(frame_bytes, initial_crc_value)) |
| 63 | + reverse = binary[-1:1:-1] |
| 64 | + reverse = reverse + (self._CRC_LENGTH_BITS - len(reverse)) * "0" |
| 65 | + fcs = bytearray(int(reverse, self._CRC_LENGTH_BITS // 8).to_bytes(self._CRC_LENGTH_BITS // 8, "big")) |
| 66 | + |
| 67 | + frame_bytes = frame_bytes + fcs |
| 68 | + |
| 69 | + # Define the flags |
| 70 | + start_end_flag = bytearray(bytes.fromhex("7E")) |
| 71 | + |
| 72 | + # Use the mutability of bytearrays to append everything into a huge bytearray that contains what we want to send |
| 73 | + frame_bytes = start_end_flag + frame_bytes + start_end_flag |
| 74 | + |
| 75 | + # Convert the bytearray to bytes |
| 76 | + return_frame = bytes(frame_bytes) |
| 77 | + |
| 78 | + return return_frame |
| 79 | + |
| 80 | + def decode_frame(self, input_data: bytes) -> Frame: |
| 81 | + """ |
| 82 | + Decodes frames passed in as bytes using the ax25 library. |
| 83 | + |
| 84 | + :param input_data: Unstuffed frame in bytes |
| 85 | + :return: The decoded frame |
| 86 | + """ |
| 87 | + data = input_data[1:-1] |
| 88 | + |
| 89 | + # Get the FCS flags from the original data transmission |
| 90 | + fcs_original = int.from_bytes(data[-2:], byteorder="big", signed=False) |
| 91 | + # Remove the fcs flags |
| 92 | + data = data[:-2] |
| 93 | + |
| 94 | + # Calculate fcs of recieved frame and then reversing it |
| 95 | + binary = bin(crc_hqx(data, 0)) |
| 96 | + reverse = binary[-1:1:-1] |
| 97 | + reverse = reverse + (self._CRC_LENGTH_BITS - len(reverse)) * "0" |
| 98 | + fcs_data = int(reverse, self._CRC_LENGTH_BITS // 8) |
| 99 | + |
| 100 | + if fcs_original != fcs_data: |
| 101 | + raise ValueError("Check sums do not match") |
| 102 | + |
| 103 | + return Frame.unpack(data) |
| 104 | + |
| 105 | + def unstuff(self, input_data: bytes) -> bytes: |
| 106 | + """ |
| 107 | + Unstuffs frames passed in as bytes using the pyStuffing library. |
| 108 | + |
| 109 | + :param input_data: Stuffed frame to be unstuffed as bytes |
| 110 | + :return: The unStuffed frame |
| 111 | + """ |
| 112 | + data = input_data[1:-1] |
| 113 | + |
| 114 | + byte_list = [] |
| 115 | + for byte in data: |
| 116 | + bits = bin(byte).removeprefix("0b") |
| 117 | + byte_list.append(("0" * (8 - len(bits))) + bits) |
| 118 | + byte_string = "".join(byte_list) |
| 119 | + bin_list = [int(s) for s in byte_string] |
| 120 | + |
| 121 | + unstuff = BitStuffing(bin_list) |
| 122 | + unstuff.stuffed = bin_list |
| 123 | + unstuff.startUnstuffing() |
| 124 | + res = "".join([str(s) for s in unstuff.unStuffed]) |
| 125 | + res = res + ("0" * (8 - len(res) % 8)) |
| 126 | + data = bytes(int(res[i : i + 8], 2) for i in range(0, len(res), 8)) |
| 127 | + |
| 128 | + # Remove a 0 at the end of the string that might have been created as a result of adding in 0s |
| 129 | + if data[-1] == 0: |
| 130 | + data = data[:-1] |
| 131 | + |
| 132 | + data_bytes = bytearray(data) |
| 133 | + start_end_flag = bytearray(bytes.fromhex("7E")) |
| 134 | + return bytes(start_end_flag + data_bytes + start_end_flag) |
| 135 | + |
| 136 | + def stuff(self, input_data: bytes) -> bytes: |
| 137 | + """ |
| 138 | + Stuffs frames passed in as bytes using the pyStuffing library. |
| 139 | + |
| 140 | + :param input_data: Unstuffed frame to be stuffed as bytes |
| 141 | + :return: The stuffed frame |
| 142 | + """ |
| 143 | + data_stripped = input_data[1:-1] |
| 144 | + byte_list = [] |
| 145 | + for byte in data_stripped: |
| 146 | + bits = bin(byte).removeprefix("0b") |
| 147 | + byte_list.append(("0" * (8 - len(bits))) + bits) |
| 148 | + byte_string = "".join(byte_list) |
| 149 | + bin_list = [int(s) for s in byte_string] |
| 150 | + |
| 151 | + unstuff = BitStuffing(bin_list) |
| 152 | + unstuff.startStuffing() |
| 153 | + res = "".join([str(s) for s in unstuff.stuffed]) |
| 154 | + res = res + ("0" * (8 - len(res) % 8)) |
| 155 | + data_bytes = bytearray(bytes(int(res[i : i + 8], 2) for i in range(0, len(res), 8))) |
| 156 | + |
| 157 | + # Define the flags |
| 158 | + start_end_flag = bytearray(bytes.fromhex("7E")) |
| 159 | + |
| 160 | + return bytes(start_end_flag + data_bytes + start_end_flag) |
| 161 | + |
| 162 | + def _bytes_to_int_list(self, data: bytes) -> list[int]: |
| 163 | + """ |
| 164 | + A Function that converts bytes to a list of integers representing bits |
| 165 | + |
| 166 | + :param data: The data that needs to be converted |
| 167 | + :return: A list of integers representing bits |
| 168 | + """ |
| 169 | + return [] |
| 170 | + |
| 171 | + def _int_list_to_bytes(self, data: list[int]): |
| 172 | + """ |
| 173 | + A Function that converts a list of integers to bytes |
| 174 | + |
| 175 | + :param data: The list of integers that needs to be converted |
| 176 | + :return: The converted bytes |
| 177 | + """ |
| 178 | + |
| 179 | + |
| 180 | +# Example Usage |
| 181 | +if __name__ == "__main__": |
| 182 | + comm_1 = AX25("ATLAS", "AKITO") |
| 183 | + send_frame = comm_1.encode_frame(b"UW Orbital", FrameType.I, 0) |
| 184 | + print(send_frame) |
| 185 | + |
| 186 | + rcv_frame = comm_1.decode_frame(send_frame) |
| 187 | + print("Source: " + str(rcv_frame.src)) |
| 188 | + print("Destination: " + str(rcv_frame.dst)) |
| 189 | + print("Frame Type: " + str(rcv_frame.control.frame_type)) |
| 190 | + print("Data: " + str(rcv_frame.data.decode("UTF-8"))) |
| 191 | + print("") |
| 192 | + |
| 193 | + comm_2 = AX25("LEAFS", "CANUCK") |
| 194 | + send_frame = comm_2.encode_frame(b"Leafs in four", FrameType.U, 0) |
| 195 | + print(send_frame) |
| 196 | + |
| 197 | + rcv_frame = comm_2.decode_frame(send_frame) |
| 198 | + print("Source: " + str(rcv_frame.src)) |
| 199 | + print("Destination: " + str(rcv_frame.dst)) |
| 200 | + print("Frame Type: " + str(rcv_frame.control.frame_type)) |
| 201 | + print("Data: " + str(rcv_frame.data.decode("UTF-8"))) |
0 commit comments