diff --git a/smibhid/README.md b/smibhid/README.md index a1e9f57..b923c7d 100644 --- a/smibhid/README.md +++ b/smibhid/README.md @@ -26,13 +26,18 @@ Press the space_open or space_closed buttons to call the smib server endpoint ap - API page that details API endpoints available and their usage - Update page for performing over the air firmware updates and remote reset to apply them - Pinger watchdog - Optionally ping an IP address and toggle a GPIO pin on ping failure. Useful for network device monitoring and reset. +- Extensible sensor module framework for async polling of I2C sensors (currently only writes to log out) and presentation of sensors and readings on the web API + - Supported sensors + - SGP30 (Equivalent CO2 and VOC) + - BME280 + - SCD30 ## Circuit diagram ### Pico W Connections ![Circuit diagram](images/SMIBHID%20circuit%20diagram.drawio.png) -### Pico W pinout -![Pico W pinout](images/pico_w_pinout.png) +### Pico 2 W pinout +![Pico 2 W pinout](images/pico_2_w_pinout.png) ### Example breadboard build ![Breadboard photo](images/breadboard.jpg) @@ -41,20 +46,27 @@ Press the space_open or space_closed buttons to call the smib server endpoint ap ## Hardware -Below is a list of hardware ad links for my specific build: +Below is a list of hardware and links for my specific build: - [Raspberry Pi Pico W](https://thepihut.com/products/raspberry-pi-pico-w?variant=41952994754755) - [Prototype board](https://thepihut.com/products/pico-proto-pcb?variant=41359085568195) - [LED push button switch - Red](https://thepihut.com/products/rugged-metal-pushbutton-with-red-led-ring?variant=27740444561) - [LED push button switch - Green](https://thepihut.com/products/rugged-metal-pushbutton-with-green-led-ring?variant=27740444625) - [JST connectors](https://www.amazon.co.uk/dp/B07449V33P) - [2x16 Character I2C display](https://thepihut.com/products/lcd1602-i2c-module?variant=42422810083523) +- [SGP30 I2C sensor](https://thepihut.com/products/sgp30-air-quality-sensor-breakout) +- [BME280 sensor](https://thepihut.com/products/bme280-breakout-temperature-pressure-humidity-sensor) +- [SCD30 sensor](https://thepihut.com/products/adafruit-scd-30-ndir-co2-temperature-and-humidity-sensor) ## Deployment -Copy the files from the smibhib folder into the root of a Pico W running Micropython (minimum Pico W Micropython firmware v1.22.2 https://micropython.org/download/RPI_PICO_W/) and update values in config.py as necessary +Copy the files from the smibhib folder into the root of a Pico 2 W running Micropython (minimum Pico 2 W Micropython firmware v1.25.0-preview.365 https://micropython.org/download/RPI_PICO2_W/) and update values in config.py as necessary. + +This project should work on a Pico W on recent firmware, but we have moved development, testing and our production SMIBHIDs to Pico 2 Ws. ### Configuration - Ensure the pins for the space open/closed LEDs and buttons are correctly specified for your wiring -- Configure I2C pins for the display if using, display will detect automatically or disable if not found +- Configure I2C pins for the display and sensors if using, display will detect automatically or disable if not found +- Populate the display list with displays in use (must have appropriate driver module) +- Populate the sensors list with sensors in use (must have appropriate driver module) - Populate Wifi SSID and password - Configure the webserver hostname/IP and port as per your smib.webserver configuration - Set the space state poll frequency in seconds (>= 5), set to 0 to disable the state poll @@ -105,6 +117,16 @@ Use existing space state buttons, lights, slack API wrapper and watchers as an e - Ensure the driver registers itself with the driver registry, use LCD1602 as an example - Import the new driver module in display.py - Update the config.py file to include the option for your new driver +- I2C Sensor boards can be added by providing a driver module that extends the SensorModule base class + - Copy an appropriate python driver module into the sensors sub folder + - Ensure the init method takes one mandatory parameter for the I2C interface + - Modify the driver module to extend SensorModule + - Provide a list of sensor names on this module to class super init + - Ensure the init method raises an error if device not found or has any configuration error to be caught by the sensors module driver load method + - Overload the get_reading() method to return a dictionary of sensor name - reading value pairs + - Update the config.py file to include the option for your new driver + - Add the module import to sensors.\_\_init\_\_.py + - Copy and adjust appropriately the try except block in sensors.\_\_init\_\_.load_modules method - UIState machine - A state machine exists and can be extended by various modules such as space_state to manage the state of the buttons and display output - The current state instance is held in hid.ui_state_instance diff --git a/smibhid/config.py b/smibhid/config.py index 741a7e1..a878af5 100644 --- a/smibhid/config.py +++ b/smibhid/config.py @@ -49,6 +49,10 @@ SDA_PIN = 8 SCL_PIN = 9 I2C_ID = 0 +I2C_FREQ = 400000 + +## Sensors - Populate driver list with connected sensor modules from this supported list: ["SGP30", "BME280", "SCD30"] +SENSOR_MODULES = [] ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] diff --git a/smibhid/http/website.py b/smibhid/http/website.py index 32c9f93..f3a36f1 100644 --- a/smibhid/http/website.py +++ b/smibhid/http/website.py @@ -3,7 +3,7 @@ from lib.module_config import ModuleConfig from json import dumps import uasyncio -from lib.updater import Updater +from lib.updater import UpdateCore class WebApp: @@ -20,7 +20,8 @@ def __init__(self, module_config: ModuleConfig, hid: object) -> None: self.hid = hid self.wifi = module_config.get_wifi() self.display = module_config.get_display() - self.updater = Updater() + self.sensors = module_config.get_sensors() + self.update_core = UpdateCore() self.port = 80 self.running = False self.create_style_css() @@ -32,7 +33,7 @@ def __init__(self, module_config: ModuleConfig, hid: object) -> None: def startup(self): network_access = uasyncio.run(self.wifi.check_network_access()) - if network_access == True: + if network_access: self.log.info("Starting web server") self.app.run(host='0.0.0.0', port=self.port, loop_forever=False) self.log.info(f"Web server started: {self.wifi.get_ip()}:{self.port}") @@ -67,8 +68,14 @@ async def api(request, response): self.app.add_resource(WLANMAC, '/api/wlan/mac', wifi = self.wifi, logger = self.log) self.app.add_resource(Version, '/api/version', hid = self.hid, logger = self.log) - self.app.add_resource(FirmwareFiles, '/api/firmware_files', updater = self.updater, logger = self.log) - self.app.add_resource(Reset, '/api/reset', updater = self.updater, logger = self.log) + + self.app.add_resource(FirmwareFiles, '/api/firmware_files', update_core = self.update_core, logger = self.log) + self.app.add_resource(Reset, '/api/reset', update_core = self.update_core, logger = self.log) + + self.app.add_resource(Modules, '/api/sensors/modules', sensors = self.sensors, logger = self.log) + self.app.add_resource(Sensors, '/api/sensors/modules/', sensors = self.sensors, logger = self.log) + #self.app.add_resource(Readings, '/api/sensors/modules//readings/latest', sensors = self.sensors, logger = self.log) #TODO: Fix tinyweb to allow for multiple parameters https://github.com/belyalov/tinyweb/pull/51 + self.app.add_resource(Readings, '/api/sensors/readings/latest', module = "", sensors = self.sensors, logger = self.log) class WLANMAC(): @@ -88,28 +95,54 @@ def get(self, data, hid, logger: uLogger) -> str: class FirmwareFiles(): - def get(self, data, updater: Updater, logger: uLogger) -> str: + def get(self, data, update_core: UpdateCore, logger: uLogger) -> str: logger.info("API request - GET Firmware files") - html = dumps(updater.process_update_file()) + html = dumps(update_core.process_update_file()) logger.info(f"Return value: {html}") return html - def post(self, data, updater: Updater, logger: uLogger) -> str: + def post(self, data, update_core: UpdateCore, logger: uLogger) -> str: logger.info("API request - POST Firmware files") logger.info(f"Data: {data}") if data["action"] == "add": logger.info("Adding update - data: {data}") - html = updater.stage_update_url(data["url"]) + html = update_core.stage_update_url(data["url"]) elif data["action"] == "remove": logger.info("Removing update - data: {data}") - html = updater.unstage_update_url(data["url"]) + html = update_core.unstage_update_url(data["url"]) else: html = f"Invalid request: {data["action"]}" return dumps(html) class Reset(): - def post(self, data, updater: Updater, logger: uLogger) -> None: + def post(self, data, update_core: UpdateCore, logger: uLogger) -> None: logger.info("API request - reset") - updater.reset() + update_core.reset() return + +class Modules(): + + def get(self, data, sensors, logger: uLogger) -> str: + logger.info("API request - sensors/modules") + html = dumps(sensors.get_modules()) + logger.info(f"Return value: {html}") + return html + +class Sensors(): + + def get(self, data, module: str, sensors, logger: uLogger) -> str: + logger.info(f"API request - sensors/{module}") + sensor_list = sensors.get_sensors(module) + logger.info(f"Available sensors: {sensor_list}") + html = dumps(sensor_list) + logger.info(f"Return value: {html}") + return html + +class Readings(): + + def get(self, data, module: str, sensors, logger: uLogger) -> str: + logger.info(f"API request - sensors/readings - Module: {module}") + html = dumps(sensors.get_readings(module)) + logger.info(f"Return value: {html}") + return html diff --git a/smibhid/http/www/api.html b/smibhid/http/www/api.html index 6cae4b7..bd4fbc3 100644 --- a/smibhid/http/www/api.html +++ b/smibhid/http/www/api.html @@ -7,18 +7,68 @@

