diff --git a/bme680.py b/bme680.py index 1816cb8..60285df 100644 --- a/bme680.py +++ b/bme680.py @@ -99,38 +99,37 @@ class Adafruit_BME680: def __init__(self, *, refresh_rate=10): """Check the BME680 was found, read the coefficients and enable the sensor for continuous reads.""" - self._write(_BME680_REG_SOFTRESET, [0xB6]) - time.sleep(0.005) - - # Check device ID. - chip_id = self._read_byte(_BME680_REG_CHIPID) - if chip_id != _BME680_CHIPID: - raise RuntimeError('Failed to find BME680! Chip ID 0x%x' % chip_id) - - self._read_calibration() - - # set up heater - self._write(_BME680_BME680_RES_HEAT_0, [0x73]) - self._write(_BME680_BME680_GAS_WAIT_0, [0x65]) - self.sea_level_pressure = 1013.25 - """Pressure in hectoPascals at sea level. Used to calibrate ``altitude``.""" - - # Default oversampling and filter register values. - self._pressure_oversample = 0b011 - self._temp_oversample = 0b100 - self._humidity_oversample = 0b010 - self._filter = 0b010 - - self._adc_pres = None - self._adc_temp = None - self._adc_hum = None - self._adc_gas = None - self._gas_range = None - self._t_fine = None - - self._last_reading = time.ticks_ms() - self._min_refresh_time = 1000 // refresh_rate + self._is_detected() + if self._detected == True: + self._read_calibration() + + # set up heater + self._write(_BME680_BME680_RES_HEAT_0, [0x73]) + self._write(_BME680_BME680_GAS_WAIT_0, [0x65]) + + self.sea_level_pressure = 1013.25 + """Pressure in hectoPascals at sea level. Used to calibrate ``altitude``.""" + + # Default oversampling and filter register values. + self._pressure_oversample = 0b011 + self._temp_oversample = 0b100 + self._humidity_oversample = 0b010 + self._filter = 0b010 + + self._last_pres = 0 + self._adc_pres = None + self._adc_temp = None + self._adc_hum = None + self._adc_gas = None + self._gas_range = None + self._t_fine = None + + self._last_reading = time.ticks_ms() + self._min_refresh_time = 1000 // refresh_rate + else: + # BME680 not detected, so return None for outside validation + return(None) @property def pressure_oversample(self): @@ -183,14 +182,22 @@ def filter_size(self, size): @property def temperature(self): """The compensated temperature in degrees celsius.""" - self._perform_reading() - calc_temp = (((self._t_fine * 5) + 128) / 256) - return calc_temp / 100 + if self._debug: print(f"\t Reading temperature... ", end="") + result = self._perform_reading() + if result is None: + if self._debug: print(f"...temp was None!") + return None + if self._debug: print(f"...fixing up temperature... ", end="") + calc_temp = (((self._t_fine * 5) + 128) / 256) / 100 + if self._debug: print(f"...temperature is {calc_temp} ") + return calc_temp @property def pressure(self): """The barometric pressure in hectoPascals""" - self._perform_reading() + result = self._perform_reading() + if result is None: + return None var1 = (self._t_fine / 2) - 64000 var2 = ((var1 / 4) * (var1 / 4)) / 2048 var2 = (var2 * self._pressure_calibration[5]) / 4 @@ -208,12 +215,24 @@ def pressure(self): var2 = ((calc_pres / 4) * self._pressure_calibration[7]) / 8192 var3 = (((calc_pres / 256) ** 3) * self._pressure_calibration[9]) / 131072 calc_pres += ((var1 + var2 + var3 + (self._pressure_calibration[6] * 128)) / 16) - return calc_pres/100 + if calc_pres > 1500: + calc_pres = calc_pres / 100 + if calc_pres > 630: + # this looks valid, so cache it and return the value + self._last_pres = calc_pres + return calc_pres + if self._last_pres > 0: + # calc value is too low, so if we have a "good" prior read, return that + return self._last_pres + # otherwise just return the weird value, and hopefully we catch up + return calc_pres @property def humidity(self): """The relative humidity in RH %""" - self._perform_reading() + result = self._perform_reading() + if result is None: + return None temp_scaled = ((self._t_fine * 5) + 128) / 256 var1 = ((self._adc_hum - (self._humidity_calibration[0] * 16)) - ((temp_scaled * self._humidity_calibration[2]) / 200)) @@ -245,16 +264,49 @@ def altitude(self): @property def gas(self): """The gas resistance in ohms""" - self._perform_reading() + result = self._perform_reading() + if result is None: + return None var1 = ((1340 + (5 * self._sw_err)) * (_LOOKUP_TABLE_1[self._gas_range])) / 65536 var2 = ((self._adc_gas * 32768) - 16777216) + var1 var3 = (_LOOKUP_TABLE_2[self._gas_range] * var1) / 512 calc_gas_res = (var3 + (var2 / 2)) / var2 return int(calc_gas_res) + @property + def detected(self): + """Whether the BME600 was detected""" + return self._detected + + def _is_detected(self): + """Check if the BME680 was found""" + if self._debug: print("Resetting BME680") + self._write(_BME680_REG_SOFTRESET, [0xB6]) + time.sleep(0.005) + + if self._debug: print("Attempting to read from BME680") + # Check device ID. + chip_id = self._read_byte(_BME680_REG_CHIPID) + # No chip found at this address + if chip_id is None: + if self._debug: print("chip_id is None") + self._detected = False + # Unsupported chip found at this address + elif chip_id != _BME680_CHIPID: + if self._debug: print(f"chip_id is {chip_id}") + self._detected = False + # Detected + else: + if self._debug: print("BME680 found!") + self._detected = True + def _perform_reading(self): """Perform a single-shot reading from the sensor and fill internal data structure for calculations""" + if self._detected == False: + if self._debug: print("\t perform_reading failed!") + return None + expired = time.ticks_diff(self._last_reading, time.ticks_ms()) * time.ticks_diff(0, 1) if 0 <= expired < self._min_refresh_time: time.sleep_ms(self._min_refresh_time - expired) @@ -291,6 +343,7 @@ def _perform_reading(self): var3 = (var3 * self._temp_calibration[2] * 16) / 16384 self._t_fine = int(var2 + var3) + return(True) def _read_calibration(self): """Read & save the calibration coefficients""" @@ -316,12 +369,18 @@ def _read_calibration(self): def _read_byte(self, register): """Read a byte register value and return it""" - return self._read(register, 1)[0] + result = self._read(register, 1) + if result is not None: + return result[0] + else: + return None def _read(self, register, length): + # Overridden by I2C or SPI during init raise NotImplementedError() def _write(self, register, values): + # Overridden by I2C or SPI during init raise NotImplementedError() class BME680_I2C(Adafruit_BME680): @@ -337,23 +396,30 @@ def __init__(self, i2c, address=0x77, debug=False, *, refresh_rate=10): self._i2c = i2c self._address = address self._debug = debug - super().__init__(refresh_rate=refresh_rate) + return super().__init__(refresh_rate=refresh_rate) def _read(self, register, length): """Returns an array of 'length' bytes from the 'register'""" result = bytearray(length) - self._i2c.readfrom_mem_into(self._address, register & 0xff, result) + try: + self._i2c.readfrom_mem_into(self._address, register & 0xff, result) + except: + result = None if self._debug: print("\t${:x} read ".format(register), " ".join(["{:02x}".format(i) for i in result])) - return result + return(result) def _write(self, register, values): """Writes an array of 'length' bytes to the 'register'""" if self._debug: print("\t${:x} write".format(register), " ".join(["{:02x}".format(i) for i in values])) for value in values: - self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF])) - register += 1 + try: + self._i2c.writeto_mem(self._address, register, bytearray([value & 0xFF])) + register += 1 + return(0) + except: + return(None) class BME680_SPI(Adafruit_BME680): diff --git a/bmetest.py b/bmetest.py index 83ff6a1..e0a696f 100644 --- a/bmetest.py +++ b/bmetest.py @@ -1,10 +1,14 @@ -from bme680 import * -from machine import I2C, Pin -import time -bme = BME680_I2C(I2C(-1, Pin(13), Pin(12))) - -for _ in range(3): - print(bme.temperature, bme.humidity, bme.pressure, bme.gas) - time.sleep(1) +try: from bme680 import * +except: pass +try: from machine import I2C, Pin +except: pass +from time import sleep +try: + bme = BME680_I2C(I2C(-1, Pin(13), Pin(12))) + for _ in range(3): + print(bme.temperature, bme.humidity, bme.pressure, bme.gas) + sleep(1) +except: + print("BME680 not detected")