From 7b6fe903b067b252468185206c8ade63e28338e4 Mon Sep 17 00:00:00 2001 From: Stephen Jefferson Date: Thu, 6 Mar 2025 19:35:24 +0000 Subject: [PATCH 01/11] PoC sensor async loop logging values to console --- smibhid/config.py | 3 + smibhid/lib/config/config_template.py | 3 + smibhid/lib/hid.py | 3 + smibhid/lib/sensors/SGP30.py | 198 ++++++++++++++++++++++++++ smibhid/lib/sensors/__init__.py | 41 ++++++ 5 files changed, 248 insertions(+) create mode 100644 smibhid/lib/sensors/SGP30.py create mode 100644 smibhid/lib/sensors/__init__.py diff --git a/smibhid/config.py b/smibhid/config.py index 741a7e1..0af5e49 100644 --- a/smibhid/config.py +++ b/smibhid/config.py @@ -50,6 +50,9 @@ SCL_PIN = 9 I2C_ID = 0 +## Sensors - Populate driver list with connected sensors from this supported list: ["SGP30"] +SENSORS = ["SGP30"] + ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] # Scroll speed for text on displays in characters per second diff --git a/smibhid/lib/config/config_template.py b/smibhid/lib/config/config_template.py index 741a7e1..0af5e49 100644 --- a/smibhid/lib/config/config_template.py +++ b/smibhid/lib/config/config_template.py @@ -50,6 +50,9 @@ SCL_PIN = 9 I2C_ID = 0 +## Sensors - Populate driver list with connected sensors from this supported list: ["SGP30"] +SENSORS = ["SGP30"] + ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] # Scroll speed for text on displays in characters per second diff --git a/smibhid/lib/hid.py b/smibhid/lib/hid.py index b6cc186..2540f3f 100644 --- a/smibhid/lib/hid.py +++ b/smibhid/lib/hid.py @@ -12,6 +12,7 @@ from http.website import WebApp from lib.pinger import Pinger from machine import freq +from lib.sensors import Sensors class HID: @@ -38,6 +39,7 @@ def __init__(self) -> None: self.ui_log = self.moduleConfig.get_ui_log() self.space_state = SpaceState(self.moduleConfig, self) self.pinger = Pinger(self.moduleConfig, self) + self.sensors = Sensors() self.error_handler = ErrorHandler("HID") self.error_handler.configure_display(self.display) self.web_app = WebApp(self.moduleConfig, self) @@ -60,6 +62,7 @@ def startup(self) -> None: self.space_state.startup() if self.reader: self.reader.startup() + self.sensors.startup() self.ui_log.startup() self.web_app.startup() diff --git a/smibhid/lib/sensors/SGP30.py b/smibhid/lib/sensors/SGP30.py new file mode 100644 index 0000000..7e8e847 --- /dev/null +++ b/smibhid/lib/sensors/SGP30.py @@ -0,0 +1,198 @@ +# The MIT License (MIT) +# +# Copyright (c) 2017 ladyada for Adafruit Industries +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +""" +`adafruit_sgp30` +==================================================== + +I2C driver for SGP30 Sensirion VoC sensor + +* Author(s): ladyada, Alexandre Marquet. + +Implementation Notes +-------------------- + +**Hardware:** + +* Adafruit `SGP30 Air Quality Sensor Breakout - VOC and eCO2 + `_ (Product ID: 3709) + +**Software and Dependencies:** + +* MicroPython: + https://github.com/micropython/micropython + +""" +import math +import time +from micropython import const + +__version__ = "0.0.0-auto.0" +__repo__ = "https://github.com/alexmrqt/Adafruit_CircuitPython_SGP30.git" + + +# pylint: disable=bad-whitespace +_SGP30_DEFAULT_I2C_ADDR = const(0x58) +_SGP30_FEATURESET_0 = const(0x0020) +_SGP30_FEATURESET_1 = const(0x0022) + +_SGP30_CRC8_POLYNOMIAL = const(0x31) +_SGP30_CRC8_INIT = const(0xFF) +_SGP30_WORD_LEN = const(2) +# pylint: enable=bad-whitespace + +class SGP30: + """ + A driver for the SGP30 gas sensor. + + :param i2c: The `I2C` object to use. This is the only required parameter. + :param int address: (optional) The I2C address of the device. + """ + + def __init__(self, i2c, address=_SGP30_DEFAULT_I2C_ADDR): + """Initialize the sensor, get the serial # and verify that we found a proper SGP30""" + self._i2c = i2c + self._addr = address + + # get unique serial, its 48 bits so we store in an array + self.serial = self._i2c_read_words_from_cmd([0x36, 0x82], 0.01, 3) + # get featuerset + featureset = self._i2c_read_words_from_cmd([0x20, 0x2f], 0.01, 1) + if featureset[0] not in [ _SGP30_FEATURESET_0, _SGP30_FEATURESET_1 ]: + raise RuntimeError('SGP30 Not detected') + self.iaq_init() + + @property + def tvoc(self): + """Total Volatile Organic Compound in parts per billion.""" + return self.iaq_measure()[1] + + + @property + def baseline_tvoc(self): + """Total Volatile Organic Compound baseline value""" + return self.get_iaq_baseline()[1] + + + @property + def co2eq(self): + """Carbon Dioxide Equivalent in parts per million""" + return self.iaq_measure()[0] + + + @property + def baseline_co2eq(self): + """Carbon Dioxide Equivalent baseline value""" + return self.get_iaq_baseline()[0] + + + def iaq_init(self): + """Initialize the IAQ algorithm""" + # name, command, signals, delay + self._run_profile(["iaq_init", [0x20, 0x03], 0, 0.01]) + + def iaq_measure(self): + """Measure the CO2eq and TVOC""" + # name, command, signals, delay + return self._run_profile(["iaq_measure", [0x20, 0x08], 2, 0.05]) + + def get_iaq_baseline(self): + """Retreive the IAQ algorithm baseline for CO2eq and TVOC""" + # name, command, signals, delay + return self._run_profile(["iaq_get_baseline", [0x20, 0x15], 2, 0.01]) + + def set_iaq_baseline(self, co2eq, tvoc): + """Set the previously recorded IAQ algorithm baseline for CO2eq and TVOC""" + if co2eq == 0 and tvoc == 0: + raise RuntimeError('Invalid baseline') + buffer = [] + for value in [tvoc, co2eq]: + arr = [value >> 8, value & 0xFF] + arr.append(self._generate_crc(arr)) + buffer += arr + self._run_profile(["iaq_set_baseline", [0x20, 0x1e] + buffer, 0, 0.01]) + + def set_iaq_humidity(self, gramsPM3): + """Set the humidity in g/m3 for eCO2 and TVOC compensation algorithm""" + tmp = int(gramsPM3 * 256) + buffer = [] + for value in [tmp]: + arr = [value >> 8, value & 0xFF] + arr.append(self._generate_crc(arr)) + buffer += arr + self._run_profile(["iaq_set_humidity", [0x20, 0x61] + buffer, 0, 0.01]) + + def set_iaq_rel_humidity(self, rh, temp): + """Set the relative humidity in % for eCO2 and TVOC compensation algorithm""" + # Formula from "Generic SGP Driver Integration for Software I2C" + gramsPM3 = rh/100.0 * 6.112 * math.exp(17.62*temp/(243.12+temp)) + gramsPM3 *= 216.7 / (273.15 + temp) + + self.set_iaq_humidity(gramsPM3) + + # Low level command functions + + def _run_profile(self, profile): + """Run an SGP 'profile' which is a named command set""" + # pylint: disable=unused-variable + name, command, signals, delay = profile + # pylint: enable=unused-variable + + #print("\trunning profile: %s, command %s, %d, delay %0.02f" % + # (name, ["0x%02x" % i for i in command], signals, delay)) + return self._i2c_read_words_from_cmd(command, delay, signals) + + + def _i2c_read_words_from_cmd(self, command, delay, reply_size): + """Run an SGP command query, get a reply and CRC results if necessary""" + self._i2c.writeto(self._addr, bytes(command)) + time.sleep(delay) + if not reply_size: + return None + crc_result = bytearray(reply_size * (_SGP30_WORD_LEN +1)) + self._i2c.readfrom_into(self._addr, crc_result) + #print("\tRaw Read: ", crc_result) + result = [] + for i in range(reply_size): + word = [crc_result[3*i], crc_result[3*i+1]] + crc = crc_result[3*i+2] + if self._generate_crc(word) != crc: + raise RuntimeError('CRC Error') + result.append(word[0] << 8 | word[1]) + #print("\tOK Data: ", [hex(i) for i in result]) + return result + + # pylint: disable=no-self-use + def _generate_crc(self, data): + """8-bit CRC algorithm for checking data""" + crc = _SGP30_CRC8_INIT + # calculates 8-Bit checksum with given polynomial + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ _SGP30_CRC8_POLYNOMIAL + else: + crc <<= 1 + return crc & 0xFF + + def get_reading(self): + return self.iaq_measure() \ No newline at end of file diff --git a/smibhid/lib/sensors/__init__.py b/smibhid/lib/sensors/__init__.py new file mode 100644 index 0000000..8ce5710 --- /dev/null +++ b/smibhid/lib/sensors/__init__.py @@ -0,0 +1,41 @@ +from asyncio import create_task, sleep +from machine import I2C +from config import SENSORS, SDA_PIN, SCL_PIN, I2C_ID +from lib.ulogging import uLogger +from lib.sensors.SGP30 import SGP30 + +class Sensors: + def __init__(self): + self.log = uLogger("Sensors") + self.i2c = I2C(I2C_ID, sda = SDA_PIN, scl = SCL_PIN, freq = 400000) # TODO: load from HID for here and display + self.SENSORS = SENSORS + self.available_sensors = {} + self.available_sensors["SGP30"] = SGP30(self.i2c) + self.configured_sensors = {} + self._load_sensors() + + def _load_sensors(self): + self.log.info(f"Attempting to locate drivers for: {self.SENSORS}") + for sensor in self.SENSORS: + if sensor in self.available_sensors: + self.log.info(f"Found driver for {sensor}") + self.configured_sensors[sensor] = self.available_sensors[sensor] + else: + self.log.error(f"Driver not found for {sensor}") + + def startup(self): + self.log.info(f"Starting sensors: {self.configured_sensors}") + create_task(self._poll_sensors()) + + async def _poll_sensors(self): + while True: + readings = self.get_readings() + for reading in readings: + self.log.info(f"Sensor: {reading}: {readings[reading]}") + await sleep(5) + + def get_readings(self): + readings = {} + for name, instance in self.configured_sensors.items(): + readings[name] = instance.get_reading() + return readings From cb1310ad788291d3840c9df3942c5a08c8b67088 Mon Sep 17 00:00:00 2001 From: Stephen Jefferson Date: Thu, 6 Mar 2025 21:24:20 +0000 Subject: [PATCH 02/11] Refactored to allow APi calls to query available modules and sensors and their readings. --- smibhid/config.py | 4 +- smibhid/lib/config/config_template.py | 4 +- smibhid/lib/sensors/SGP30.py | 16 ++++-- smibhid/lib/sensors/__init__.py | 70 +++++++++++++++++++-------- smibhid/lib/sensors/sensor_module.py | 21 ++++++++ 5 files changed, 87 insertions(+), 28 deletions(-) create mode 100644 smibhid/lib/sensors/sensor_module.py diff --git a/smibhid/config.py b/smibhid/config.py index 0af5e49..28b9ce3 100644 --- a/smibhid/config.py +++ b/smibhid/config.py @@ -50,8 +50,8 @@ SCL_PIN = 9 I2C_ID = 0 -## Sensors - Populate driver list with connected sensors from this supported list: ["SGP30"] -SENSORS = ["SGP30"] +## Sensors - Populate driver list with connected sensor modules from this supported list: ["SGP30"] +SENSOR_MODULES = ["SGP30"] ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] diff --git a/smibhid/lib/config/config_template.py b/smibhid/lib/config/config_template.py index 0af5e49..28b9ce3 100644 --- a/smibhid/lib/config/config_template.py +++ b/smibhid/lib/config/config_template.py @@ -50,8 +50,8 @@ SCL_PIN = 9 I2C_ID = 0 -## Sensors - Populate driver list with connected sensors from this supported list: ["SGP30"] -SENSORS = ["SGP30"] +## Sensors - Populate driver list with connected sensor modules from this supported list: ["SGP30"] +SENSOR_MODULES = ["SGP30"] ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] diff --git a/smibhid/lib/sensors/SGP30.py b/smibhid/lib/sensors/SGP30.py index 7e8e847..a3deba1 100644 --- a/smibhid/lib/sensors/SGP30.py +++ b/smibhid/lib/sensors/SGP30.py @@ -44,6 +44,8 @@ import math import time from micropython import const +from lib.ulogging import uLogger +from lib.sensors.sensor_module import SensorModule __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/alexmrqt/Adafruit_CircuitPython_SGP30.git" @@ -59,7 +61,7 @@ _SGP30_WORD_LEN = const(2) # pylint: enable=bad-whitespace -class SGP30: +class SGP30(SensorModule): """ A driver for the SGP30 gas sensor. @@ -69,6 +71,9 @@ class SGP30: def __init__(self, i2c, address=_SGP30_DEFAULT_I2C_ADDR): """Initialize the sensor, get the serial # and verify that we found a proper SGP30""" + super().__init__(["co2eq", "tvoc"]) + self.log = uLogger("SGP30") + self.log.info("SGP30 sensor module loaded") self._i2c = i2c self._addr = address @@ -193,6 +198,11 @@ def _generate_crc(self, data): else: crc <<= 1 return crc & 0xFF - + def get_reading(self): - return self.iaq_measure() \ No newline at end of file + reading = self.iaq_measure() + data = { + "co2eq": reading[0], + "tvoc": reading[1] + } + return data \ No newline at end of file diff --git a/smibhid/lib/sensors/__init__.py b/smibhid/lib/sensors/__init__.py index 8ce5710..330323c 100644 --- a/smibhid/lib/sensors/__init__.py +++ b/smibhid/lib/sensors/__init__.py @@ -1,41 +1,69 @@ from asyncio import create_task, sleep from machine import I2C -from config import SENSORS, SDA_PIN, SCL_PIN, I2C_ID +from config import SENSOR_MODULES, SDA_PIN, SCL_PIN, I2C_ID from lib.ulogging import uLogger from lib.sensors.SGP30 import SGP30 +from lib.sensors.sensor_module import SensorModule class Sensors: - def __init__(self): + def __init__(self) -> None: self.log = uLogger("Sensors") self.i2c = I2C(I2C_ID, sda = SDA_PIN, scl = SCL_PIN, freq = 400000) # TODO: load from HID for here and display - self.SENSORS = SENSORS - self.available_sensors = {} - self.available_sensors["SGP30"] = SGP30(self.i2c) - self.configured_sensors = {} - self._load_sensors() + self.SENSOR_MODULES = SENSOR_MODULES + self.available_modules: dict[str, SensorModule] = {} + self.available_modules["SGP30"] = SGP30(self.i2c) + self.configured_modules: dict[str, SensorModule] = {} + self._load_modules() - def _load_sensors(self): - self.log.info(f"Attempting to locate drivers for: {self.SENSORS}") - for sensor in self.SENSORS: - if sensor in self.available_sensors: - self.log.info(f"Found driver for {sensor}") - self.configured_sensors[sensor] = self.available_sensors[sensor] + def _load_modules(self) -> None: + self.log.info(f"Attempting to locate drivers for: {self.SENSOR_MODULES}") + for sensor_module in self.SENSOR_MODULES: + if sensor_module in self.available_modules: + self.log.info(f"Found driver for {sensor_module}") + self.configured_modules[sensor_module] = self.available_modules[sensor_module] + self.log.info(f"Loaded {sensor_module} sensor module") + self.log.info(f"Available sensors: {self.configured_modules[sensor_module].get_sensors()}") else: - self.log.error(f"Driver not found for {sensor}") + self.log.error(f"Driver not found for {sensor_module}") - def startup(self): - self.log.info(f"Starting sensors: {self.configured_sensors}") + self.log.info(f"Configured modules: {self.get_modules()}") + + def startup(self) -> None: + self.log.info(f"Starting sensors: {self.configured_modules}") create_task(self._poll_sensors()) - async def _poll_sensors(self): + async def _poll_sensors(self) -> None: + """ + Asynchronously poll sensors and log readings every X seconds as per + config. + """ while True: readings = self.get_readings() for reading in readings: - self.log.info(f"Sensor: {reading}: {readings[reading]}") + self.log.info(f"Module: {reading}: {readings[reading]}") await sleep(5) - def get_readings(self): + def get_modules(self) -> list: + """ + Return a dictionary of configured sensor modules. + """ + return list(self.configured_modules.keys()) + + def get_sensors(self, module: SensorModule) -> list: + """ + Return list of sensors for a specific module. + """ + return module.get_sensors() + + def get_readings(self, module: str = "") -> dict: + """ + Return readings from a specific module by passing it's name as a + string, or all modules if none specified. + """ readings = {} - for name, instance in self.configured_sensors.items(): - readings[name] = instance.get_reading() + if module: + readings[module] = self.configured_modules[module].get_reading() + else: + for name, instance in self.configured_modules.items(): + readings[name] = instance.get_reading() return readings diff --git a/smibhid/lib/sensors/sensor_module.py b/smibhid/lib/sensors/sensor_module.py new file mode 100644 index 0000000..a1df076 --- /dev/null +++ b/smibhid/lib/sensors/sensor_module.py @@ -0,0 +1,21 @@ +from lib.ulogging import uLogger + +class SensorModule: + """ + Base class for sensor modules. + """ + def __init__(self, sensors: list) -> None: + self.log = uLogger("SensorModule") + self.sensors = sensors + + def get_sensors(self) -> list: + """ + Return list of sensors for a specific module. + """ + return self.sensors + + def get_reading(self) -> dict: + """ + Return a dictionary of sensor name and value pairs + """ + raise NotImplementedError("Subclasses must implement this method") \ No newline at end of file From f5b8f491dd79527e768edf0ce9368223ee30a74a Mon Sep 17 00:00:00 2001 From: Stephen Jefferson Date: Thu, 6 Mar 2025 22:37:44 +0000 Subject: [PATCH 03/11] API endpoints return sensor modules data. --- smibhid/http/website.py | 29 +++++++++++++++++++++++++++++ smibhid/http/www/api.html | 4 ++++ smibhid/lib/hid.py | 3 ++- smibhid/lib/module_config.py | 11 ++++++++++- smibhid/lib/sensors/__init__.py | 9 +++++---- 5 files changed, 50 insertions(+), 6 deletions(-) diff --git a/smibhid/http/website.py b/smibhid/http/website.py index 32c9f93..7b8f83c 100644 --- a/smibhid/http/website.py +++ b/smibhid/http/website.py @@ -20,6 +20,7 @@ def __init__(self, module_config: ModuleConfig, hid: object) -> None: self.hid = hid self.wifi = module_config.get_wifi() self.display = module_config.get_display() + self.sensors = module_config.get_sensors() self.updater = Updater() self.port = 80 self.running = False @@ -69,6 +70,10 @@ async def api(request, response): self.app.add_resource(Version, '/api/version', hid = self.hid, logger = self.log) self.app.add_resource(FirmwareFiles, '/api/firmware_files', updater = self.updater, logger = self.log) self.app.add_resource(Reset, '/api/reset', updater = self.updater, logger = self.log) + self.app.add_resource(Modules, '/api/sensors/modules', sensors = self.sensors, logger = self.log) + self.app.add_resource(Sensors, '/api/sensors/', sensors = self.sensors, logger = self.log) + self.app.add_resource(Readings, '/api/sensors/readings/', sensors = self.sensors, logger = self.log) + self.app.add_resource(Readings, '/api/sensors/readings', module = "", sensors = self.sensors, logger = self.log) class WLANMAC(): @@ -113,3 +118,27 @@ def post(self, data, updater: Updater, logger: uLogger) -> None: logger.info("API request - reset") updater.reset() return + +class Modules(): + + def get(self, data, sensors, logger: uLogger) -> str: + logger.info("API request - sensors/modules") + html = dumps(sensors.get_modules()) + logger.info(f"Return value: {html}") + return html + +class Sensors(): + + def get(self, data, module: str, sensors, logger: uLogger) -> str: + logger.info(f"API request - sensors/{module}") + html = dumps(sensors.get_sensors(module)) + logger.info(f"Return value: {html}") + return html + +class Readings(): + + def get(self, data, module: str, sensors, logger: uLogger) -> str: + logger.info(f"API request - sensors/readings - Module: {module}") + html = dumps(sensors.get_readings(module)) + logger.info(f"Return value: {html}") + return html \ No newline at end of file diff --git a/smibhid/http/www/api.html b/smibhid/http/www/api.html index 6cae4b7..41795ac 100644 --- a/smibhid/http/www/api.html +++ b/smibhid/http/www/api.html @@ -10,6 +10,10 @@

Endpoints