-
-
Notifications
You must be signed in to change notification settings - Fork 13
feat: basic support for hello fairy #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
c66f0fb
b805a31
1e230ba
2b87b74
bfb1df8
6954b7b
755ae9c
599d208
2f13944
fb768b0
6c6391f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,8 +25,10 @@ | |
| from flux_led.utils import rgbw_brightness | ||
|
|
||
| from led_ble.model_db import LEDBLEModel | ||
| from led_ble.protocol import ProtocolFairy | ||
|
|
||
| from .const import ( | ||
| HELLO_FAIRY_READ_CHARACTERISTIC, | ||
| POSSIBLE_READ_CHARACTERISTIC_UUIDS, | ||
| POSSIBLE_WRITE_CHARACTERISTIC_UUIDS, | ||
| STATE_COMMAND, | ||
|
|
@@ -279,10 +281,14 @@ def _generate_preset_pattern( | |
| brightness = int(brightness * 255 / 100) | ||
| speed = int(speed * 255 / 100) | ||
| return bytearray([0x9E, 0x00, pattern, speed, brightness, 0x00, 0xE9]) | ||
| PresetPattern.valid_or_raise(pattern) | ||
| if not self._is_hello_fairy(): | ||
| PresetPattern.valid_or_raise(pattern) | ||
| if not (1 <= brightness <= 100): | ||
| raise ValueError("Brightness must be between 1 and 100") | ||
| assert self._protocol is not None # nosec | ||
| if self._is_hello_fairy() and pattern > 58: | ||
| rgb = [[255, 0, 0], [0, 255, 0], [0, 0, 255]] * 8 + [[255, 0, 0]] | ||
| return self._protocol.construct_custom_effect(rgb, speed, "") | ||
jr4 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self._protocol.construct_preset_pattern(pattern, speed, brightness) | ||
|
|
||
| async def async_set_preset_pattern( | ||
|
|
@@ -431,29 +437,64 @@ def _named_effect(self) -> str | None: | |
| """Returns the named effect.""" | ||
| return EFFECT_ID_NAME.get(self.preset_pattern_num) | ||
|
|
||
| # ideally replace with classes to encapsulate the differences between device makes | ||
| def _is_hello_fairy(self) -> bool: | ||
| if self._read_char is None: | ||
| return False | ||
| d = self._read_char.descriptors | ||
| c = d[0].characteristic_uuid if (len(d) > 0) else None | ||
| return c == HELLO_FAIRY_READ_CHARACTERISTIC | ||
|
|
||
| def _notification_handler(self, _sender: int, data: bytearray) -> None: | ||
| """Handle notification responses.""" | ||
| _LOGGER.debug("%s: Notification received: %s", self.name, data.hex()) | ||
|
|
||
| if len(data) == 4 and data[0] == 0xCC: | ||
| on = data[1] == 0x23 | ||
| self._state = replace(self._state, power=on) | ||
| return | ||
| if len(data) < 11: | ||
| return | ||
| model_num = data[1] | ||
| on = data[2] == 0x23 | ||
| preset_pattern = data[3] | ||
| mode = data[4] | ||
| speed = data[5] | ||
| r = data[6] | ||
| g = data[7] | ||
| b = data[8] | ||
| w = data[9] | ||
| version = data[10] | ||
| self._state = LEDBLEState( | ||
| on, (r, g, b), w, model_num, preset_pattern, mode, speed, version | ||
| ) | ||
| model_num = 0 | ||
| if self._is_hello_fairy(): | ||
| if data[0] == 0xAA: | ||
| if data[1] == 0x00: # hw info | ||
| if len(data) > 7: | ||
| version_string = data[3:8].decode("ascii") | ||
| _LOGGER.debug("version %s", version_string) | ||
| self._state = replace( | ||
| self._state, | ||
| version_num=(data[3] - 48) * 100 | ||
| + (data[5] - 48) * 10 | ||
| + (data[7] - 48), | ||
| ) | ||
| if len(data) > 12: | ||
| model = data[8:13].decode("ascii") | ||
| _LOGGER.debug("model %s", model) | ||
| if len(data) > 24: | ||
| lights = data[24] # guessing | ||
| _LOGGER.debug("lights %d", lights) | ||
| if len(data) > 33: | ||
| effects = data[33] # guessing | ||
| _LOGGER.debug("effects %d", effects) | ||
|
|
||
| if data[1] == 0x01: # state info | ||
| if len(data) > 6: | ||
| self._state = replace(self._state, power=data[6] > 0) | ||
| else: | ||
| if len(data) == 4 and data[0] == 0xCC: | ||
| on = data[1] == 0x23 | ||
| self._state = replace(self._state, power=on) | ||
| return | ||
| if len(data) < 11: | ||
| return | ||
| model_num = data[1] | ||
| on = data[2] == 0x23 | ||
| preset_pattern = data[3] | ||
| mode = data[4] | ||
| speed = data[5] | ||
| r = data[6] | ||
| g = data[7] | ||
| b = data[8] | ||
| w = data[9] | ||
| version = data[10] | ||
| self._state = LEDBLEState( | ||
| on, (r, g, b), w, model_num, preset_pattern, mode, speed, version | ||
| ) | ||
|
|
||
|
Comment on lines
463
to
510
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be better to abstract this into a new method in the protocol so each protocol can handle this different
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. We don't control the ProtocolBase class though, it comes from flux_led. One option would be to make a device class to encapsulate the differences between the flux_led devices and the hello fairy devices. This would include the (send) protocol, this notification_handler, and (to your other comments), the status command, and protocol_for_version_num.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its no problem to merge a PR to flux_led
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cleaned up flux_led for Python 3.9+ and released 1.1.0
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you want the new protocol method to return LEDBLEState (so move that class into flux-led)? or a generic dict? |
||
| _LOGGER.debug( | ||
| "%s: Notification received; RSSI: %s: %s %s", | ||
|
|
@@ -466,8 +507,10 @@ def _notification_handler(self, _sender: int, data: bytearray) -> None: | |
| if not self._resolve_protocol_event.is_set(): | ||
| self._resolve_protocol_event.set() | ||
| self._model_data = get_model(model_num) | ||
| self._set_protocol(self._model_data.protocol_for_version_num(version)) | ||
|
|
||
| if self._is_hello_fairy(): | ||
| self._protocol = ProtocolFairy() | ||
| else: | ||
| self._set_protocol(self._model_data.protocol_for_version_num(version)) | ||
jr4 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self._fire_callbacks() | ||
|
|
||
| def _reset_disconnect_timer(self) -> None: | ||
|
|
@@ -622,13 +665,23 @@ def _resolve_characteristics(self, services: BleakGATTServiceCollection) -> bool | |
| if char := services.get_characteristic(characteristic): | ||
| self._write_char = char | ||
| break | ||
| _LOGGER.debug( | ||
| "using characteristic %s for read, characteristic %s for write", | ||
| self._read_char, | ||
| self._write_char, | ||
| ) | ||
| return bool(self._read_char and self._write_char) | ||
|
|
||
| async def _resolve_protocol(self) -> None: | ||
| """Resolve protocol.""" | ||
| if self._resolve_protocol_event.is_set(): | ||
| return | ||
| await self._send_command_while_connected([STATE_COMMAND]) | ||
| if self._is_hello_fairy(): | ||
| await self._send_command_while_connected( | ||
| [b"\xaa\x00\x00\xaa"] | ||
| ) # get version and capabilities | ||
| else: | ||
| await self._send_command_while_connected([STATE_COMMAND]) | ||
|
Comment on lines
-645
to
+693
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a new method to the protocol to get the state command instead
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that method is implemented but the protocol is not resolved yet at this point |
||
| async with asyncio_timeout(10): | ||
| await self._resolve_protocol_event.wait() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| import colorsys | ||
| from math import floor | ||
| from flux_led.protocol import ProtocolBase | ||
| from led_ble.led_ble import LevelWriteMode | ||
|
|
||
|
|
||
| class ProtocolFairy(ProtocolBase): | ||
| """Protocol for Hello Fairy devices.""" | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """The name of the protocol.""" | ||
| return "Fairy" | ||
|
|
||
| def construct_state_query(self) -> bytearray: | ||
| """The bytes to send for a query request.""" | ||
| return self.construct_message(bytearray([0xAA, 0x01, 0x00])) | ||
|
|
||
| def construct_state_change(self, turn_on: int) -> bytearray: | ||
| """The bytes to send for a state change request.""" | ||
| return self.construct_message( | ||
| bytearray([0xAA, 0x02, 0x01, 1 if turn_on else 0]) | ||
| ) | ||
|
|
||
| def construct_message(self, raw_bytes: bytearray) -> bytearray: | ||
| """Calculate checksum of byte array and add to end.""" | ||
| csum = sum(raw_bytes) & 0xFF | ||
| raw_bytes.append(csum) | ||
| return raw_bytes | ||
|
|
||
| def construct_levels_change( | ||
| self, | ||
| persist: int, | ||
| red: int | None, | ||
| green: int | None, | ||
| blue: int | None, | ||
| warm_white: int | None, | ||
| cool_white: int | None, | ||
| write_mode: LevelWriteMode, | ||
| ) -> list[bytearray]: | ||
| """The bytes to send for a level change request.""" | ||
| h, s, v = colorsys.rgb_to_hsv( | ||
| (red or 0) / 255, (green or 0) / 255, (blue or 0) / 255 | ||
| ) | ||
| h_scaled = min(359, floor(h * 360)) | ||
| s_scaled = round(s * 1000) | ||
| v_scaled = round(v * 1000) | ||
| return [ | ||
| self.construct_message( | ||
| bytearray( | ||
| [ | ||
| 0xAA, | ||
| 0x03, | ||
| 0x07, | ||
| 0x01, | ||
| h_scaled >> 8, | ||
| h_scaled & 0xFF, | ||
| s_scaled >> 8, | ||
| s_scaled & 0xFF, | ||
| v_scaled >> 8, | ||
| v_scaled & 0xFF, | ||
| ] | ||
| ) | ||
| ) | ||
| ] | ||
|
|
||
| def construct_preset_pattern( | ||
| self, pattern: int, speed: int, brightness: int | ||
| ) -> list[bytearray]: | ||
| """The bytes to send for a preset pattern.""" | ||
| return [ | ||
| self.construct_message( | ||
| bytearray( | ||
| [ | ||
| 0xAA, | ||
| 0x03, | ||
| 0x04, | ||
| 0x02, | ||
| pattern & 0xFF, | ||
| (brightness >> 8) & 0xFF, | ||
| brightness & 0xFF, | ||
| ] | ||
| ) | ||
| ), | ||
| self.construct_message(bytearray([0xAA, 0x0C, 0x01, min(speed, 100)])), | ||
| ] | ||
|
|
||
| def construct_custom_effect( | ||
| self, rgb_list: list[tuple[int, int, int]], speed: int, transition_type: str | ||
| ) -> list[bytearray]: | ||
| """The bytes to send for a custom effect.""" | ||
| data_bytes = len(rgb_list) * 3 + 1 | ||
| hue_message = bytearray(data_bytes + 3) | ||
| hue_message[0:4] = [0xAA, 0xDA, data_bytes, 0x01] | ||
| for [i, [r, g, b]] in enumerate(rgb_list): | ||
| h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) | ||
| if v < 0.25: | ||
| h = 0xFE # black | ||
| elif s < 0.25: | ||
| h = 0xFF # white | ||
| else: | ||
| h = floor(h * 0xAF) | ||
| # necessary to satisfy both flake and ruff: | ||
| a = i * 3 + 4 | ||
| b = a + 3 | ||
| hue_message[a:b] = [i >> 8, i & 0xFF, h] | ||
| return [ | ||
| *self.construct_motion(speed, 0), | ||
| self.construct_message(hue_message), | ||
| *self.construct_motion(speed, 2), | ||
| ] | ||
|
|
||
| def construct_motion(self, speed: int, transition: int) -> list[bytearray]: | ||
| """The bytes to send for motion speed and transition.""" | ||
| return [ | ||
| self.construct_message( | ||
| bytearray([0xAA, 0xD0, 0x04, transition, 0x64, speed, 0x01]) | ||
| ) | ||
| ] |
Uh oh!
There was an error while loading. Please reload this page.