Skip to content

Add bulk read temperature method #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,33 @@ for sensor in W1ThermSensor.get_available_sensors([Sensor.DS18B20]):
print("Sensor %s has temperature %.2f" % (sensor.id, sensor.get_temperature()))
```

### Multiple sensors using bulk conversion

Using bulk conversion method returns a list of available sensors in a bus, with property `temperature`. By default, uses temperature in Celsius, if `Unit` parameter is not specified. Otherwise, use `w1thermsensor.Unit` to convert to your desired unit.

This method speeds up the process of reading all sensors in the bus to the single sensor conversion time.

E.g. Having 3 sensors in the bus with a 12bit resolution will result in `~750ms * 3 sensors = ~2,3s` using traditional method above. With the bulk conversion it will last around `~750ms`

*Note: Bulk conversion requires root privileges or permissions change to `w1_master*` path*

*Method do not support sensor calibration or offset configuration!*

`*w1_master` path is:
- (symbolic link) - `/sys/bus/w1/devices/w1_bus_master1/therm_bulk_read`
- (destination link) - `/sys/devices/w1_bus_master1/therm_bulk_read`

If you don't want to run your script as a root - do `chmod +w` or `chown` to a user executing your python script to the one of the paths above.

```python
from w1thermsensor import W1ThermSensor, Unit

for sensor in W1ThermSensor.get_bulk_temperature():
# W1ThermSensor.bulk_read(Unit.KELVIN)
# W1ThermSensor.bulk_read(Unit.DEGREES_F)
print(f"Sensor: {sensor.id}, temp: {sensor.temperature}")
```

### Set sensor resolution

Some w1 therm sensors support changing the resolution for the temperature reads.
Expand Down
80 changes: 79 additions & 1 deletion src/w1thermsensor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from w1thermsensor.calibration_data import CalibrationData
from w1thermsensor.errors import (
BulkConversionStatusError,
BulkTemperatureDataError,
InvalidCalibrationDataError,
NoSensorFoundError,
ResetValueError,
Expand Down Expand Up @@ -54,6 +56,10 @@ class W1ThermSensor:

>>> sensor.get_temperature(Unit.DEGREES_F)

Get all sensors temperature

>>> sensor.get_bulk_temperature()

Supported sensors are:
* DS18S20
* DS1822
Expand All @@ -71,6 +77,7 @@ class W1ThermSensor:
#: Holds information about the location of the needed
# sensor devices on the system provided by the kernel modules
BASE_DIRECTORY = Path("/sys/bus/w1/devices")
MASTER_DIRECTORY_BULK = Path(f"{BASE_DIRECTORY}/w1_bus_master1/therm_bulk_read")
SLAVE_FILE = "w1_slave"

#: Holds the sensor reset value in Degrees Celsius
Expand Down Expand Up @@ -114,6 +121,75 @@ def is_sensor(dir_name):
if is_sensor(s.name)
]

@classmethod
def get_bulk_temperature(cls, unit: Unit = Unit.DEGREES_C) -> List["W1ThermSensor"]:
"""Returns a list of available sensors with property `'temperature'` to access its read value in requested unit.
If unit is not specified it will use Celsius by default.

Result temperature is then accessed by reading the temperature entry of each device, which may return empty if conversion is still in progress

:param unit: the unit of the temperature requested
:return: a list of sensor instances with its temperatures
:rtype: List["W1ThermSensor"]

:raises BulkConversionStatusError: if after the trigger do not indicate ready state or after reading do not indicate successful finish
:raises BulkTemperatureDataError: if the sensor return empty temperature data
:raises UnsupportedUnitError: if the unit is not supported
"""

bulk_status = cls._bulk_trigger()
if bulk_status != 1:
raise BulkConversionStatusError(
bulk_status, "Sensors should be waiting for reading(1)!"
)

sensors = cls.get_available_sensors()
for s in sensors:
temp_path = s.BASE_DIRECTORY / (s.slave_prefix + s.id + "/temperature")
with open(temp_path, "r") as f:
temp = f.read().strip()
if not temp:
raise BulkTemperatureDataError(s.id)
unit_conversion = Unit.get_conversion_function(Unit.DEGREES_C, unit)
s.temperature = unit_conversion(float(temp) / 1000)
else:
bulk_status = cls._get_bulk_trigger_status()
if bulk_status != 0:
raise BulkConversionStatusError(
bulk_status, 'Sensors read finished. Status should be "0"!'
)
return sensors

@classmethod
def _bulk_trigger(cls) -> int:
"""Initiates sensors' bulk conversion writing 'trigger' to a w1-master.

https://docs.kernel.org/w1/slaves/w1_therm.html

A bulk read of all devices on the bus could be done writing trigger to therm_bulk_read entry at w1_bus_master level

:returns: the status after triggering bulk conversion
:rtype: int
"""
with cls.MASTER_DIRECTORY_BULK.open("w") as f:
f.write("trigger\n")
return cls._get_bulk_trigger_status()

@classmethod
def _get_bulk_trigger_status(cls) -> int:
"""Check bulk trigger current status.

Reading therm_bulk_read will return
0 if no bulk conversion pending,
-1 if at least one sensor still in conversion,
1 if conversion is complete but at least one sensor value has not been read yet.

:returns: bulk conversion status
:rtype: int
"""
with cls.MASTER_DIRECTORY_BULK.open("r") as f:
return int(f.read().strip())

def __init__(
self,
sensor_type: Optional[Sensor] = None,
Expand Down Expand Up @@ -199,6 +275,7 @@ def _init_with_first_sensor_by_id(self, sensor_id: str) -> None:
def _init_with_type_and_id(self, sensor_type: Sensor, sensor_id: str) -> None:
self.type = sensor_type
self.id = sensor_id
self.temperature = None

def __repr__(self) -> str: # pragma: no cover
"""Returns a string that eval can turn back into this object
Expand Down Expand Up @@ -275,7 +352,7 @@ def get_temperature(self, unit: Unit = Unit.DEGREES_C) -> float:
:raises ResetValueError: if the sensor has still the initial value and no measurement
"""
raw_temperature_line = self.get_raw_sensor_strings()[1]
return evaluate_temperature(
self.temperature = evaluate_temperature(
raw_temperature_line,
self.RAW_VALUE_TO_DEGREE_CELSIUS_FACTOR,
unit,
Expand All @@ -284,6 +361,7 @@ def get_temperature(self, unit: Unit = Unit.DEGREES_C) -> float:
self.offset,
self.SENSOR_RESET_VALUE,
)
return self.temperature

def get_corrected_temperature(self, unit: Unit = Unit.DEGREES_C) -> float:
"""Returns the temperature in the specified unit, corrected based on the calibration data
Expand Down
18 changes: 18 additions & 0 deletions src/w1thermsensor/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,21 @@ def __init__(self, message, calibration_data):
super().__init__(
"Calibration data {} is invalid: {}.".format(message, calibration_data)
)


class BulkConversionStatusError(W1ThermSensorError):
"""Exception when the conversion status for a bulk action fails"""

def __init__(self, status, message):
super().__init__(
f"Bulk conversion status indicates failure. Status: {status}\nMessage: {message}"
)


class BulkTemperatureDataError(W1ThermSensorError):
"""Exception when the bulk conversion temperatures returns empty data"""

def __init__(self, sensor_id):
super().__init__(
f"Bulk conversion temperature did NOT return data. Sensor ID: {sensor_id}"
)