diff --git a/pyshimmer/dev/channels.py b/pyshimmer/dev/channels.py index 3f31564..877d94a 100644 --- a/pyshimmer/dev/channels.py +++ b/pyshimmer/dev/channels.py @@ -482,6 +482,7 @@ class ESensorGroup(Enum): ESensorGroup.EXG1_16BIT: 19, ESensorGroup.EXG2_24BIT: 20, ESensorGroup.EXG2_16BIT: 21, + ESensorGroup.TEMP: 22, } ENABLED_SENSORS_LEN = 0x03 diff --git a/pyshimmer/dev/revision.py b/pyshimmer/dev/revision.py new file mode 100644 index 0000000..53455ce --- /dev/null +++ b/pyshimmer/dev/revision.py @@ -0,0 +1,250 @@ +import operator +from abc import ABC, abstractmethod +from collections.abc import Iterable +from functools import reduce +from typing import overload + +import numpy as np + +from pyshimmer.dev.channels import EChannelType, ChannelDataType, ESensorGroup +from pyshimmer.util import bit_is_set, flatten_list + + +class HardwareRevision(ABC): + + @abstractmethod + def sr2dr(self, sr: float) -> int: + """Calculate equivalent device-specific rate for a sample rate in Hz + + Device-specific sample rates are given in absolute clock ticks per unit of time. + This function can be used to calculate such a rate for the Shimmer3. + + :param sr: The sampling rate in Hz + :return: An integer which represents the equivalent device-specific sampling rate + """ + pass + + @abstractmethod + def dr2sr(self, dr: int) -> float: + """Calculate equivalent sampling rate for a given device-specific rate + + Device-specific sample rates are given in absolute clock ticks per unit of time. + This function can be used to calculate a regular sampling rate in Hz from such a + rate. + + :param dr: The absolute device rate as integer + :return: A floating-point number that represents the sampling rate in Hz + """ + pass + + @overload + def sec2ticks(self, t_sec: float) -> int: ... + + @overload + def sec2ticks(self, t_sec: np.ndarray) -> np.ndarray: ... + + @abstractmethod + def sec2ticks(self, t_sec: float | np.ndarray) -> int | np.ndarray: + """Calculate equivalent device clock ticks for a time in seconds + + Args: + t_sec: A time in seconds + Returns: + An integer which represents the equivalent number of clock ticks + """ + pass + + @overload + def ticks2sec(self, t_ticks: int) -> float: ... + + @overload + def ticks2sec(self, t_ticks: np.ndarray) -> np.ndarray: ... + + @abstractmethod + def ticks2sec(self, t_ticks: int | np.ndarray) -> float | np.ndarray: + """Calculate the time in seconds equivalent to a device clock ticks count + + Args: + t_ticks: A clock tick counter for which to calculate the time in seconds + Returns: + A floating point time in seconds that is equivalent to the number of clock ticks + """ + pass + + def get_channel_dtype(self, channel: EChannelType) -> ChannelDataType: + """ + + :param channel: + :return: A list of channel data types with the same order + """ + pass + + @abstractmethod + def get_channel_dtypes( + self, channels: Iterable[EChannelType] + ) -> list[ChannelDataType]: + """Return the channel data types for a set of channels + + :param channels: A list of channels + :return: A list of channel data types with the same order + """ + pass + + @abstractmethod + def get_enabled_channels( + self, sensors: Iterable[ESensorGroup] + ) -> list[EChannelType]: + """Determine the set of data channels for a set of enabled sensors + + There exists a one-to-many mapping between enabled sensors and their corresponding + data channels. This function determines the set of necessary channels for a given + set of enabled sensors. + + :param sensors: A list of sensors that are enabled on a Shimmer + :return: A list of channels in the corresponding order + """ + pass + + @property + @abstractmethod + def sensorlist_size(self) -> int: + pass + + @abstractmethod + def sensors2bitfield(self, sensors: Iterable[ESensorGroup]) -> int: + """Convert an iterable of sensors into the corresponding bitfield transmitted to + the Shimmer + + :param sensors: A list of active sensors + :return: A bitfield that conveys the set of active sensors to the Shimmer + """ + pass + + @abstractmethod + def bitfield2sensors(self, bitfield: int) -> list[ESensorGroup]: + """Decode a bitfield returned from the Shimmer to a list of active sensors + + :param bitfield: The bitfield received from the Shimmer encoding the active sensors + :return: The corresponding list of active sensors + """ + pass + + @abstractmethod + def serialize_sensorlist(self, sensors: Iterable[ESensorGroup]) -> bytes: + """Serialize a list of sensors to the three-byte bitfield accepted by the Shimmer + + :param sensors: The list of sensors + :return: A byte string with length 3 that encodes the sensors + """ + pass + + @abstractmethod + def deserialize_sensorlist(self, bitfield_bin: bytes) -> list[ESensorGroup]: + """Deserialize the list of active sensors from the three-byte input received from + the Shimmer + + :param bitfield_bin: The input bitfield as byte string with length 3 + :return: The list of active sensors + """ + pass + + @abstractmethod + def sort_sensors(self, sensors: Iterable[ESensorGroup]) -> list[ESensorGroup]: + """Sorts the sensors in the list according to the sensor order + + This function is useful to determine the order in which sensor data will appear in + a data file by ordering the list of sensors according to their order in the file. + + :param sensors: An unsorted list of sensors + :return: A list with the same sensors as content but sorted according to their + appearance order in the data file + """ + pass + + +class BaseRevision(HardwareRevision): + + def __init__( + self, + dev_clock_rate: float, + sensor_list_dtype: ChannelDataType, + channel_data_types: dict[EChannelType, ChannelDataType], + sensor_channel_assignment: dict[ESensorGroup, list[EChannelType]], + sensor_bit_assignment: dict[ESensorGroup, int], + sensor_order: dict[ESensorGroup, int], + ): + self._dev_clock_rate = dev_clock_rate + self._sensor_list_dtype = sensor_list_dtype + self._channel_data_types = channel_data_types + self._sensor_channel_assignment = sensor_channel_assignment + self._sensor_bit_assignment = sensor_bit_assignment + self._sensor_order = sensor_order + + def sr2dr(self, sr: float) -> int: + dr_dec = self._dev_clock_rate / sr + return round(dr_dec) + + def dr2sr(self, dr: int) -> float: + return self._dev_clock_rate / dr + + @overload + def sec2ticks(self, t_sec: float) -> int: ... + + @overload + def sec2ticks(self, t_sec: np.ndarray) -> np.ndarray: ... + + def sec2ticks(self, t_sec: float | np.ndarray) -> int | np.ndarray: + return round(t_sec * self._dev_clock_rate) + + @overload + def ticks2sec(self, t_ticks: int) -> float: ... + + @overload + def ticks2sec(self, t_ticks: np.ndarray) -> np.ndarray: ... + + def ticks2sec(self, t_ticks: int | np.ndarray) -> float | np.ndarray: + return t_ticks / self._dev_clock_rate + + def get_channel_dtypes( + self, channels: Iterable[EChannelType] + ) -> list[ChannelDataType]: + dtypes = [self._channel_data_types[ch] for ch in channels] + return dtypes + + def get_enabled_channels( + self, sensors: Iterable[ESensorGroup] + ) -> list[EChannelType]: + channels = [self._sensor_channel_assignment[e] for e in sensors] + return flatten_list(channels) + + @property + def sensorlist_size(self) -> int: + return self._sensor_list_dtype.size + + def sensors2bitfield(self, sensors: Iterable[ESensorGroup]) -> int: + bit_values = [1 << self._sensor_bit_assignment[g] for g in sensors] + return reduce(operator.or_, bit_values) + + def bitfield2sensors(self, bitfield: int) -> list[ESensorGroup]: + enabled_sensors = [] + for sensor in ESensorGroup: + bit_mask = 1 << self._sensor_bit_assignment[sensor] + if bit_is_set(bitfield, bit_mask): + enabled_sensors += [sensor] + + return self.sort_sensors(enabled_sensors) + + def serialize_sensorlist(self, sensors: Iterable[ESensorGroup]) -> bytes: + bitfield = self.sensors2bitfield(sensors) + return self._sensor_list_dtype.encode(bitfield) + + def deserialize_sensorlist(self, bitfield_bin: bytes) -> list[ESensorGroup]: + bitfield = self._sensor_list_dtype.decode(bitfield_bin) + return self.bitfield2sensors(bitfield) + + def sort_sensors(self, sensors: Iterable[ESensorGroup]) -> list[ESensorGroup]: + def sort_key_fn(x): + return self._sensor_order[x] + + sensors_sorted = sorted(sensors, key=sort_key_fn) + return sensors_sorted diff --git a/pyshimmer/dev/revisions/__init__.py b/pyshimmer/dev/revisions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyshimmer/dev/revisions/shimmer3.py b/pyshimmer/dev/revisions/shimmer3.py new file mode 100644 index 0000000..c10d082 --- /dev/null +++ b/pyshimmer/dev/revisions/shimmer3.py @@ -0,0 +1,184 @@ +from ..channels import EChannelType, ChannelDataType, ESensorGroup +from ..revision import BaseRevision + + +class Shimmer3Revision(BaseRevision): + + # Device clock rate in ticks per second + DEV_CLOCK_RATE: float = 32768.0 + ENABLED_SENSORS_LEN = 0x03 + SENSOR_DTYPE = ChannelDataType(size=ENABLED_SENSORS_LEN, signed=False, le=True) + + CH_DTYPE_ASSIGNMENT: dict[EChannelType, ChannelDataType] = { + EChannelType.ACCEL_LN_X: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_LN_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_LN_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.VBATT: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_X: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.ACCEL_WR_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_X: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_Y: ChannelDataType(2, signed=True, le=True), + EChannelType.MAG_REG_Z: ChannelDataType(2, signed=True, le=True), + EChannelType.GYRO_X: ChannelDataType(2, signed=True, le=False), + EChannelType.GYRO_Y: ChannelDataType(2, signed=True, le=False), + EChannelType.GYRO_Z: ChannelDataType(2, signed=True, le=False), + EChannelType.EXTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True), + EChannelType.EXTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True), + EChannelType.EXTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A3: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A0: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A1: ChannelDataType(2, signed=False, le=True), + EChannelType.INTERNAL_ADC_A2: ChannelDataType(2, signed=False, le=True), + EChannelType.ACCEL_HG_X: None, + EChannelType.ACCEL_HG_Y: None, + EChannelType.ACCEL_HG_Z: None, + EChannelType.MAG_WR_X: None, + EChannelType.MAG_WR_Y: None, + EChannelType.MAG_WR_Z: None, + EChannelType.TEMPERATURE: ChannelDataType(2, signed=False, le=False), + EChannelType.PRESSURE: ChannelDataType(3, signed=False, le=False), + EChannelType.GSR_RAW: ChannelDataType(2, signed=False, le=True), + EChannelType.EXG1_STATUS: ChannelDataType(1, signed=False, le=True), + EChannelType.EXG1_CH1_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG1_CH2_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG2_STATUS: ChannelDataType(1, signed=False, le=True), + EChannelType.EXG2_CH1_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG2_CH2_24BIT: ChannelDataType(3, signed=True, le=False), + EChannelType.EXG1_CH1_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG1_CH2_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG2_CH1_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.EXG2_CH2_16BIT: ChannelDataType(2, signed=True, le=False), + EChannelType.STRAIN_HIGH: ChannelDataType(2, signed=False, le=True), + EChannelType.STRAIN_LOW: ChannelDataType(2, signed=False, le=True), + EChannelType.TIMESTAMP: ChannelDataType(3, signed=False, le=True), + } + + SENSOR_CHANNEL_ASSIGNMENT: dict[ESensorGroup, list[EChannelType]] = { + ESensorGroup.ACCEL_LN: [ + EChannelType.ACCEL_LN_X, + EChannelType.ACCEL_LN_Y, + EChannelType.ACCEL_LN_Z, + ], + ESensorGroup.BATTERY: [EChannelType.VBATT], + ESensorGroup.EXT_CH_A0: [EChannelType.EXTERNAL_ADC_A0], + ESensorGroup.EXT_CH_A1: [EChannelType.EXTERNAL_ADC_A1], + ESensorGroup.EXT_CH_A2: [EChannelType.EXTERNAL_ADC_A2], + ESensorGroup.INT_CH_A0: [EChannelType.INTERNAL_ADC_A0], + ESensorGroup.INT_CH_A1: [EChannelType.INTERNAL_ADC_A1], + ESensorGroup.INT_CH_A2: [EChannelType.INTERNAL_ADC_A2], + ESensorGroup.STRAIN: [EChannelType.STRAIN_HIGH, EChannelType.STRAIN_LOW], + ESensorGroup.INT_CH_A3: [EChannelType.INTERNAL_ADC_A3], + ESensorGroup.GSR: [EChannelType.GSR_RAW], + ESensorGroup.GYRO: [ + EChannelType.GYRO_X, + EChannelType.GYRO_Y, + EChannelType.GYRO_Z, + ], + ESensorGroup.ACCEL_WR: [ + EChannelType.ACCEL_WR_X, + EChannelType.ACCEL_WR_Y, + EChannelType.ACCEL_WR_Z, + ], + ESensorGroup.MAG_REG: [ + EChannelType.MAG_REG_X, + EChannelType.MAG_REG_Y, + EChannelType.MAG_REG_Z, + ], + ESensorGroup.ACCEL_HG: [ + EChannelType.ACCEL_HG_X, + EChannelType.ACCEL_HG_Y, + EChannelType.ACCEL_HG_Z, + ], + ESensorGroup.MAG_WR: [ + EChannelType.MAG_WR_X, + EChannelType.MAG_WR_Y, + EChannelType.MAG_WR_Z, + ], + ESensorGroup.PRESSURE: [EChannelType.TEMPERATURE, EChannelType.PRESSURE], + ESensorGroup.EXG1_24BIT: [ + EChannelType.EXG1_STATUS, + EChannelType.EXG1_CH1_24BIT, + EChannelType.EXG1_CH2_24BIT, + ], + ESensorGroup.EXG1_16BIT: [ + EChannelType.EXG1_STATUS, + EChannelType.EXG1_CH1_16BIT, + EChannelType.EXG1_CH2_16BIT, + ], + ESensorGroup.EXG2_24BIT: [ + EChannelType.EXG2_STATUS, + EChannelType.EXG2_CH1_24BIT, + EChannelType.EXG2_CH2_24BIT, + ], + ESensorGroup.EXG2_16BIT: [ + EChannelType.EXG2_STATUS, + EChannelType.EXG2_CH1_16BIT, + EChannelType.EXG2_CH2_16BIT, + ], + # The MPU9150 Temp sensor is not yet available as a channel in the LogAndStream + # firmware + ESensorGroup.TEMP: [], + } + + SENSOR_BIT_ASSIGNMENT: dict[ESensorGroup, int] = { + ESensorGroup.EXT_CH_A1: 0, + ESensorGroup.EXT_CH_A0: 1, + ESensorGroup.GSR: 2, + ESensorGroup.EXG2_24BIT: 3, + ESensorGroup.EXG1_24BIT: 4, + ESensorGroup.MAG_REG: 5, + ESensorGroup.GYRO: 6, + ESensorGroup.ACCEL_LN: 7, + ESensorGroup.INT_CH_A1: 8, + ESensorGroup.INT_CH_A0: 9, + ESensorGroup.INT_CH_A3: 10, + ESensorGroup.EXT_CH_A2: 11, + ESensorGroup.ACCEL_WR: 12, + ESensorGroup.BATTERY: 13, + # No assignment 14 + ESensorGroup.STRAIN: 15, + # No assignment 16 + ESensorGroup.TEMP: 17, + ESensorGroup.PRESSURE: 18, + ESensorGroup.EXG2_16BIT: 19, + ESensorGroup.EXG1_16BIT: 20, + ESensorGroup.MAG_WR: 21, + ESensorGroup.ACCEL_HG: 22, + ESensorGroup.INT_CH_A2: 23, + } + + SENSOR_ORDER: dict[ESensorGroup, int] = { + ESensorGroup.ACCEL_LN: 1, + ESensorGroup.BATTERY: 2, + ESensorGroup.EXT_CH_A0: 3, + ESensorGroup.EXT_CH_A1: 4, + ESensorGroup.EXT_CH_A2: 5, + ESensorGroup.INT_CH_A0: 6, + ESensorGroup.INT_CH_A1: 7, + ESensorGroup.INT_CH_A2: 8, + ESensorGroup.STRAIN: 9, + ESensorGroup.INT_CH_A3: 10, + ESensorGroup.GSR: 11, + ESensorGroup.GYRO: 12, + ESensorGroup.ACCEL_WR: 13, + ESensorGroup.MAG_REG: 14, + ESensorGroup.ACCEL_HG: 15, + ESensorGroup.MAG_WR: 16, + ESensorGroup.PRESSURE: 17, + ESensorGroup.EXG1_24BIT: 18, + ESensorGroup.EXG1_16BIT: 19, + ESensorGroup.EXG2_24BIT: 20, + ESensorGroup.EXG2_16BIT: 21, + ESensorGroup.TEMP: 22, + } + + def __init__(self): + super().__init__( + self.DEV_CLOCK_RATE, + self.SENSOR_DTYPE, + self.CH_DTYPE_ASSIGNMENT, + self.SENSOR_CHANNEL_ASSIGNMENT, + self.SENSOR_BIT_ASSIGNMENT, + self.SENSOR_ORDER, + ) diff --git a/test/dev/test_device_channels.py b/test/dev/test_device_channels.py index f0bea02..429389c 100644 --- a/test/dev/test_device_channels.py +++ b/test/dev/test_device_channels.py @@ -28,6 +28,8 @@ EChannelType, ESensorGroup, sort_sensors, + sensors2bitfield, + bitfield2sensors, ) @@ -168,6 +170,19 @@ def test_sensor_channel_assignments(self): if sensor not in SensorChannelAssignment: self.fail(f"No channels assigned to sensor type: {sensor}") + def test_sensor_list_to_bitfield(self): + assert sensors2bitfield((ESensorGroup.ACCEL_LN, ESensorGroup.EXT_CH_A1)) == 0x81 + assert sensors2bitfield((ESensorGroup.STRAIN, ESensorGroup.INT_CH_A1)) == 0x8100 + assert sensors2bitfield((ESensorGroup.INT_CH_A2, ESensorGroup.TEMP)) == 0x820000 + + def test_bitfield_to_sensors(self): + assert bitfield2sensors(0x81) == [ESensorGroup.ACCEL_LN, ESensorGroup.EXT_CH_A1] + assert bitfield2sensors(0x8100) == [ESensorGroup.INT_CH_A1, ESensorGroup.STRAIN] + assert bitfield2sensors(0x820000) == [ + ESensorGroup.INT_CH_A2, + ESensorGroup.TEMP, + ] + def test_sensor_bit_assignments_uniqueness(self): for s1 in SensorBitAssignments.keys(): for s2 in SensorBitAssignments.keys():