From a73a8d4261e8aa8f8bff4c67acd2f13ee56436fb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:45:37 +0000 Subject: [PATCH 1/2] Initial plan From 84bed72c2fdb9c544cc078d0064ef8d4323d6519 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 22:12:29 +0000 Subject: [PATCH 2/2] Add ISOBUS (ISO 11783) protocol implementation and tests Co-authored-by: polybassa <1676055+polybassa@users.noreply.github.com> --- scapy/contrib/automotive/isobus.py | 619 +++++++++++++++++++++++++++++ test/contrib/automotive/isobus.uts | 480 ++++++++++++++++++++++ 2 files changed, 1099 insertions(+) create mode 100644 scapy/contrib/automotive/isobus.py create mode 100644 test/contrib/automotive/isobus.uts diff --git a/scapy/contrib/automotive/isobus.py b/scapy/contrib/automotive/isobus.py new file mode 100644 index 00000000000..d0549c896b5 --- /dev/null +++ b/scapy/contrib/automotive/isobus.py @@ -0,0 +1,619 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information + +# scapy.contrib.description = ISOBUS (ISO 11783) +# scapy.contrib.status = loads + +"""ISOBUS (ISO 11783) protocol implementation for Scapy. + +ISOBUS is the communication standard for agriculture and forestry machinery, +based on SAE J1939 which uses extended CAN frames (29-bit identifiers). + +The 29-bit CAN identifier encodes J1939/ISOBUS addressing: + + - Priority (3 bits, bits 28-26): message priority, 0 = highest, 7 = lowest + - Reserved R (1 bit, bit 25): reserved, should be 0 + - Data Page DP (1 bit, bit 24): selects parameter group number page + - PDU Format PF (8 bits, bits 23-16): determines PDU type and PGN + - PDU Specific PS (8 bits, bits 15-8): + For PDU1 (PF < 0xF0): destination address + For PDU2 (PF >= 0xF0): group extension (part of PGN) + - Source Address SA (8 bits, bits 7-0): source address of sending node + +References: + - ISO 11783 (ISOBUS) + - SAE J1939 +""" + +from scapy.packet import Packet, bind_layers +from scapy.fields import ( + BitField, + ByteField, + ByteEnumField, + XByteField, + FlagsField, + LELongField, + LEShortField, + StrFixedLenField, + ThreeBytesField, + XLE3BytesField, + ConditionalField, +) +from scapy.layers.can import CAN + +# Typing imports +from typing import Optional, Tuple + +# Special ISOBUS/J1939 addresses +ISOBUS_ADDR_GLOBAL = 0xFF # Global destination address (broadcast) +ISOBUS_ADDR_NULL = 0xFE # Null/anonymous source address + +# ISOBUS Industry Groups (ISO 11783-1) +ISOBUS_INDUSTRY_GROUPS = { + 0: "Global", + 1: "On-Highway", + 2: "Agricultural and Forestry", + 3: "Construction", + 4: "Marine", + 5: "Industrial", +} + +# Common ISOBUS/J1939 PGNs (Parameter Group Numbers) +ISOBUS_PGNS = { + 0x00E600: "Working Set Master", + 0x00E700: "Working Set Member", + 0x00E800: "Acknowledgment", + 0x00EA00: "Request for PGN", + 0x00EB00: "Transport Protocol - Data Transfer (TP.DT)", + 0x00EC00: "Transport Protocol - Connection Management (TP.CM)", + 0x00EE00: "Address Claimed / Cannot Claim Address", + 0x00EF00: "Proprietary A", + 0x00FED8: "Commanded Address", +} + +# Transport Protocol Connection Management control bytes (TP.CM) +ISOBUS_TP_CM_CONTROL = { + 16: "Request to Send (RTS)", + 17: "Clear to Send (CTS)", + 19: "End of Message Acknowledgment (EOM ACK)", + 32: "Broadcast Announce Message (BAM)", + 255: "Connection Abort", +} + +# Acknowledgment control bytes (PGN 0x00E800) +ISOBUS_ACK_CONTROL = { + 0: "Positive Acknowledgment (ACK)", + 1: "Negative Acknowledgment (NACK)", + 2: "Access Denied", + 3: "Cannot Respond", +} + +# Connection Abort reason codes +ISOBUS_TP_CM_ABORT_REASONS = { + 1: "Already in one or more connection managed sessions", + 2: "System resources were needed for another task", + 3: "A timeout occurred", + 4: "CTS messages received when data transfer is in progress", + 5: "Maximum retransmit request limit reached", + 6: "Unexpected data transfer packet", + 7: "Bad sequence number (and connection is aborted)", + 8: "Duplicate sequence number (and connection is aborted)", + 250: "Other reasons", +} + + +def build_isobus_name(identity_number=0, manufacturer_code=0, ecu_instance=0, + function_instance=0, function=0, reserved_name=0, + device_class=0, device_class_instance=0, + industry_group=2, self_configurable_address=1): + # type: (int, int, int, int, int, int, int, int, int, int) -> int + """Build a J1939/ISOBUS 64-bit NAME value from its component fields. + + The NAME is an 8-byte little-endian field used in Address Claiming and + Commanded Address messages to uniquely identify a node on the bus. + + :param identity_number: 21-bit identity number unique to the manufacturer + :param manufacturer_code: 11-bit manufacturer code (assigned by SAE) + :param ecu_instance: 3-bit ECU instance (0 if only one ECU per function) + :param function_instance: 5-bit function instance + :param function: 8-bit function code (defined per industry group) + :param reserved_name: 1-bit reserved field (should be 0) + :param device_class: 7-bit device class + :param device_class_instance: 4-bit device class instance + :param industry_group: 3-bit industry group (2 = Agricultural and Forestry) + :param self_configurable_address: 1-bit flag (1 = can self-configure address) + :return: 64-bit NAME value (for use with LELongField) + + Example:: + + >>> nn = build_isobus_name( + ... identity_number=0x001, + ... manufacturer_code=0x123, + ... industry_group=2, + ... device_class=4, + ... ) + >>> pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSAddressClaimed(node_name=nn) + """ + name = (identity_number & 0x1FFFFF) + name |= (manufacturer_code & 0x7FF) << 21 + name |= (ecu_instance & 0x7) << 32 + name |= (function_instance & 0x1F) << 35 + name |= (function & 0xFF) << 40 + name |= (reserved_name & 0x1) << 48 + name |= (device_class & 0x7F) << 49 + name |= (device_class_instance & 0xF) << 56 + name |= (industry_group & 0x7) << 60 + name |= (self_configurable_address & 0x1) << 63 + return name + + +class ISOBUS(CAN): + """ISOBUS (ISO 11783 / SAE J1939) CAN frame with J1939 addressing. + + Extends the CAN layer by parsing the 29-bit identifier as individual + J1939 header sub-fields: + + - **priority**: 3-bit message priority (0=highest, 7=lowest, default 6) + - **reserved**: 1-bit reserved field (should be 0) + - **data_page**: 1-bit data page selector + - **pdu_format**: 8-bit PDU format (determines PDU type and PGN) + - **pdu_specific**: 8-bit destination address (PDU1) or group extension (PDU2) + - **source_address**: 8-bit source address of the sending node + - **length**: number of data bytes in this CAN frame (0-8) + - **reserved2**: 3-byte reserved field (CAN frame structure) + + PDU1 (pdu_format < 0xF0): peer-to-peer message, pdu_specific is the + destination address. PGN = (reserved << 17) | (data_page << 16) | (pdu_format << 8). + + PDU2 (pdu_format >= 0xF0): broadcast message, pdu_specific is the group + extension (part of PGN). PGN = (reserved << 17) | (data_page << 16) | + (pdu_format << 8) | pdu_specific. + + Use the :attr:`pgn` property to compute the PGN from the header fields. + + Example - Build a Request for PGN message:: + + >>> pkt = ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0xFF, + ... source_address=0x80, length=3) + >>> pkt /= ISOBUSRequestForPGN(requested_pgn=0x00EE00) + + Example - Build an Address Claimed message:: + + >>> nn = build_isobus_name(identity_number=0x001, + ... manufacturer_code=0x123, + ... industry_group=2, device_class=4) + >>> pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSAddressClaimed(node_name=nn) + + Example - Dissect a raw ISOBUS frame:: + + >>> raw_frame = b'\\xc8\\xee\\xff\\x80\\x08\\x00\\x00\\x00' + b'\\x00' * 8 + >>> pkt = ISOBUS(raw_frame) + >>> pkt.pdu_format # 0xEE = Address Claimed PGN + 238 + >>> pkt.source_address # 0x80 + 128 + """ + + name = "ISOBUS" + + # The 29-bit CAN identifier is split into J1939 sub-fields. + # Bit layout of the first 4 bytes (32 bits total): + # bits 31-29: flags (3 bits) + # bits 28-26: priority (3 bits) + # bit 25: reserved (1 bit) + # bit 24: data_page (1 bit) + # bits 23-16: pdu_format (8 bits) + # bits 15-8: pdu_specific (8 bits) + # bits 7-0: source_address (8 bits) + fields_desc = [ + FlagsField('flags', 0, 3, ['error', + 'remote_transmission_request', + 'extended']), + BitField('priority', 6, 3), + BitField('reserved', 0, 1), + BitField('data_page', 0, 1), + ByteField('pdu_format', 0), + XByteField('pdu_specific', 0), + XByteField('source_address', ISOBUS_ADDR_NULL), + ByteField('length', 8), + ThreeBytesField('reserved2', 0), + ] + + def extract_padding(self, p): + # type: (bytes) -> Tuple[bytes, Optional[bytes]] + """Pass CAN data bytes (up to ``length``) to the next protocol layer.""" + return p[:self.length], None + + @property + def pgn(self): + # type: () -> int + """Compute the Parameter Group Number (PGN) from the J1939 header. + + For PDU1 (pdu_format < 0xF0): the destination address is NOT part + of the PGN. PGN = (reserved << 17) | (data_page << 16) | (pdu_format << 8). + + For PDU2 (pdu_format >= 0xF0): the group extension IS part of the PGN. + PGN = (reserved << 17) | (data_page << 16) | (pdu_format << 8) | pdu_specific. + """ + r = self.reserved & 0x1 + dp = self.data_page & 0x1 + pf = self.pdu_format & 0xFF + if pf < 0xF0: + return (r << 17) | (dp << 16) | (pf << 8) + else: + ps = self.pdu_specific & 0xFF + return (r << 17) | (dp << 16) | (pf << 8) | ps + + +class ISOBUSRequestForPGN(Packet): + """Request for PGN message (PGN 0x00EA00). + + Requests a specific PGN from another node. The 3-byte requested PGN + is encoded in little-endian byte order. Bytes 4-8 are set to 0xFF. + + Example:: + + >>> pkt = ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0xFF, + ... source_address=0x80, length=3) + >>> pkt /= ISOBUSRequestForPGN(requested_pgn=0x00EE00) + """ + + name = "ISOBUS Request for PGN" + fields_desc = [ + XLE3BytesField('requested_pgn', 0), + ] + + +class ISOBUSAcknowledgment(Packet): + """Acknowledgment message (PGN 0x00E800). + + Sent in response to PGN requests or commands. Byte 1 indicates the + acknowledgment type; bytes 2-8 provide additional context. + + Example:: + + >>> pkt = ISOBUS(priority=6, pdu_format=0xE8, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSAcknowledgment(control_byte=0, + ... group_function_value=0xFF, + ... pgn=0x00EE00) + """ + + name = "ISOBUS Acknowledgment" + fields_desc = [ + ByteEnumField('control_byte', 0, ISOBUS_ACK_CONTROL), + ByteField('group_function_value', 0xFF), + # Bytes 3-5: reserved (0xFF) + ByteField('reserved_ack_b3', 0xFF), + ByteField('reserved_ack_b4', 0xFF), + ByteField('reserved_ack_b5', 0xFF), + # Bytes 6-8: PGN being acknowledged (little-endian) + XLE3BytesField('pgn', 0), + ] + + +class ISOBUSAddressClaimed(Packet): + """Address Claimed / Cannot Claim Address (PGN 0x00EE00). + + Contains the J1939/ISOBUS NAME of the claiming node. The NAME is an + 8-byte little-endian value encoding the node's identity. + + Use :func:`build_isobus_name` to construct the NAME value from individual + component fields. + + NAME bit layout (64-bit little-endian integer): + + ========================= ===== ========================================= + Field Bits Description + ========================= ===== ========================================= + identity_number 0-20 21-bit unique identity per manufacturer + manufacturer_code 21-31 11-bit SAE-assigned manufacturer code + ecu_instance 32-34 3-bit ECU instance + function_instance 35-39 5-bit function instance + function 40-47 8-bit function code (industry-specific) + reserved_name 48 1-bit reserved + device_class 49-55 7-bit device class + device_class_instance 56-59 4-bit device class instance + industry_group 60-62 3-bit industry group + self_configurable_address 63 1-bit self-configurable address flag + ========================= ===== ========================================= + + Subfield access via properties:: + + >>> pkt = ISOBUSAddressClaimed( + ... node_name=build_isobus_name(identity_number=0x001, + ... manufacturer_code=0x123, + ... industry_group=2, device_class=4)) + >>> pkt.identity_number + 1 + >>> pkt.manufacturer_code + 291 + >>> pkt.industry_group + 2 + """ + + name = "ISOBUS Address Claimed" + fields_desc = [ + LELongField('node_name', 0), + ] + + @property + def identity_number(self): + # type: () -> int + """Extract the 21-bit Identity Number from the NAME.""" + return self.node_name & 0x1FFFFF + + @property + def manufacturer_code(self): + # type: () -> int + """Extract the 11-bit Manufacturer Code from the NAME.""" + return (self.node_name >> 21) & 0x7FF + + @property + def ecu_instance(self): + # type: () -> int + """Extract the 3-bit ECU Instance from the NAME.""" + return (self.node_name >> 32) & 0x7 + + @property + def function_instance(self): + # type: () -> int + """Extract the 5-bit Function Instance from the NAME.""" + return (self.node_name >> 35) & 0x1F + + @property + def function(self): + # type: () -> int + """Extract the 8-bit Function from the NAME.""" + return (self.node_name >> 40) & 0xFF + + @property + def reserved_name(self): + # type: () -> int + """Extract the 1-bit Reserved field from the NAME.""" + return (self.node_name >> 48) & 0x1 + + @property + def device_class(self): + # type: () -> int + """Extract the 7-bit Device Class from the NAME.""" + return (self.node_name >> 49) & 0x7F + + @property + def device_class_instance(self): + # type: () -> int + """Extract the 4-bit Device Class Instance from the NAME.""" + return (self.node_name >> 56) & 0xF + + @property + def industry_group(self): + # type: () -> int + """Extract the 3-bit Industry Group from the NAME.""" + return (self.node_name >> 60) & 0x7 + + @property + def self_configurable_address(self): + # type: () -> int + """Extract the Self-Configurable Address bit from the NAME.""" + return (self.node_name >> 63) & 0x1 + + +class ISOBUSCommandedAddress(Packet): + """Commanded Address message (PGN 0x00FED8). + + Instructs a specific node (identified by NAME) to use a new address. + + Example:: + + >>> nn = build_isobus_name(identity_number=0x001, + ... manufacturer_code=0x123) + >>> pkt = ISOBUS(priority=6, pdu_format=0xFE, pdu_specific=0xD8, + ... source_address=0x26, length=9) + >>> pkt /= ISOBUSCommandedAddress(node_name=nn, new_source_address=0x42) + """ + + name = "ISOBUS Commanded Address" + fields_desc = [ + LELongField('node_name', 0), + XByteField('new_source_address', 0), + ] + + +class ISOBUSWorkingSetMaster(Packet): + """Working Set Master message (PGN 0x00E600). + + Sent periodically by the Working Set Master (WSM) to announce itself + and the number of members in the Working Set (ISO 11783-7). + + Example:: + + >>> pkt = ISOBUS(priority=7, pdu_format=0xE6, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSWorkingSetMaster(number_of_members=3) + """ + + name = "ISOBUS Working Set Master" + fields_desc = [ + ByteField('number_of_members', 1), + ] + + +class ISOBUSWorkingSetMember(Packet): + """Working Set Member message (PGN 0x00E700). + + Sent by each Working Set Member to announce itself to the Working Set + Master during Working Set formation (ISO 11783-7). + + Example:: + + >>> nn = build_isobus_name(identity_number=0x002, + ... manufacturer_code=0x123) + >>> pkt = ISOBUS(priority=7, pdu_format=0xE7, pdu_specific=0x80, + ... source_address=0x81, length=8) + >>> pkt /= ISOBUSWorkingSetMember(node_name=nn) + """ + + name = "ISOBUS Working Set Member" + fields_desc = [ + LELongField('node_name', 0), + ] + + +class ISOBUSTransportProtocolCM(Packet): + """Transport Protocol - Connection Management (TP.CM) (PGN 0x00EC00). + + Manages multi-packet message transfers (9 to 1785 bytes). Supports five + message types selected by ``control_byte``: + + - **BAM** (32): Broadcast Announce Message, announces a broadcast transfer + - **RTS** (16): Request to Send, initiates a peer-to-peer transfer + - **CTS** (17): Clear to Send, authorises the sender to transmit packets + - **EOM ACK** (19): End of Message Acknowledgment, confirms transfer complete + - **Abort** (255): Aborts an in-progress connection + + Example - BAM:: + + >>> pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSTransportProtocolCM( + ... control_byte=32, + ... total_message_size=20, + ... total_number_of_packets=3, + ... pgn=0x00FED8) + + Example - RTS:: + + >>> pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0x42, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSTransportProtocolCM( + ... control_byte=16, + ... total_message_size=20, + ... total_number_of_packets=3, + ... max_packets_per_cts=3, + ... pgn=0x00FED8) + """ + + name = "ISOBUS TP.CM" + + # Convenience references for control_byte values + BAM = 32 + RTS = 16 + CTS = 17 + EOM_ACK = 19 + ABORT = 255 + + fields_desc = [ + ByteEnumField('control_byte', 32, ISOBUS_TP_CM_CONTROL), + + # BAM / RTS / EOM ACK: total_message_size (bytes 2-3, LE 16-bit) + ConditionalField( + LEShortField('total_message_size', 0), + lambda pkt: pkt.control_byte in (16, 19, 32) + ), + # BAM / RTS / EOM ACK: total_number_of_packets (byte 4) + ConditionalField( + ByteField('total_number_of_packets', 0), + lambda pkt: pkt.control_byte in (16, 19, 32) + ), + # RTS only: max packets per CTS (byte 5); 0xFF = no limit + ConditionalField( + ByteField('max_packets_per_cts', 0xFF), + lambda pkt: pkt.control_byte == 16 + ), + # BAM / EOM ACK: reserved byte 5 (0xFF) + ConditionalField( + ByteField('reserved_b5', 0xFF), + lambda pkt: pkt.control_byte in (19, 32) + ), + + # CTS: number of packets that can be sent (byte 2) + ConditionalField( + ByteField('number_of_packets_cts', 0), + lambda pkt: pkt.control_byte == 17 + ), + # CTS: next packet number to be sent (byte 3, starts at 1) + ConditionalField( + ByteField('next_packet_number', 1), + lambda pkt: pkt.control_byte == 17 + ), + # CTS: reserved bytes 4-5 (0xFF 0xFF) + ConditionalField( + ByteField('reserved_cts_b4', 0xFF), + lambda pkt: pkt.control_byte == 17 + ), + ConditionalField( + ByteField('reserved_cts_b5', 0xFF), + lambda pkt: pkt.control_byte == 17 + ), + + # Abort: connection abort reason (byte 2) + ConditionalField( + ByteEnumField('abort_reason', 1, ISOBUS_TP_CM_ABORT_REASONS), + lambda pkt: pkt.control_byte == 255 + ), + # Abort: reserved bytes 3-5 (0xFF 0xFF 0xFF) + ConditionalField( + ByteField('reserved_abort_b3', 0xFF), + lambda pkt: pkt.control_byte == 255 + ), + ConditionalField( + ByteField('reserved_abort_b4', 0xFF), + lambda pkt: pkt.control_byte == 255 + ), + ConditionalField( + ByteField('reserved_abort_b5', 0xFF), + lambda pkt: pkt.control_byte == 255 + ), + + # Bytes 6-8: PGN of the multi-packet message (little-endian) + XLE3BytesField('pgn', 0), + ] + + +class ISOBUSTransportProtocolDT(Packet): + """Transport Protocol - Data Transfer (TP.DT) (PGN 0x00EB00). + + Carries individual data packets in a multi-packet message transfer. + Each packet carries up to 7 bytes of data and a sequence number. + + Sequence numbers start at 1 and increment for each packet. The last + packet is padded to 7 bytes with 0xFF if the data does not fill it. + + Example:: + + >>> pkt = ISOBUS(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + ... source_address=0x80, length=8) + >>> pkt /= ISOBUSTransportProtocolDT( + ... sequence_number=1, + ... data=b'\\x01\\x02\\x03\\x04\\x05\\x06\\x07') + """ + + name = "ISOBUS TP.DT" + fields_desc = [ + ByteField('sequence_number', 1), + # 7 bytes of data payload (padded with 0xFF if the message is shorter) + StrFixedLenField('data', b'\xff' * 7, length=7), + ] + + +# --------------------------------------------------------------------------- +# Layer bindings: dispatch ISOBUS data to PGN-specific handlers +# --------------------------------------------------------------------------- + +# PDU1 PGNs (pdu_format < 0xF0): pdu_specific is the destination address +bind_layers(ISOBUS, ISOBUSWorkingSetMaster, pdu_format=0xE6) +bind_layers(ISOBUS, ISOBUSWorkingSetMember, pdu_format=0xE7) +bind_layers(ISOBUS, ISOBUSAcknowledgment, pdu_format=0xE8) +bind_layers(ISOBUS, ISOBUSRequestForPGN, pdu_format=0xEA) +bind_layers(ISOBUS, ISOBUSTransportProtocolDT, pdu_format=0xEB) +bind_layers(ISOBUS, ISOBUSTransportProtocolCM, pdu_format=0xEC) +bind_layers(ISOBUS, ISOBUSAddressClaimed, pdu_format=0xEE) + +# PDU2 PGN (pdu_format >= 0xF0): pdu_specific is the group extension +# PGN 0x00FED8: Commanded Address (pdu_format=0xFE, pdu_specific=0xD8) +bind_layers(ISOBUS, ISOBUSCommandedAddress, pdu_format=0xFE, pdu_specific=0xD8) diff --git a/test/contrib/automotive/isobus.uts b/test/contrib/automotive/isobus.uts new file mode 100644 index 00000000000..47997d4dab0 --- /dev/null +++ b/test/contrib/automotive/isobus.uts @@ -0,0 +1,480 @@ +% Regression tests for the ISOBUS (ISO 11783) layer + +############ +############ + ++ Basic operations + += Load module +load_contrib("automotive.isobus", globals_dict=globals()) + += Build a minimal ISOBUS header (PDU1 - Address Claimed) + +pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + source_address=0x80, length=8) + +assert pkt.priority == 6 +assert pkt.reserved == 0 +assert pkt.data_page == 0 +assert pkt.pdu_format == 0xEE +assert pkt.pdu_specific == 0xFF +assert pkt.source_address == 0x80 +assert pkt.length == 8 + += Check ISOBUS header byte encoding + +pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + source_address=0x80, length=8) +raw = bytes(pkt) + +# Byte 0: flags(000) + priority(110=6) + reserved(0) + data_page(0) = 0x18 +assert raw[0] == 0x18 +# Byte 1: pdu_format = 0xEE +assert raw[1] == 0xEE +# Byte 2: pdu_specific = 0xFF +assert raw[2] == 0xFF +# Byte 3: source_address = 0x80 +assert raw[3] == 0x80 +# Byte 4: length = 8 +assert raw[4] == 0x08 + += PGN computation for PDU1 (pdu_format < 0xF0) + +pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, source_address=0x80) +# PDU1: PGN = (reserved=0 << 17) | (data_page=0 << 16) | (pdu_format=0xEE << 8) +assert pkt.pgn == 0x00EE00 + +pkt2 = ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0x42, source_address=0x80) +# Destination address (0x42) is NOT part of the PGN for PDU1 +assert pkt2.pgn == 0x00EA00 + += PGN computation for PDU2 (pdu_format >= 0xF0) + +pkt = ISOBUS(priority=6, pdu_format=0xFE, pdu_specific=0xD8, source_address=0x26) +# PDU2: PGN = (reserved=0 << 17) | (data_page=0 << 16) | (pdu_format=0xFE << 8) | pdu_specific=0xD8 +assert pkt.pgn == 0x00FED8 + +pkt2 = ISOBUS(priority=6, data_page=1, pdu_format=0xF0, pdu_specific=0x01, source_address=0x80) +assert pkt2.pgn == 0x01F001 + += ISOBUS inherits CAN swap-bytes and byte-swap logic + +conf.contribs['CAN']['swap-bytes'] = False +pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, source_address=0x80, length=8) +raw_normal = bytes(pkt) + +conf.contribs['CAN']['swap-bytes'] = True +raw_swapped = bytes(pkt) +assert raw_normal != raw_swapped + +# Restore default +conf.contribs['CAN']['swap-bytes'] = False + +############ +############ + ++ build_isobus_name helper function + += Build NAME with all components + +name_val = build_isobus_name( + identity_number=0x001, + manufacturer_code=0x123, + ecu_instance=0, + function_instance=0, + function=0, + reserved_name=0, + device_class=4, + device_class_instance=0, + industry_group=2, + self_configurable_address=1, +) + +# Verify each component is encoded correctly +assert (name_val & 0x1FFFFF) == 0x001 # identity_number +assert ((name_val >> 21) & 0x7FF) == 0x123 # manufacturer_code +assert ((name_val >> 32) & 0x7) == 0 # ecu_instance +assert ((name_val >> 35) & 0x1F) == 0 # function_instance +assert ((name_val >> 40) & 0xFF) == 0 # function +assert ((name_val >> 48) & 0x1) == 0 # reserved_name +assert ((name_val >> 49) & 0x7F) == 4 # device_class +assert ((name_val >> 56) & 0xF) == 0 # device_class_instance +assert ((name_val >> 60) & 0x7) == 2 # industry_group +assert ((name_val >> 63) & 0x1) == 1 # self_configurable_address + += Build NAME with default values + +name_default = build_isobus_name() +assert (name_default & 0x1FFFFF) == 0 # identity_number=0 +assert ((name_default >> 21) & 0x7FF) == 0 # manufacturer_code=0 +assert ((name_default >> 60) & 0x7) == 2 # industry_group=2 (default) +assert ((name_default >> 63) & 0x1) == 1 # self_configurable_address=1 (default) + += NAME field masking (out-of-range values are masked) + +name_val = build_isobus_name(identity_number=0x3FFFFF) # 22 bits, should mask to 21 +assert (name_val & 0x1FFFFF) == (0x3FFFFF & 0x1FFFFF) + +############ +############ + ++ ISOBUSAddressClaimed (PGN 0x00EE00) + += Build Address Claimed packet + +nn = build_isobus_name( + identity_number=0x001, + manufacturer_code=0x123, + industry_group=2, + device_class=4, +) +hdr = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + source_address=0x80, length=8) +pkt = hdr / ISOBUSAddressClaimed(node_name=nn) + +assert bytes(pkt)[1] == 0xEE # pdu_format +assert len(bytes(pkt)) == 16 # 8 header + 8 NAME + += Dissect Address Claimed packet + +nn = build_isobus_name( + identity_number=0x001, + manufacturer_code=0x123, + industry_group=2, + device_class=4, +) +raw = bytes(ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + source_address=0x80, length=8) / ISOBUSAddressClaimed(node_name=nn)) +parsed = ISOBUS(raw) + +assert isinstance(parsed.payload, ISOBUSAddressClaimed) +assert parsed.payload.identity_number == 0x001 +assert parsed.payload.manufacturer_code == 0x123 +assert parsed.payload.industry_group == 2 +assert parsed.payload.device_class == 4 + += ISOBUSAddressClaimed NAME property access + +nn = build_isobus_name( + identity_number=0x7F001, + manufacturer_code=0x5A3, + ecu_instance=2, + function_instance=5, + function=130, + reserved_name=0, + device_class=71, + device_class_instance=3, + industry_group=2, + self_configurable_address=1, +) +acl = ISOBUSAddressClaimed(node_name=nn) + +assert acl.identity_number == 0x7F001 +assert acl.manufacturer_code == 0x5A3 +assert acl.ecu_instance == 2 +assert acl.function_instance == 5 +assert acl.function == 130 +assert acl.reserved_name == 0 +assert acl.device_class == 71 +assert acl.device_class_instance == 3 +assert acl.industry_group == 2 +assert acl.self_configurable_address == 1 + += ISOBUSAddressClaimed round-trip + +nn = build_isobus_name(identity_number=0x12345, manufacturer_code=0x7FF, + industry_group=3, device_class=10) +pkt = ISOBUS(priority=6, pdu_format=0xEE, pdu_specific=0xFF, + source_address=0x80, length=8) / ISOBUSAddressClaimed(node_name=nn) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSAddressClaimed) +assert parsed.payload.node_name == nn +assert parsed.payload.identity_number == 0x12345 +assert parsed.payload.manufacturer_code == 0x7FF +assert parsed.payload.industry_group == 3 + +############ +############ + ++ ISOBUSRequestForPGN (PGN 0x00EA00) + += Build Request for PGN + +pkt = ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0xFF, + source_address=0x80, length=3) / ISOBUSRequestForPGN(requested_pgn=0x00EE00) + +raw = bytes(pkt) +assert raw[1] == 0xEA # pdu_format + += Dissect Request for PGN + +raw = bytes(ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0xFF, + source_address=0x80, length=3) / ISOBUSRequestForPGN(requested_pgn=0x00EE00)) +parsed = ISOBUS(raw) + +assert isinstance(parsed.payload, ISOBUSRequestForPGN) +assert parsed.payload.requested_pgn == 0x00EE00 + += Request for PGN with different PGNs + +for pgn in [0x00E800, 0x00EA00, 0x00EC00, 0x00EE00, 0x00FED8]: + pkt = ISOBUS(priority=6, pdu_format=0xEA, pdu_specific=0xFF, + source_address=0x80, length=3) / ISOBUSRequestForPGN(requested_pgn=pgn) + parsed = ISOBUS(bytes(pkt)) + assert isinstance(parsed.payload, ISOBUSRequestForPGN) + assert parsed.payload.requested_pgn == pgn + +############ +############ + ++ ISOBUSAcknowledgment (PGN 0x00E800) + += Build Acknowledgment (Positive ACK) + +pkt = ISOBUS(priority=6, pdu_format=0xE8, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSAcknowledgment(control_byte=0, group_function_value=0xFF, + pgn=0x00EE00) + +raw = bytes(pkt) +assert raw[1] == 0xE8 # pdu_format + += Dissect Acknowledgment (Negative ACK) + +pkt = ISOBUS(priority=6, pdu_format=0xE8, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSAcknowledgment(control_byte=1, group_function_value=0xFF, + pgn=0x00EE00) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSAcknowledgment) +assert parsed.payload.control_byte == 1 +assert parsed.payload.pgn == 0x00EE00 + +############ +############ + ++ ISOBUSTransportProtocolCM (PGN 0x00EC00) + += Build TP.CM BAM (Broadcast Announce Message) + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=32, + total_message_size=20, + total_number_of_packets=3, + pgn=0x00FED8) + +raw = bytes(pkt) +assert raw[1] == 0xEC # pdu_format +assert len(raw) == 16 # 8-byte header + 8-byte TP.CM + += Dissect TP.CM BAM + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=32, + total_message_size=20, + total_number_of_packets=3, + pgn=0x00FED8) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolCM) +assert parsed.payload.control_byte == 32 +assert parsed.payload.total_message_size == 20 +assert parsed.payload.total_number_of_packets == 3 +assert parsed.payload.pgn == 0x00FED8 + += Build and dissect TP.CM RTS (Request to Send) + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0x42, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=16, + total_message_size=20, + total_number_of_packets=3, + max_packets_per_cts=3, + pgn=0x00EE00) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolCM) +assert parsed.payload.control_byte == 16 +assert parsed.payload.total_message_size == 20 +assert parsed.payload.total_number_of_packets == 3 +assert parsed.payload.max_packets_per_cts == 3 +assert parsed.payload.pgn == 0x00EE00 + += Build and dissect TP.CM CTS (Clear to Send) + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0x80, + source_address=0x42, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=17, + number_of_packets_cts=3, + next_packet_number=1, + pgn=0x00EE00) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolCM) +assert parsed.payload.control_byte == 17 +assert parsed.payload.number_of_packets_cts == 3 +assert parsed.payload.next_packet_number == 1 +assert parsed.payload.pgn == 0x00EE00 + += Build and dissect TP.CM EOM ACK (End of Message Acknowledgment) + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0x80, + source_address=0x42, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=19, + total_message_size=20, + total_number_of_packets=3, + pgn=0x00EE00) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolCM) +assert parsed.payload.control_byte == 19 +assert parsed.payload.total_message_size == 20 +assert parsed.payload.total_number_of_packets == 3 +assert parsed.payload.pgn == 0x00EE00 + += Build and dissect TP.CM Connection Abort + +pkt = ISOBUS(priority=7, pdu_format=0xEC, pdu_specific=0x42, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolCM(control_byte=255, + abort_reason=3, + pgn=0x00EE00) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolCM) +assert parsed.payload.control_byte == 255 +assert parsed.payload.abort_reason == 3 +assert parsed.payload.pgn == 0x00EE00 + +############ +############ + ++ ISOBUSTransportProtocolDT (PGN 0x00EB00) + += Build TP.DT (Data Transfer) packet + +pkt = ISOBUS(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolDT(sequence_number=1, + data=b'\x01\x02\x03\x04\x05\x06\x07') + +raw = bytes(pkt) +assert raw[1] == 0xEB # pdu_format +assert len(raw) == 16 # 8-byte header + 8-byte TP.DT + += Dissect TP.DT packet + +pkt = ISOBUS(priority=7, pdu_format=0xEB, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSTransportProtocolDT(sequence_number=2, + data=b'\x08\x09\x0A\x0B\x0C\x0D\x0E') +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSTransportProtocolDT) +assert parsed.payload.sequence_number == 2 +assert parsed.payload.data == b'\x08\x09\x0A\x0B\x0C\x0D\x0E' + +############ +############ + ++ ISOBUSCommandedAddress (PGN 0x00FED8) + += Build Commanded Address packet + +nn = build_isobus_name(identity_number=0x001, manufacturer_code=0x123) +pkt = ISOBUS(priority=6, pdu_format=0xFE, pdu_specific=0xD8, + source_address=0x26, length=9) / \ + ISOBUSCommandedAddress(node_name=nn, new_source_address=0x42) + +raw = bytes(pkt) +assert raw[1] == 0xFE # pdu_format +assert raw[2] == 0xD8 # pdu_specific +assert len(raw) == 17 # 8-byte header + 9-byte payload + += Dissect Commanded Address packet + +nn = build_isobus_name(identity_number=0x001, manufacturer_code=0x123) +pkt = ISOBUS(priority=6, pdu_format=0xFE, pdu_specific=0xD8, + source_address=0x26, length=9) / \ + ISOBUSCommandedAddress(node_name=nn, new_source_address=0x42) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSCommandedAddress) +assert parsed.payload.node_name == nn +assert parsed.payload.new_source_address == 0x42 + +############ +############ + ++ ISOBUSWorkingSetMaster (PGN 0x00E600) + += Build and dissect Working Set Master + +pkt = ISOBUS(priority=7, pdu_format=0xE6, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSWorkingSetMaster(number_of_members=3) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSWorkingSetMaster) +assert parsed.payload.number_of_members == 3 + += Working Set Master with different member counts + +for count in [1, 5, 10, 255]: + pkt = ISOBUS(priority=7, pdu_format=0xE6, pdu_specific=0xFF, + source_address=0x80, length=8) / \ + ISOBUSWorkingSetMaster(number_of_members=count) + parsed = ISOBUS(bytes(pkt)) + assert parsed.payload.number_of_members == count + +############ +############ + ++ ISOBUSWorkingSetMember (PGN 0x00E700) + += Build and dissect Working Set Member + +nn = build_isobus_name(identity_number=0x002, manufacturer_code=0x123) +pkt = ISOBUS(priority=7, pdu_format=0xE7, pdu_specific=0x80, + source_address=0x81, length=8) / \ + ISOBUSWorkingSetMember(node_name=nn) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSWorkingSetMember) +assert parsed.payload.node_name == nn + +############ +############ + ++ Layer bindings and automatic dissection + += Automatic dissection by pdu_format (PDU1 messages) + +for pdu_fmt, expected_cls in [ + (0xE6, ISOBUSWorkingSetMaster), + (0xE7, ISOBUSWorkingSetMember), + (0xE8, ISOBUSAcknowledgment), + (0xEA, ISOBUSRequestForPGN), + (0xEB, ISOBUSTransportProtocolDT), + (0xEC, ISOBUSTransportProtocolCM), + (0xEE, ISOBUSAddressClaimed), +]: + pkt = ISOBUS(priority=6, pdu_format=pdu_fmt, pdu_specific=0xFF, + source_address=0x80, length=8) / expected_cls() + parsed = ISOBUS(bytes(pkt)) + assert type(parsed.payload) == expected_cls, \ + "pdu_format=0x%02X: expected %s, got %s" % (pdu_fmt, expected_cls.__name__, type(parsed.payload).__name__) + += Automatic dissection for PDU2 Commanded Address (pdu_format=0xFE, pdu_specific=0xD8) + +nn = build_isobus_name(identity_number=0x001, manufacturer_code=0x123) +pkt = ISOBUS(priority=6, pdu_format=0xFE, pdu_specific=0xD8, + source_address=0x26, length=9) / \ + ISOBUSCommandedAddress(node_name=nn, new_source_address=0x42) +parsed = ISOBUS(bytes(pkt)) + +assert isinstance(parsed.payload, ISOBUSCommandedAddress)