diff --git a/tests/test_types.py b/tests/test_types.py index d68955d..bf0016d 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -81,3 +81,30 @@ def test_nwk(): assert str(nwk) == "0x1234" assert repr(nwk) == "0x1234" + + +def test_iosample(): + data = b"\x01\x55\x55\x85\x11\x11\x01\x55\x02\xAA\x0c\xe9" + sample_report, rest = t.IOSample.deserialize(data) + + assert sample_report == { + "digital_samples": [ + 1, + None, + 0, + None, + 1, + None, + 0, + None, + 1, + None, + 0, + None, + 1, + None, + 0, + ], + "analog_samples": [341, None, 682, None, None, None, None, 3305], + } + assert rest == b"" diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index 6e488b0..5489523 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -221,7 +221,7 @@ class ModemStatus(t.uint8_t, t.UndefinedEnum): "CB": t.uint8_t, "ND": t, # "optional 2-Byte NI value" "DN": t.Bytes, # "up to 20-Byte printable ASCII string" - "IS": None, + "IS": t.IOSample, "1S": None, "AS": None, # Stuff I've guessed diff --git a/zigpy_xbee/types.py b/zigpy_xbee/types.py index 8250239..8bd4400 100644 --- a/zigpy_xbee/types.py +++ b/zigpy_xbee/types.py @@ -247,3 +247,71 @@ class TXOptions(zigpy.types.bitmap8): Disable_Retries_and_Route_Repair = 0x01 Enable_APS_Encryption = 0x20 Use_Extended_TX_Timeout = 0x40 + + +class IOSample(dict): + """Parse an XBee IO sample report.""" + + serialize = None + + @classmethod + def deserialize(cls, data): + """Deserialize an xbee IO sample report. + + xbee digital sample format + Sample set count byte 0 + Digital mask byte 1, 2 + Analog mask byte 3 + Digital samples byte 4, 5 (if any sample exists) + Analog Sample, 2 bytes per + """ + sample_sets = int.from_bytes(data[0:1], byteorder="big") + if sample_sets != 1: + raise ValueError("Number of sets is not 1") + digital_mask = data[1:3] + analog_mask = data[3:4] + digital_sample = data[4:6] + num_bits = 15 + digital_pins = [ + (int.from_bytes(digital_mask, byteorder="big") >> bit) & 1 + for bit in range(num_bits - 1, -1, -1) + ] + digital_pins = list(reversed(digital_pins)) + analog_pins = [ + (int.from_bytes(analog_mask, byteorder="big") >> bit) & 1 + for bit in range(8 - 1, -1, -1) + ] + analog_pins = list(reversed(analog_pins)) + if 1 in digital_pins: + digital_samples = [ + (int.from_bytes(digital_sample, byteorder="big") >> bit) & 1 + for bit in range(num_bits - 1, -1, -1) + ] + digital_samples = list(reversed(digital_samples)) + sample_index = 6 + else: + # skip digital samples block + digital_samples = digital_pins + sample_index = 4 + analog_samples = [] + for apin in analog_pins: + if apin == 1: + analog_samples.append( + int.from_bytes( + data[sample_index : sample_index + 2], byteorder="big" + ) + ) + sample_index += 2 + else: + analog_samples.append(None) + for dpin in range(len(digital_pins)): + if digital_pins[dpin] == 0: + digital_samples[dpin] = None + + return ( + { + "digital_samples": digital_samples, + "analog_samples": analog_samples, + }, + data[sample_index:], + )