diff --git a/zhaquirks/xiaomi/aqara/thermostat_agl001.py b/zhaquirks/xiaomi/aqara/thermostat_agl001.py index 67af06a878..ef1e117980 100644 --- a/zhaquirks/xiaomi/aqara/thermostat_agl001.py +++ b/zhaquirks/xiaomi/aqara/thermostat_agl001.py @@ -4,6 +4,7 @@ from functools import reduce import math import struct +import time from typing import Any from zigpy.profiles import zha @@ -48,6 +49,10 @@ SENSOR = 0x027E BATTERY_PERCENTAGE = 0x040A +SENSOR_TEMP = 0x1392 # Fake address to pass external sensor temperature +SENSOR_ATTR = 0xFFF2 +SENSOR_ATTR_NAME = "sensor_attr" + XIAOMI_CLUSTER_ID = 0xFCC0 DAYS_MAP = { @@ -379,6 +384,8 @@ class AqaraThermostatSpecificCluster(XiaomiAqaraE1Cluster): SCHEDULE_SETTINGS: ("schedule_settings", ScheduleSettings, True), SENSOR: ("sensor", t.uint8_t, True), BATTERY_PERCENTAGE: ("battery_percentage", t.uint8_t, True), + SENSOR_TEMP: ("sensor_temp", t.uint32_t, True), + SENSOR_ATTR: (SENSOR_ATTR_NAME, t.LVBytes, True), } ) @@ -393,6 +400,166 @@ def _update_attribute(self, attrid, value): ) super()._update_attribute(attrid, value) + def aqaraHeader(self, counter: int, params: bytearray, action: int) -> bytearray: + """Create Aqara header for setting external sensor.""" + header = bytes([0xAA, 0x71, len(params) + 3, 0x44, counter]) + integrity = 512 - sum(header) + + return header + bytes([integrity, action, 0x41, len(params)]) + + def _float_to_hex(self, f): + """Convert float to hex.""" + return hex(struct.unpack(" list: + """Write attributes to device with internal 'attributes' validation.""" + sensor = bytearray.fromhex("00158d00019d1b98") + attrs = {} + + for attr, value in attributes.items(): + # implemented with help from https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/devices/xiaomi.js + + if attr == SENSOR_TEMP: + # set external sensor temp. this function expect value to be passed multiplied by 100 + temperatureBuf = bytearray.fromhex( + self._float_to_hex(round(float(value)))[2:] + ) + + params = sensor + params += bytes([0x00, 0x01, 0x00, 0x55]) + params += temperatureBuf + + attrs = {} + attrs[SENSOR_ATTR_NAME] = self.aqaraHeader(0x12, params, 0x05) + params + + elif attr == SENSOR: + # set internal/external temperature sensor + device = bytearray.fromhex( + ("%s" % (self.endpoint.device.ieee)).replace(":", "") + ) + timestamp = bytes(reversed(t.uint32_t(int(time.time())).serialize())) + + if value == 0: + # internal sensor + params1 = timestamp + params1 += bytes([0x3D, 0x05]) + params1 += device + params1 += bytes( + [ + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) + + params2 = timestamp + params2 += bytes([0x3D, 0x04]) + params2 += device + params2 += bytes( + [ + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) + + attrs1 = {} + attrs1[SENSOR_ATTR_NAME] = ( + self.aqaraHeader(0x12, params1, 0x04) + params1 + ) + attrs[SENSOR_ATTR_NAME] = ( + self.aqaraHeader(0x13, params2, 0x04) + params2 + ) + + result = await super().write_attributes(attrs1, manufacturer) + else: + # external sensor + params1 = timestamp + params1 += bytes([0x3D, 0x04]) + params1 += device + params1 += sensor + params1 += bytes([0x00, 0x01, 0x00, 0x55]) + params1 += bytes( + [ + 0x13, + 0x0A, + 0x02, + 0x00, + 0x00, + 0x64, + 0x04, + 0xCE, + 0xC2, + 0xB6, + 0xC8, + ] + ) + params1 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params1 += bytes([0x64]) + params1 += bytes([0x65]) + + params2 = timestamp + params2 += bytes([0x3D, 0x05]) + params2 += device + params2 += sensor + params2 += bytes([0x08, 0x00, 0x07, 0xFD]) + params2 += bytes( + [ + 0x16, + 0x0A, + 0x02, + 0x0A, + 0xC9, + 0xE8, + 0xB1, + 0xB8, + 0xD4, + 0xDA, + 0xCF, + 0xDF, + 0xC0, + 0xEB, + ] + ) + params2 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params2 += bytes([0x04]) + params2 += bytes([0x65]) + + attrs1 = {} + attrs1[SENSOR_ATTR_NAME] = ( + self.aqaraHeader(0x12, params1, 0x02) + params1 + ) + attrs[SENSOR_ATTR_NAME] = ( + self.aqaraHeader(0x13, params2, 0x02) + params2 + ) + + result = await super().write_attributes(attrs1, manufacturer) + else: + attrs[attr] = value + + result = await super().write_attributes(attrs, manufacturer) + return result + class AGL001(XiaomiCustomDevice): """Aqara E1 Radiator Thermostat (AGL001) Device."""