SMIBHID - API

Endpoints

- + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
EndpointMethodOptionsDescription
/api/wlan/macGETGet WLAN MAC address
/api/versionGETGet firmware version
/api/firmware_filesGET, POST +
    +
  • List files: GET
  • +
  • Add file: POST: Submit Value = "Add", URL = Str: id = "url"
  • +
  • Remove file: POST: Submit Value = "Remove", Str: URL
  • +
+
List, add or remove URLs staged for download and patch on reset
/api/resetPOSTReset SMIBHID
/api/sensors/modulesGETList sensor modules
/api/sensors/modules/{module}GETInsert module name from module list in {module} parameterList module sensors
/api/sensors/readings/latestGETGet latest sensor readings from all modules

diff --git a/smibhid/http/www/css/style.css b/smibhid/http/www/css/style.css index d38cc5e..6f6a4ad 100644 --- a/smibhid/http/www/css/style.css +++ b/smibhid/http/www/css/style.css @@ -15,4 +15,32 @@ h2 { a { color: purple; font-family: Arial, Helvetica, sans-serif; +} + +table { + border-collapse: collapse; + border: 1px solid black; + width: 80%; +} + +th { + color: darkgreen; + font-family: Arial, Helvetica, sans-serif; + text-align: left; + padding-top: 5px; + padding-left: 5px; + padding-right: 20px; + padding-bottom: 5px; + border: 1px solid black; +} + +td { + color: darkslategray; + font-family: Arial, Helvetica, sans-serif; + text-align: left; + padding-top: 5px; + padding-left: 5px; + padding-right: 20px; + padding-bottom: 5px; + border: 1px solid black; } \ No newline at end of file diff --git a/smibhid/images/SMIBHID circuit diagram.drawio.png b/smibhid/images/SMIBHID circuit diagram.drawio.png index 497cb61..db0ba82 100644 Binary files a/smibhid/images/SMIBHID circuit diagram.drawio.png and b/smibhid/images/SMIBHID circuit diagram.drawio.png differ diff --git a/smibhid/images/pico_2_w_pinout.png b/smibhid/images/pico_2_w_pinout.png new file mode 100644 index 0000000..f3ae087 Binary files /dev/null and b/smibhid/images/pico_2_w_pinout.png differ diff --git a/smibhid/images/pico_w_pinout.png b/smibhid/images/pico_w_pinout.png deleted file mode 100644 index fc5614a..0000000 Binary files a/smibhid/images/pico_w_pinout.png and /dev/null differ diff --git a/smibhid/lib/LCD1602.py b/smibhid/lib/LCD1602.py index 62a6906..5b0458d 100644 --- a/smibhid/lib/LCD1602.py +++ b/smibhid/lib/LCD1602.py @@ -50,7 +50,7 @@ class LCD1602: """Driver for the LCD1602 16x2 character LED display""" - def __init__(self) -> None: + def __init__(self, i2c) -> None: """Configure and connect to display via I2C, throw error on connection issue.""" self.log = uLogger("LCD1602") self.log.info("Init LCD1602 display driver") @@ -60,7 +60,7 @@ def __init__(self) -> None: self.spinner_task = None try: - self.LCD1602_I2C = I2C(I2C_ID, sda = SDA_PIN, scl = SCL_PIN, freq = 400000) + self.LCD1602_I2C = i2c self._showfunction = LCD_4BITMODE | LCD_1LINE | LCD_5x8DOTS self._begin(self._row) except BaseException: diff --git a/smibhid/lib/config/config_template.py b/smibhid/lib/config/config_template.py index 741a7e1..a878af5 100644 --- a/smibhid/lib/config/config_template.py +++ b/smibhid/lib/config/config_template.py @@ -49,6 +49,10 @@ SDA_PIN = 8 SCL_PIN = 9 I2C_ID = 0 +I2C_FREQ = 400000 + +## Sensors - Populate driver list with connected sensor modules from this supported list: ["SGP30", "BME280", "SCD30"] +SENSOR_MODULES = [] ## Displays - Populate driver list with connected displays from this supported list: ["LCD1602"] DISPLAY_DRIVERS = ["LCD1602"] diff --git a/smibhid/lib/display.py b/smibhid/lib/display.py index c2b6db3..bc63f25 100644 --- a/smibhid/lib/display.py +++ b/smibhid/lib/display.py @@ -12,11 +12,12 @@ class Display: Example: If an LCD1602 driver is configured to load, then issuing the command Display.print_startup() will render startup information appropriately on the 2x16 display if connected. """ - def __init__(self) -> None: + def __init__(self, i2c) -> None: self.log = uLogger("Display") self.drivers = DISPLAY_DRIVERS self.log.info("Init display") self.enabled = False + self.i2c = i2c self.screens = [] self._load_configured_drivers() self.state = "Unknown" @@ -30,7 +31,7 @@ def _load_configured_drivers(self) -> None: if driver_class is None: raise ValueError(f"Display driver class '{driver}' not registered.") - self.screens.append(driver_class()) + self.screens.append(driver_class(self.i2c)) except Exception as e: print(f"An error occurred while confguring display driver '{driver}': {e}") diff --git a/smibhid/lib/hid.py b/smibhid/lib/hid.py index b6cc186..6f411a0 100644 --- a/smibhid/lib/hid.py +++ b/smibhid/lib/hid.py @@ -6,12 +6,13 @@ from lib.display import Display from lib.networking import WirelessNetwork from lib.rfid.reader import RFIDReader -from config import RFID_ENABLED, CLOCK_FREQUENCY +from config import RFID_ENABLED, CLOCK_FREQUENCY, SDA_PIN, SCL_PIN, I2C_ID, I2C_FREQ from lib.uistate import UIState from lib.ui_log import UILog from http.website import WebApp from lib.pinger import Pinger -from machine import freq +from machine import freq, I2C +from lib.sensors import Sensors class HID: @@ -26,9 +27,11 @@ def __init__(self) -> None: self.log.info("Setting CPU frequency to: " + str(CLOCK_FREQUENCY / 1000000) + "MHz") freq(CLOCK_FREQUENCY) self.loop_running = False + self.i2c = I2C(I2C_ID, sda = SDA_PIN, scl = SCL_PIN, freq = I2C_FREQ) self.moduleConfig = ModuleConfig() - self.moduleConfig.register_display(Display()) + self.moduleConfig.register_display(Display(self.i2c)) self.moduleConfig.register_wifi(WirelessNetwork()) + self.moduleConfig.register_sensors(Sensors(self.i2c)) if RFID_ENABLED: self.moduleConfig.register_rfid(RFIDReader(Event())) self.display = self.moduleConfig.get_display() @@ -38,6 +41,7 @@ def __init__(self) -> None: self.ui_log = self.moduleConfig.get_ui_log() self.space_state = SpaceState(self.moduleConfig, self) self.pinger = Pinger(self.moduleConfig, self) + self.sensors = self.moduleConfig.get_sensors() self.error_handler = ErrorHandler("HID") self.error_handler.configure_display(self.display) self.web_app = WebApp(self.moduleConfig, self) @@ -60,6 +64,7 @@ def startup(self) -> None: self.space_state.startup() if self.reader: self.reader.startup() + self.sensors.startup() self.ui_log.startup() self.web_app.startup() diff --git a/smibhid/lib/module_config.py b/smibhid/lib/module_config.py index 7a92f3a..7e3a522 100644 --- a/smibhid/lib/module_config.py +++ b/smibhid/lib/module_config.py @@ -22,6 +22,7 @@ def __init__(self) -> None: self.wifi = None self.reader = None self.ui_log = None + self.sensors = None def register_display(self, display: Display) -> None: self.display = display @@ -34,6 +35,9 @@ def register_rfid(self, reader: RFIDReader) -> None: def register_ui_log(self, ui_log: UILog) -> None: self.ui_log = ui_log + + def register_sensors(self, sensors) -> None: + self.sensors = sensors def get_display(self) -> Display: if not self.display: @@ -58,4 +62,9 @@ def get_ui_log(self) -> UILog: self.log.warn("UI Log module not registered") raise ModuleNotRegisteredError("UI Log") return self.ui_log - \ No newline at end of file + + def get_sensors(self): + if not self.sensors: + self.log.warn("Sensors module not registered") + raise ModuleNotRegisteredError("Sensors") + return self.sensors \ No newline at end of file diff --git a/smibhid/lib/sensors/BME280.py b/smibhid/lib/sensors/BME280.py new file mode 100644 index 0000000..7ec04c9 --- /dev/null +++ b/smibhid/lib/sensors/BME280.py @@ -0,0 +1,278 @@ +# Updated 2018 and 2020 +# This module is based on the below cited resources, which are all +# based on the documentation as provided in the Bosch Data Sheet and +# the sample implementation provided therein. +# +# Final Document: BST-BME280-DS002-15 +# +# Authors: Paul Cunnane 2016, Peter Dahlebrg 2016 +# +# This module borrows from the Adafruit BME280 Python library. Original +# Copyright notices are reproduced below. +# +# Those libraries were written for the Raspberry Pi. This modification is +# intended for the MicroPython and esp8266 boards. +# +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Based on the BMP280 driver with BME280 changes provided by +# David J Taylor, Edinburgh (www.satsignal.eu) +# +# Based on Adafruit_I2C.py created by Kevin Townsend. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +import time +from ustruct import unpack +from array import array +from micropython import const +from lib.sensors.sensor_module import SensorModule + +# BME280 default address. +BME280_I2CADDR = 0x76 + +# Operating Modes +BME280_OSAMPLE_1 = 1 +BME280_OSAMPLE_2 = 2 +BME280_OSAMPLE_4 = 3 +BME280_OSAMPLE_8 = 4 +BME280_OSAMPLE_16 = 5 + +BME280_REGISTER_CONTROL_HUM = 0xF2 +BME280_REGISTER_STATUS = 0xF3 +BME280_REGISTER_CONTROL = 0xF4 + +MODE_SLEEP = const(0) +MODE_FORCED = const(1) +MODE_NORMAL = const(3) + +BME280_TIMEOUT = const(100) # about 1 second timeout + +class BME280(SensorModule): + + def __init__(self, + i2c, + mode=BME280_OSAMPLE_8, + address=BME280_I2CADDR, + **kwargs): + super().__init__([{"name": "pressure", "unit": "hPa"}, {"name": "temperature", "unit": "C"}, {"name": "humidity", "unit": "%"}]) # Don't use degrees sign, it breaks json.dumps() + # Check that mode is valid. + if type(mode) is tuple and len(mode) == 3: + self._mode_hum, self._mode_temp, self._mode_press = mode + elif type(mode) == int: + self._mode_hum, self._mode_temp, self._mode_press = mode, mode, mode + else: + raise ValueError("Wrong type for the mode parameter, must be int or a 3 element tuple") + + for mode in (self._mode_hum, self._mode_temp, self._mode_press): + if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, + BME280_OSAMPLE_8, BME280_OSAMPLE_16]: + raise ValueError( + 'Unexpected mode value {0}. Set mode to one of ' + 'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or ' + 'BME280_ULTRAHIGHRES'.format(mode)) + + self.address = address + if i2c is None: + raise ValueError('An I2C object is required.') + self.i2c = i2c + self.__sealevel = 101325 + + # load calibration data + dig_88_a1 = self.i2c.readfrom_mem(self.address, 0x88, 26) + dig_e1_e7 = self.i2c.readfrom_mem(self.address, 0xE1, 7) + + self.dig_T1, self.dig_T2, self.dig_T3, self.dig_P1, \ + self.dig_P2, self.dig_P3, self.dig_P4, self.dig_P5, \ + self.dig_P6, self.dig_P7, self.dig_P8, self.dig_P9, \ + _, self.dig_H1 = unpack("> 4 + raw_press = ((readout[0] << 16) | (readout[1] << 8) | readout[2]) >> 4 + # temperature(0xFA): ((msb << 16) | (lsb << 8) | xlsb) >> 4 + raw_temp = ((readout[3] << 16) | (readout[4] << 8) | readout[5]) >> 4 + # humidity(0xFD): (msb << 8) | lsb + raw_hum = (readout[6] << 8) | readout[7] + + result[0] = raw_temp + result[1] = raw_press + result[2] = raw_hum + + def read_compensated_data(self, result=None): + """ Reads the data from the sensor and returns the compensated data. + + Args: + result: array of length 3 or alike where the result will be + stored, in temperature, pressure, humidity order. You may use + this to read out the sensor without allocating heap memory + + Returns: + array with temperature, pressure, humidity. Will be the one + from the result parameter if not None + """ + self.read_raw_data(self._l3_resultarray) + raw_temp, raw_press, raw_hum = self._l3_resultarray + # temperature + var1 = (raw_temp/16384.0 - self.dig_T1/1024.0) * self.dig_T2 + var2 = raw_temp/131072.0 - self.dig_T1/8192.0 + var2 = var2 * var2 * self.dig_T3 + self.t_fine = int(var1 + var2) + temp = (var1 + var2) / 5120.0 + temp = max(-40, min(85, temp)) + + # pressure + var1 = (self.t_fine/2.0) - 64000.0 + var2 = var1 * var1 * self.dig_P6 / 32768.0 + var1 * self.dig_P5 * 2.0 + var2 = (var2 / 4.0) + (self.dig_P4 * 65536.0) + var1 = (self.dig_P3 * var1 * var1 / 524288.0 + self.dig_P2 * var1) / 524288.0 + var1 = (1.0 + var1 / 32768.0) * self.dig_P1 + if (var1 == 0.0): + pressure = 30000 # avoid exception caused by division by zero + else: + p = ((1048576.0 - raw_press) - (var2 / 4096.0)) * 6250.0 / var1 + var1 = self.dig_P9 * p * p / 2147483648.0 + var2 = p * self.dig_P8 / 32768.0 + pressure = p + (var1 + var2 + self.dig_P7) / 16.0 + pressure = max(30000, min(110000, pressure)) + + # humidity + h = (self.t_fine - 76800.0) + h = ((raw_hum - (self.dig_H4 * 64.0 + self.dig_H5 / 16384.0 * h)) * + (self.dig_H2 / 65536.0 * (1.0 + self.dig_H6 / 67108864.0 * h * + (1.0 + self.dig_H3 / 67108864.0 * h)))) + humidity = h * (1.0 - self.dig_H1 * h / 524288.0) + if (humidity < 0): + humidity = 0 + if (humidity > 100): + humidity = 100.0 + + if result: + result[0] = temp + result[1] = pressure + result[2] = humidity + return result + + return array("f", (temp, pressure, humidity)) + + @property + def sealevel(self): + return self.__sealevel + + @sealevel.setter + def sealevel(self, value): + if 30000 < value < 120000: # just ensure some reasonable value + self.__sealevel = value + + @property + def altitude(self): + ''' + Altitude in m. + ''' + from math import pow + try: + p = 44330 * (1.0 - pow(self.read_compensated_data()[1] / + self.__sealevel, 0.1903)) + except: + p = 0.0 + return p + + @property + def dew_point(self): + """ + Compute the dew point temperature for the current Temperature + and Humidity measured pair + """ + from math import log + t, p, h = self.read_compensated_data() + h = (log(h, 10) - 2) / 0.4343 + (17.62 * t) / (243.12 + t) + return 243.12 * h / (17.62 - h) + + @property + def values(self) -> tuple: + """ human readable values """ + + t, p, h = self.read_compensated_data() + + return ("{:.2f}C".format(t), "{:.2f}hPa".format(p/100), + "{:.2f}%".format(h)) + + @property + def values_no_units(self) -> tuple: + """ human readable values """ + + t, p, h = self.read_compensated_data() + + return ("{:.2f}".format(t), "{:.2f}".format(p/100), + "{:.2f}".format(h)) + + def get_reading(self) -> dict: + data = self.values_no_units + reading = { + "temperature": data[0], + "pressure": data[1], + "humidity": data[2] + } + return reading diff --git a/smibhid/lib/sensors/SCD30.py b/smibhid/lib/sensors/SCD30.py new file mode 100644 index 0000000..5ca34c6 --- /dev/null +++ b/smibhid/lib/sensors/SCD30.py @@ -0,0 +1,186 @@ +from machine import I2C +import utime +import struct +from lib.sensors.sensor_module import SensorModule + +class SCD30(SensorModule): + + class NotFoundException(Exception): + pass + + class CRCException(Exception): + pass + + START_CONT_MEASURE = 0x0010 + STOP_CONT_MEASURE = 0x0104 + SET_MEASURE_INTERVAL = 0x4600 + GET_STATUS_READY = 0x0202 + READ_MEASUREMENT = 0x0300 + SET_ASC = 0x5306 + SET_FRC = 0x5204 + SET_TEMP_OFFSET = 0x5403 + SET_ALT_COMP = 0x5102 + GET_FIRMWARE_VER = 0xd100 + SOFT_RESET = 0xd304 + + CLOCK_TIME_US = 10 + + # Generated using + # crc_table = [] + # for crc in range(256): + # for crc_bit in range(8): + # if crc & 0x80: + # crc = (crc << 1) ^ CRC8_POLYNOMIAL; + # else: + # crc = (crc << 1); + # crc = crc%256 + # crc_table.append(crc) + + CRC_TABLE = [ + 0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46, + 67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109, + 134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168, + 197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235, + 61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19, + 126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80, + 187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149, + 248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214, + 122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84, + 57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23, + 252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210, + 191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145, + 71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105, + 4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42, + 193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239, + 130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172 + ] + + def __init__(self, i2c: I2C, pause=1000): + super().__init__([{"name": "co2", "unit": "ppm"}, {"name": "temperature", "unit": "C"}, {"name": "relative_humidity", "unit": "%"}]) # Don't use degrees sign, it breaks json.dumps() + self.i2c = i2c + self.pause = pause + self.addr = 0x61 + if not self.addr in i2c.scan(): + raise self.NotFoundException + + def start_continuous_measurement(self, ambient_pressure=0): + bint = struct.pack('>H', ambient_pressure) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.START_CONT_MEASURE, data, addrsize=16) + + def stop_continuous_measurement(self): + self.__write_command(self.STOP_CONT_MEASURE) + + def soft_reset(self): + self.__write_command(self.SOFT_RESET) + + def get_firmware_version(self): + ver = self.__read_bytes(self.GET_FIRMWARE_VER, 3) + self.__check_crc(ver) + return struct.unpack('BB', ver) + + def read_measurement(self): + measurement = self.__read_bytes(self.READ_MEASUREMENT, 18) + for i in range(0, len(measurement), 3): + self.__check_crc(measurement[i:i+3]) + + value = measurement[0:] + co2 = struct.unpack('>f', value[0:2] + value[3:5])[0] + value = measurement[6:] + temperature = struct.unpack('>f', value[0:2] + value[3:5])[0] + value = measurement[12:] + relh = struct.unpack('>f', value[0:2] + value[3:5])[0] + return (co2, temperature, relh) + + def get_status_ready(self): + ready = self.__read_bytes(self.GET_STATUS_READY, 3) + self.__check_crc(ready) + return struct.unpack('>H', ready)[0] + + def get_measurement_interval(self): + bint = self.__read_bytes(self.SET_MEASURE_INTERVAL, 3) + self.__check_crc(bint) + return struct.unpack('>H', bint)[0] + + def set_measurement_interval(self, interval): + bint = struct.pack('>H', interval) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.SET_MEASURE_INTERVAL, data, addrsize=16) + + def get_automatic_recalibration(self): + bint = self.__read_bytes(self.SET_ASC, 3) + self.__check_crc(bint) + return struct.unpack('>H', bint)[0] == 1 + + def set_automatic_recalibration(self, enable): + bint = struct.pack('>H', 1 if enable else 0) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.SET_FRC, data, addrsize=16) + + def get_forced_recalibration(self): + bint = self.__read_bytes(self.SET_FRC, 3) + self.__check_crc(bint) + return struct.unpack('>H', bint)[0] + + def set_forced_recalibration(self, co2ppm): + bint = struct.pack('>H', co2ppm) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.SET_FRC, data, addrsize=16) + + def get_temperature_offset(self): + bint = self.__read_bytes(self.SET_TEMP_OFFSET, 3) + self.__check_crc(bint) + return struct.unpack('>H', bint)[0] / 100.0 + + def set_temperature_offset(self, offset): + bint = struct.pack('>H', int(offset * 100)) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.SET_TEMP_OFFSET, data, addrsize=16) + + def get_altitude_comp(self): + bint = self.__read_bytes(self.SET_ALT_COMP, 3) + self.__check_crc(bint) + return struct.unpack('>H', bint)[0] + + def set_altitude_comp(self, altitude): + bint = struct.pack('>H', altitude) + crc = self.__crc(bint[0], bint[1]) + data = bint + bytes([crc]) + self.i2c.writeto_mem(self.addr, self.SET_ALT_COMP, data, addrsize=16) + + def __write_command(self, cmd): + bcmd = struct.pack('>H', cmd) + self.i2c.writeto(self.addr, bcmd) + + def __read_bytes(self, cmd, count): + self.__write_command(cmd) + utime.sleep_us(self.pause) + return self.i2c.readfrom(self.addr, count) + + def __check_crc(self, arr): + assert (len(arr) == 3) + if self.__crc(arr[0], arr[1]) != arr[2]: + raise self.CRCException + + def __crc(self, msb, lsb): + crc = 0xff + crc ^= msb + crc = self.CRC_TABLE[crc] + if lsb is not None: + crc ^= lsb + crc = self.CRC_TABLE[crc] + return crc + + def get_formatted_reading(self): + co2, temperature, relative_humidity = self.read_measurement() + return (int(co2), "{:.2f}".format(temperature), + "{:.2f}".format(relative_humidity)) + + def get_reading(self) -> dict: + co2, temperature, relative_humidity = self.get_formatted_reading() + return {"co2": co2, "temperature": temperature, "relative_humidity": relative_humidity} diff --git a/smibhid/lib/sensors/SGP30.py b/smibhid/lib/sensors/SGP30.py new file mode 100644 index 0000000..68c5e80 --- /dev/null +++ b/smibhid/lib/sensors/SGP30.py @@ -0,0 +1,209 @@ +# The MIT License (MIT) +# +# Copyright (c) 2017 ladyada for Adafruit Industries +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +""" +`adafruit_sgp30` +==================================================== + +I2C driver for SGP30 Sensirion VoC sensor + +* Author(s): ladyada, Alexandre Marquet. + +Implementation Notes +-------------------- + +**Hardware:** + +* Adafruit `SGP30 Air Quality Sensor Breakout - VOC and eCO2 + `_ (Product ID: 3709) + +**Software and Dependencies:** + +* MicroPython: + https://github.com/micropython/micropython + +""" +import math +import time +from micropython import const +from lib.sensors.sensor_module import SensorModule + +__version__ = "0.0.0-auto.0" +__repo__ = "https://github.com/alexmrqt/Adafruit_CircuitPython_SGP30.git" + + +# pylint: disable=bad-whitespace +_SGP30_DEFAULT_I2C_ADDR = const(0x58) +_SGP30_FEATURESET_0 = const(0x0020) +_SGP30_FEATURESET_1 = const(0x0022) + +_SGP30_CRC8_POLYNOMIAL = const(0x31) +_SGP30_CRC8_INIT = const(0xFF) +_SGP30_WORD_LEN = const(2) +# pylint: enable=bad-whitespace + +class SGP30(SensorModule): + """ + A driver for the SGP30 gas sensor. + + :param i2c: The `I2C` object to use. This is the only required parameter. + :param int address: (optional) The I2C address of the device. + """ + + def __init__(self, i2c, address=_SGP30_DEFAULT_I2C_ADDR): + """Initialize the sensor, get the serial # and verify that we found a proper SGP30""" + super().__init__([{"name": "co2eq", "unit": "ppm"}, {"name": "tvoc", "unit": "ppm"}]) + self._i2c = i2c + self._addr = address + + # get unique serial, its 48 bits so we store in an array + self.serial = self._i2c_read_words_from_cmd([0x36, 0x82], 0.01, 3) + # get featuerset + featureset = self._i2c_read_words_from_cmd([0x20, 0x2f], 0.01, 1) + if featureset[0] not in [ _SGP30_FEATURESET_0, _SGP30_FEATURESET_1 ]: + raise RuntimeError('SGP30 Not detected') + self.iaq_init() + + @property + def tvoc(self): + """Total Volatile Organic Compound in parts per billion.""" + return self.iaq_measure()[1] + + + @property + def baseline_tvoc(self): + """Total Volatile Organic Compound baseline value""" + return self.get_iaq_baseline()[1] + + + @property + def co2eq(self): + """Carbon Dioxide Equivalent in parts per million""" + return self.iaq_measure()[0] + + + @property + def baseline_co2eq(self): + """Carbon Dioxide Equivalent baseline value""" + return self.get_iaq_baseline()[0] + + + def iaq_init(self): + """Initialize the IAQ algorithm""" + # name, command, signals, delay + self._run_profile(["iaq_init", [0x20, 0x03], 0, 0.01]) + + def iaq_measure(self): + """Measure the CO2eq and TVOC""" + # name, command, signals, delay + return self._run_profile(["iaq_measure", [0x20, 0x08], 2, 0.05]) + + def get_iaq_baseline(self): + """Retreive the IAQ algorithm baseline for CO2eq and TVOC""" + # name, command, signals, delay + return self._run_profile(["iaq_get_baseline", [0x20, 0x15], 2, 0.01]) + + def set_iaq_baseline(self, co2eq, tvoc): + """Set the previously recorded IAQ algorithm baseline for CO2eq and TVOC""" + if co2eq == 0 and tvoc == 0: + raise RuntimeError('Invalid baseline') + buffer = [] + for value in [tvoc, co2eq]: + arr = [value >> 8, value & 0xFF] + arr.append(self._generate_crc(arr)) + buffer += arr + self._run_profile(["iaq_set_baseline", [0x20, 0x1e] + buffer, 0, 0.01]) + + def set_iaq_humidity(self, gramsPM3): + """Set the humidity in g/m3 for eCO2 and TVOC compensation algorithm""" + tmp = int(gramsPM3 * 256) + buffer = [] + for value in [tmp]: + arr = [value >> 8, value & 0xFF] + arr.append(self._generate_crc(arr)) + buffer += arr + self._run_profile(["iaq_set_humidity", [0x20, 0x61] + buffer, 0, 0.01]) + + def set_iaq_rel_humidity(self, rh, temp): + """Set the relative humidity in % for eCO2 and TVOC compensation algorithm""" + # Formula from "Generic SGP Driver Integration for Software I2C" + gramsPM3 = rh/100.0 * 6.112 * math.exp(17.62*temp/(243.12+temp)) + gramsPM3 *= 216.7 / (273.15 + temp) + + self.set_iaq_humidity(gramsPM3) + + # Low level command functions + + def _run_profile(self, profile): + """Run an SGP 'profile' which is a named command set""" + # pylint: disable=unused-variable + name, command, signals, delay = profile + # pylint: enable=unused-variable + + #print("\trunning profile: %s, command %s, %d, delay %0.02f" % + # (name, ["0x%02x" % i for i in command], signals, delay)) + return self._i2c_read_words_from_cmd(command, delay, signals) + + + def _i2c_read_words_from_cmd(self, command, delay, reply_size): + """Run an SGP command query, get a reply and CRC results if necessary""" + self._i2c.writeto(self._addr, bytes(command)) + time.sleep(delay) + if not reply_size: + return None + crc_result = bytearray(reply_size * (_SGP30_WORD_LEN +1)) + self._i2c.readfrom_into(self._addr, crc_result) + #print("\tRaw Read: ", crc_result) + result = [] + for i in range(reply_size): + word = [crc_result[3*i], crc_result[3*i+1]] + crc = crc_result[3*i+2] + if self._generate_crc(word) != crc: + raise RuntimeError('CRC Error') + result.append(word[0] << 8 | word[1]) + #print("\tOK Data: ", [hex(i) for i in result]) + return result + + # pylint: disable=no-self-use + def _generate_crc(self, data): + """8-bit CRC algorithm for checking data""" + crc = _SGP30_CRC8_INIT + # calculates 8-Bit checksum with given polynomial + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ _SGP30_CRC8_POLYNOMIAL + else: + crc <<= 1 + return crc & 0xFF + + def get_formatted_reading(self) -> tuple: + co2eq, tvoc = self.iaq_measure() + return (int(co2eq), int(tvoc)) + + def get_reading(self) -> dict: + reading = self.get_formatted_reading() + data = { + "co2eq": reading[0], + "tvoc": reading[1] + } + return data diff --git a/smibhid/lib/sensors/__init__.py b/smibhid/lib/sensors/__init__.py new file mode 100644 index 0000000..d81aa4d --- /dev/null +++ b/smibhid/lib/sensors/__init__.py @@ -0,0 +1,101 @@ +from asyncio import create_task, sleep +from machine import I2C +from config import SENSOR_MODULES +from lib.ulogging import uLogger +from lib.sensors.SGP30 import SGP30 +from lib.sensors.BME280 import BME280 +from lib.sensors.SCD30 import SCD30 +from lib.sensors.sensor_module import SensorModule + +class Sensors: + def __init__(self, i2c: I2C) -> None: + self.log = uLogger("Sensors") + self.i2c = i2c + self.SENSOR_MODULES = SENSOR_MODULES + self.available_modules: dict[str, SensorModule] = {} + self.configured_modules: dict[str, SensorModule] = {} + self.load_modules() + self._configure_modules() + + def load_modules(self) -> None: #TODO DRY this out + try: + self.log.info("Loading SGP30 sensor module") + self.available_modules["SGP30"] = SGP30(self.i2c) + self.log.info("Loaded SGP30 sensor module") + except RuntimeError as e: + self.log.error(f"Failed to load SGP30 sensor module: {e}") + except Exception as e: + self.log.error(f"Failed to load SGP30 sensor module: {e}") + + try: + self.log.info("Loading BME280 sensor module") + self.available_modules["BME280"] = BME280(self.i2c) + self.log.info("Loaded BME280 sensor module") + except RuntimeError as e: + self.log.error(f"Failed to load BME280 sensor module: {e}") + except Exception as e: + self.log.error(f"Failed to load BME280 sensor module: {e}") + + try: + self.log.info("Loading SCD30 sensor module") + self.available_modules["SCD30"] = SCD30(self.i2c) + self.log.info("Loaded SCD30 sensor module") + except RuntimeError as e: + self.log.error(f"Failed to load SCD30 sensor module: {e}") + except Exception as e: + self.log.error(f"Failed to load SCD30 sensor module: {e}") + + def _configure_modules(self) -> None: + self.log.info(f"Attempting to locate drivers for: {self.SENSOR_MODULES}") + for sensor_module in self.SENSOR_MODULES: + if sensor_module in self.available_modules: + self.log.info(f"Found driver for {sensor_module}") + self.configured_modules[sensor_module] = self.available_modules[sensor_module] + self.log.info(f"Configured {sensor_module} sensor module") + self.log.info(f"Available sensors: {self.get_sensors(sensor_module)}") + else: + self.log.error(f"Driver not found for {sensor_module}") + + self.log.info(f"Configured modules: {self.get_modules()}") + + def startup(self) -> None: + self.log.info(f"Starting sensors: {self.configured_modules}") + create_task(self._poll_sensors()) + + async def _poll_sensors(self) -> None: + """ + Asynchronously poll sensors and log readings every 60 seconds. + """ + while True: + readings = self.get_readings() + for reading in readings: + self.log.info(f"Module: {reading}: {readings[reading]}") + await sleep(60) + + def get_modules(self) -> list: + """ + Return a dictionary of configured sensor modules. + """ + return list(self.configured_modules.keys()) + + def get_sensors(self, module: str) -> list: + """ + Return list of sensors for a specific module name. + """ + module_object = self.configured_modules[module] + sensors = module_object.get_sensors() + self.log.info(f"Available sensors for {module}: {sensors}") + return sensors + + def get_readings(self, module: str = "") -> dict: + """ + Return readings from a specific module by passing it's name as a + string, or all modules if none specified. + """ + readings = {} + if module: + readings[module] = self.configured_modules[module].get_reading() + else: + for name, instance in self.configured_modules.items(): + readings[name] = instance.get_reading() + return readings diff --git a/smibhid/lib/sensors/sensor_module.py b/smibhid/lib/sensors/sensor_module.py new file mode 100644 index 0000000..a1df076 --- /dev/null +++ b/smibhid/lib/sensors/sensor_module.py @@ -0,0 +1,21 @@ +from lib.ulogging import uLogger + +class SensorModule: + """ + Base class for sensor modules. + """ + def __init__(self, sensors: list) -> None: + self.log = uLogger("SensorModule") + self.sensors = sensors + + def get_sensors(self) -> list: + """ + Return list of sensors for a specific module. + """ + return self.sensors + + def get_reading(self) -> dict: + """ + Return a dictionary of sensor name and value pairs + """ + raise NotImplementedError("Subclasses must implement this method") \ No newline at end of file diff --git a/smibhid/lib/updater.py b/smibhid/lib/updater.py index 835328b..ecc8068 100644 --- a/smibhid/lib/updater.py +++ b/smibhid/lib/updater.py @@ -8,11 +8,100 @@ from lib.display import Display from time import sleep -class Updater: +class UpdateCore: def __init__(self) -> None: + self.log = uLogger("UpdateCore") + self.update_path = "/updates" + self.check_for_updates_folder() + + def check_for_updates_folder(self) -> bool: + """ + Check if the /updates/ directory exists. + """ + self.log.info("Checking for updates folder") + try: + if self.update_path.strip('/') not in os.listdir('/'): + self.log.info("Updates folder does not exist - creating") + os.mkdir(self.update_path) + self.log.info("Updates folder created") + else: + self.log.info("Updates folder exists") + return True + except Exception as e: + self.log.error(f"Failed to check for updates folder: {e}") + return False + + def stage_update_url(self, url: str) -> bool: + """ + Stage an update file. + """ + self.log.info(f"Staging update file: {url}") + try: + with open(self.update_path + "/.updating", "a") as f: + f.write(url + "\n") + self.log.info("Update file staged") + return True + except Exception as e: + self.log.error(f"Failed to stage update file: {e}") + return False + + def unstage_update_url(self, url: str) -> bool: + """ + Unstage an update file. + """ + self.log.info(f"Unstaging update file: {url}") + try: + with open(self.update_path + "/.updating", "r") as f: + update_data = f.read() + with open(self.update_path + "/.updating", "w") as f: + line_count = 0 + for update in update_data.split("\n"): + if update != url and update != "": + line_count += 1 + f.write(update + "\n") + self.log.info("Update file unstaged") + if line_count == 0: + os.remove(self.update_path + "/.updating") + self.log.info("No updates staged - removing update file") + return True + except Exception as e: + self.log.error(f"Failed to unstage update file: {e}") + return False + + def process_update_file(self) -> list: + """ + Process updates. + """ + self.log.info("Processing updates") + urls = [] + try: + with open(self.update_path + "/.updating", "r") as f: + update_data = f.read() + self.log.info(f"Update data: {update_data}") + + for update in update_data.split("\n"): + if update != "": + urls.append(update) + self.log.info(f"URLs: {urls}") + + except Exception as e: + self.log.error(f"Failed to process updates: {e}") + + finally: + return urls + + def reset(self) -> None: + """ + Restart the device. + """ + self.log.info("Restarting device") + machine.reset() + +class Updater(UpdateCore): + def __init__(self, i2c) -> None: + super().__init__() self.log = uLogger("Updater") - self.update_path = "/updates/" - self.display = Display() + self.display = Display(i2c) def enter_update_mode(self) -> bool: """ @@ -60,7 +149,7 @@ def exit_with_success(self) -> None: """ self.log.info("Updates applied successfully - clearing update flag to reboot into normal mode") try: - os.remove(self.update_path + ".updating") + os.remove(self.update_path + "/.updating") except Exception as e: self.log.warn(f"Unable to delete .updating file, may already be removed - this is unusual, but not fatal: {e}") self.display.clear() @@ -74,33 +163,11 @@ def exit_with_failure(self) -> None: known normal run state. """ self.log.error("Cannot apply updates - future code will revert to backed up files - clearing update flag to reboot into normal mode") - os.remove(self.update_path + ".updating") + os.remove(self.update_path + "/.updating") self.display.clear() self.display.print_update_status("Failed") sleep(2) self.reset() - - def process_update_file(self) -> list: - """ - Process updates. - """ - self.log.info("Processing updates") - urls = [] - try: - with open(self.update_path + ".updating", "r") as f: - update_data = f.read() - self.log.info(f"Update data: {update_data}") - - for update in update_data.split("\n"): - if update != "": - urls.append(update) - self.log.info(f"URLs: {urls}") - - except Exception as e: - self.log.error(f"Failed to process updates: {e}") - - finally: - return urls def connect_wifi(self) -> bool: """ @@ -117,43 +184,6 @@ def connect_wifi(self) -> bool: self.log.info("Connected to wifi") return True - - def stage_update_url(self, url: str) -> bool: - """ - Stage an update file. - """ - self.log.info(f"Staging update file: {url}") - try: - with open(self.update_path + ".updating", "a") as f: - f.write(url + "\n") - self.log.info("Update file staged") - return True - except Exception as e: - self.log.error(f"Failed to stage update file: {e}") - return False - - def unstage_update_url(self, url: str) -> bool: - """ - Unstage an update file. - """ - self.log.info(f"Unstaging update file: {url}") - try: - with open(self.update_path + ".updating", "r") as f: - update_data = f.read() - with open(self.update_path + ".updating", "w") as f: - line_count = 0 - for update in update_data.split("\n"): - if update != url and update != "": - line_count += 1 - f.write(update + "\n") - self.log.info("Update file unstaged") - if line_count == 0: - os.remove(self.update_path + ".updating") - self.log.info("No updates staged - removing update file") - return True - except Exception as e: - self.log.error(f"Failed to unstage update file: {e}") - return False def download_file(self, url: str) -> bool: """ @@ -187,7 +217,7 @@ def save_file(self, filename, data) -> bool: """ try: self.log.info(f"Saving file: {filename}") - with open(self.update_path + filename, "wb") as f: + with open(self.update_path + '/' + filename, "wb") as f: f.write(data) self.log.info(f"File saved: {filename}") return True @@ -206,7 +236,7 @@ def apply_files(self) -> bool: if file_name == ".updating": continue self.log.info(f"Updating {file_name}") - update_path = self.update_path + file_name + update_path = self.update_path + '/' + file_name target_path = "/lib/" + file_name self.log.info(f"Moving {update_path} to {target_path}") os.rename(update_path, target_path) @@ -216,10 +246,3 @@ def apply_files(self) -> bool: except Exception as e: self.log.error(f"Failed to update HID: {e}") return False - - def reset(self) -> None: - """ - Restart the device. - """ - self.log.info("Restarting device") - machine.reset() diff --git a/smibhid/main.py b/smibhid/main.py index 18f97df..b7bc282 100644 --- a/smibhid/main.py +++ b/smibhid/main.py @@ -13,7 +13,10 @@ pass if ".updating" in updates_dir_list: from lib.updater import Updater - updater = Updater() + from machine import I2C + from config import SDA_PIN, SCL_PIN, I2C_ID, I2C_FREQ + i2c = I2C(I2C_ID, sda = SDA_PIN, scl = SCL_PIN, freq = I2C_FREQ) + updater = Updater(i2c) updater.enter_update_mode() else: