|
1 | | -from collections.abc import Iterable |
2 | | -from pyshimmer.dev.channels import EChannelType, ChannelDataType, ESensorGroup |
3 | | - |
| 1 | +import operator |
4 | 2 | from abc import ABC, abstractmethod |
| 3 | +from collections.abc import Iterable |
| 4 | +from functools import reduce |
5 | 5 | from typing import overload |
| 6 | + |
6 | 7 | import numpy as np |
7 | 8 |
|
| 9 | +from pyshimmer.dev.channels import EChannelType, ChannelDataType, ESensorGroup |
| 10 | +from pyshimmer.util import bit_is_set, flatten_list |
| 11 | + |
8 | 12 |
|
9 | 13 | class HardwareRevision(ABC): |
10 | 14 |
|
@@ -101,6 +105,11 @@ def get_enabled_channels( |
101 | 105 | """ |
102 | 106 | pass |
103 | 107 |
|
| 108 | + @property |
| 109 | + @abstractmethod |
| 110 | + def sensorlist_size(self) -> int: |
| 111 | + pass |
| 112 | + |
104 | 113 | @abstractmethod |
105 | 114 | def sensors2bitfield(self, sensors: Iterable[ESensorGroup]) -> int: |
106 | 115 | """Convert an iterable of sensors into the corresponding bitfield transmitted to |
@@ -151,3 +160,91 @@ def sort_sensors(self, sensors: Iterable[ESensorGroup]) -> list[ESensorGroup]: |
151 | 160 | appearance order in the data file |
152 | 161 | """ |
153 | 162 | pass |
| 163 | + |
| 164 | + |
| 165 | +class BaseRevision(HardwareRevision): |
| 166 | + |
| 167 | + def __init__( |
| 168 | + self, |
| 169 | + dev_clock_rate: float, |
| 170 | + sensor_list_dtype: ChannelDataType, |
| 171 | + channel_data_types: dict[EChannelType, ChannelDataType], |
| 172 | + sensor_channel_assignment: dict[ESensorGroup, list[EChannelType]], |
| 173 | + sensor_bit_assignment: dict[ESensorGroup, int], |
| 174 | + sensor_order: dict[ESensorGroup, int], |
| 175 | + ): |
| 176 | + self._dev_clock_rate = dev_clock_rate |
| 177 | + self._sensor_list_dtype = sensor_list_dtype |
| 178 | + self._channel_data_types = channel_data_types |
| 179 | + self._sensor_channel_assignment = sensor_channel_assignment |
| 180 | + self._sensor_bit_assignment = sensor_bit_assignment |
| 181 | + self._sensor_order = sensor_order |
| 182 | + |
| 183 | + def sr2dr(self, sr: float) -> int: |
| 184 | + dr_dec = self._dev_clock_rate / sr |
| 185 | + return round(dr_dec) |
| 186 | + |
| 187 | + def dr2sr(self, dr: int) -> float: |
| 188 | + return self._dev_clock_rate / dr |
| 189 | + |
| 190 | + @overload |
| 191 | + def sec2ticks(self, t_sec: float) -> int: ... |
| 192 | + |
| 193 | + @overload |
| 194 | + def sec2ticks(self, t_sec: np.ndarray) -> np.ndarray: ... |
| 195 | + |
| 196 | + def sec2ticks(self, t_sec: float | np.ndarray) -> int | np.ndarray: |
| 197 | + return round(t_sec * self._dev_clock_rate) |
| 198 | + |
| 199 | + @overload |
| 200 | + def ticks2sec(self, t_ticks: int) -> float: ... |
| 201 | + |
| 202 | + @overload |
| 203 | + def ticks2sec(self, t_ticks: np.ndarray) -> np.ndarray: ... |
| 204 | + |
| 205 | + def ticks2sec(self, t_ticks: int | np.ndarray) -> float | np.ndarray: |
| 206 | + return t_ticks / self._dev_clock_rate |
| 207 | + |
| 208 | + def get_channel_dtypes( |
| 209 | + self, channels: Iterable[EChannelType] |
| 210 | + ) -> list[ChannelDataType]: |
| 211 | + dtypes = [self._channel_data_types[ch] for ch in channels] |
| 212 | + return dtypes |
| 213 | + |
| 214 | + def get_enabled_channels( |
| 215 | + self, sensors: Iterable[ESensorGroup] |
| 216 | + ) -> list[EChannelType]: |
| 217 | + channels = [self._sensor_channel_assignment[e] for e in sensors] |
| 218 | + return flatten_list(channels) |
| 219 | + |
| 220 | + @property |
| 221 | + def sensorlist_size(self) -> int: |
| 222 | + return self._sensor_list_dtype.size |
| 223 | + |
| 224 | + def sensors2bitfield(self, sensors: Iterable[ESensorGroup]) -> int: |
| 225 | + bit_values = [1 << self._sensor_bit_assignment[g] for g in sensors] |
| 226 | + return reduce(operator.or_, bit_values) |
| 227 | + |
| 228 | + def bitfield2sensors(self, bitfield: int) -> list[ESensorGroup]: |
| 229 | + enabled_sensors = [] |
| 230 | + for sensor in ESensorGroup: |
| 231 | + bit_mask = 1 << self._sensor_bit_assignment[sensor] |
| 232 | + if bit_is_set(bitfield, bit_mask): |
| 233 | + enabled_sensors += [sensor] |
| 234 | + |
| 235 | + return self.sort_sensors(enabled_sensors) |
| 236 | + |
| 237 | + def serialize_sensorlist(self, sensors: Iterable[ESensorGroup]) -> bytes: |
| 238 | + bitfield = self.sensors2bitfield(sensors) |
| 239 | + return self._sensor_list_dtype.encode(bitfield) |
| 240 | + |
| 241 | + def deserialize_sensorlist(self, bitfield_bin: bytes) -> list[ESensorGroup]: |
| 242 | + bitfield = self._sensor_list_dtype.decode(bitfield_bin) |
| 243 | + return self.bitfield2sensors(bitfield) |
| 244 | + |
| 245 | + def sort_sensors(self, sensors: Iterable[ESensorGroup]) -> list[ESensorGroup]: |
| 246 | + def sort_key_fn(x): |
| 247 | + return self._sensor_order[x] |
| 248 | + |
| 249 | + sensors_sorted = sorted(sensors, key=sort_key_fn) |
| 250 | + return sensors_sorted |
0 commit comments