Skip to content
Open
16 changes: 14 additions & 2 deletions examples/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

_LOGGER = logging.getLogger(__name__)

ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25"
ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25"
ADDRESS = "BE:27:E1:00:10:63" # Hello Fairy-1063PPPP


async def run() -> None:
Expand All @@ -34,20 +33,33 @@ def on_state_changed(state: LEDBLEState) -> None:
device = await future
led = LEDBLE(device)
cancel_callback = led.register_callback(on_state_changed)
_LOGGER.info("update...")
await led.update()
_LOGGER.info("turn_on...")
await led.turn_on()
_LOGGER.info("set_rgb(red)...")
await led.set_rgb((255, 0, 0), 255)
await asyncio.sleep(1)
_LOGGER.info("set_rgb(green)...")
await led.set_rgb((0, 255, 0), 128)
await asyncio.sleep(1)
_LOGGER.info("set_rgb(blue)...")
await led.set_rgb((0, 0, 255), 255)
await asyncio.sleep(1)
_LOGGER.info("set_rgbw(white)...")
await led.set_rgbw((255, 255, 255, 128), 255)
await asyncio.sleep(1)
_LOGGER.info("set_preset_pattern(1)...")
await led.async_set_preset_pattern(1, 100, 100)
await asyncio.sleep(2)
_LOGGER.info("turn_off...")
await led.turn_off()
_LOGGER.info("update...")
await led.update()
_LOGGER.info("finish...")
cancel_callback()
await scanner.stop()
_LOGGER.info("done")


logging.basicConfig(level=logging.INFO)
Expand Down
9 changes: 7 additions & 2 deletions src/led_ble/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ class CharacteristicMissingError(Exception):
"""Raised when a characteristic is missing."""


HELLO_FAIRY_WRITE_CHARACTERISTIC = "49535343-8841-43f4-a8d4-ecbe34729bb3"
HELLO_FAIRY_READ_CHARACTERISTIC = "49535343-1e4d-4bd9-ba61-23c647249616"

POSSIBLE_WRITE_CHARACTERISTIC_UUIDS = [
BASE_UUID_FORMAT.format(part) for part in ["ff01", "ffd5", "ffd9", "ffe5", "ffe9"]
]
] + [HELLO_FAIRY_WRITE_CHARACTERISTIC]

POSSIBLE_READ_CHARACTERISTIC_UUIDS = [
BASE_UUID_FORMAT.format(part) for part in ["ff02", "ffd0", "ffd4", "ffe0", "ffe4"]
]
] + [HELLO_FAIRY_READ_CHARACTERISTIC]


QUERY_STATE_BYTES = bytearray([0xEF, 0x01, 0x77])
98 changes: 75 additions & 23 deletions src/led_ble/led_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from flux_led.utils import rgbw_brightness

from led_ble.model_db import LEDBLEModel
from led_ble.protocol import ProtocolFairy

from .const import (
HELLO_FAIRY_READ_CHARACTERISTIC,
POSSIBLE_READ_CHARACTERISTIC_UUIDS,
POSSIBLE_WRITE_CHARACTERISTIC_UUIDS,
STATE_COMMAND,
Expand Down Expand Up @@ -279,7 +281,8 @@ def _generate_preset_pattern(
brightness = int(brightness * 255 / 100)
speed = int(speed * 255 / 100)
return bytearray([0x9E, 0x00, pattern, speed, brightness, 0x00, 0xE9])
PresetPattern.valid_or_raise(pattern)
if not self._is_hello_fairy():
PresetPattern.valid_or_raise(pattern)
if not (1 <= brightness <= 100):
raise ValueError("Brightness must be between 1 and 100")
assert self._protocol is not None # nosec
Expand Down Expand Up @@ -445,29 +448,65 @@ def _named_effect(self) -> str | None:
"""Returns the named effect."""
return EFFECT_ID_NAME.get(self.preset_pattern_num)

# ideally replace with classes to encapsulate the differences between device makes
def _is_hello_fairy(self) -> bool:
if self._read_char is None:
return False
d = self._read_char.descriptors
c = d[0].characteristic_uuid if (len(d) > 0) else None
return c == HELLO_FAIRY_READ_CHARACTERISTIC

def _notification_handler(self, _sender: int, data: bytearray) -> None:
"""Handle notification responses."""
_LOGGER.debug("%s: Notification received: %s", self.name, data.hex())

if len(data) == 4 and data[0] == 0xCC:
on = data[1] == 0x23
self._state = replace(self._state, power=on)
return
if len(data) < 11:
return
model_num = data[1]
on = data[2] == 0x23
preset_pattern = data[3]
mode = data[4]
speed = data[5]
r = data[6]
g = data[7]
b = data[8]
w = data[9]
version = data[10]
self._state = LEDBLEState(
on, (r, g, b), w, model_num, preset_pattern, mode, speed, version
)
model_num = 0
version = 0
if self._is_hello_fairy():
if data[0] == 0xAA:
if data[1] == 0x00: # hw info
if len(data) > 7:
version_string = data[3:8].decode("ascii")
_LOGGER.debug("version %s", version_string)
self._state = replace(
self._state,
version_num=(data[3] - 48) * 100
+ (data[5] - 48) * 10
+ (data[7] - 48),
)
if len(data) > 12:
model = data[8:13].decode("ascii")
_LOGGER.debug("model %s", model)
if len(data) > 24:
lights = data[24] # guessing
_LOGGER.debug("lights %d", lights)
if len(data) > 33:
effects = data[33] # guessing
_LOGGER.debug("effects %d", effects)

if data[1] == 0x01: # state info
if len(data) > 6:
self._state = replace(self._state, power=data[6] > 0)
else:
if len(data) == 4 and data[0] == 0xCC:
on = data[1] == 0x23
self._state = replace(self._state, power=on)
return
if len(data) < 11:
return
model_num = data[1]
on = data[2] == 0x23
preset_pattern = data[3]
mode = data[4]
speed = data[5]
r = data[6]
g = data[7]
b = data[8]
w = data[9]
version = data[10]
self._state = LEDBLEState(
on, (r, g, b), w, model_num, preset_pattern, mode, speed, version
)

_LOGGER.debug(
"%s: Notification received; RSSI: %s: %s %s",
Expand All @@ -481,7 +520,6 @@ def _notification_handler(self, _sender: int, data: bytearray) -> None:
self._resolve_protocol_event.set()
self._model_data = get_model(model_num)
self._set_protocol(self._model_data.protocol_for_version_num(version))

self._fire_callbacks()

def _reset_disconnect_timer(self) -> None:
Expand Down Expand Up @@ -636,18 +674,32 @@ def _resolve_characteristics(self, services: BleakGATTServiceCollection) -> bool
if char := services.get_characteristic(characteristic):
self._write_char = char
break
_LOGGER.debug(
"using characteristic %s for read, characteristic %s for write",
self._read_char,
self._write_char,
)
return bool(self._read_char and self._write_char)

async def _resolve_protocol(self) -> None:
"""Resolve protocol."""
if self._resolve_protocol_event.is_set():
return
await self._send_command_while_connected([STATE_COMMAND])
if self._is_hello_fairy():
await self._send_command_while_connected(
[b"\xaa\x00\x00\xaa"]
) # get version and capabilities
else:
await self._send_command_while_connected([STATE_COMMAND])
Comment on lines -645 to +693
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a new method to the protocol to get the state command instead

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that method is implemented but the protocol is not resolved yet at this point

async with asyncio_timeout(10):
await self._resolve_protocol_event.wait()

def _set_protocol(self, protocol: str) -> None:
cls = PROTOCOL_NAME_TO_CLS.get(protocol)
cls = (
ProtocolFairy
if self._is_hello_fairy()
else PROTOCOL_NAME_TO_CLS.get(protocol)
) # flux-led
if cls is None:
raise ValueError(f"Invalid protocol: {protocol}")
self._protocol = cls()
9 changes: 9 additions & 0 deletions src/led_ble/model_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ def protocol_for_version_num(self, version_num: int) -> str:


MODELS = [
LEDBLEModel(
model_num=0x00,
models=["Hello Fairy:BMSL6"],
description="Controller RGB",
protocols=[
MinVersionProtocol(0, "Fairy"),
],
color_modes=COLOR_MODES_RGB_W, # Formerly rgbwcapable
),
LEDBLEModel(
model_num=0x04,
models=["Triones:C10511000166"],
Expand Down
130 changes: 130 additions & 0 deletions src/led_ble/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations
import colorsys
from math import floor
from flux_led.protocol import ProtocolBase
from led_ble.led_ble import LevelWriteMode


class ProtocolFairy(ProtocolBase):
"""Protocol for Hello Fairy devices."""

@property
def name(self) -> str:
"""The name of the protocol."""
return "Fairy"

def construct_state_query(self) -> bytearray:
"""The bytes to send for a query request."""
return self.construct_message(bytearray([0xAA, 0x01, 0x00]))

def construct_state_change(self, turn_on: int) -> bytearray:
"""The bytes to send for a state change request."""
return self.construct_message(
bytearray([0xAA, 0x02, 0x01, 1 if turn_on else 0])
)

def construct_pause(self, pause: bool) -> bytearray:
"""The bytes to send for pausing or unpausing."""
return self.construct_message(bytearray([0xAA, 0x11, 0x01, 1 if pause else 0]))

def construct_ir_state(self, turn_on: bool) -> bytearray:
"""The bytes to send for enabling/disabling the IR remote."""
return self.construct_message(
bytearray([0xAA, 0x0F, 0x01, 1 if turn_on else 0])
)

def construct_levels_change(
self,
persist: int,
red: int | None,
green: int | None,
blue: int | None,
warm_white: int | None,
cool_white: int | None,
write_mode: LevelWriteMode,
) -> list[bytearray]:
"""The bytes to send for a level change request."""
h, s, v = colorsys.rgb_to_hsv(
(red or 0) / 255, (green or 0) / 255, (blue or 0) / 255
)
h_scaled = min(359, floor(h * 360))
s_scaled = round(s * 1000)
v_scaled = round(v * 1000)
return [
self.construct_message(
bytearray(
[
0xAA,
0x03,
0x07,
0x01,
h_scaled >> 8,
h_scaled & 0xFF,
s_scaled >> 8,
s_scaled & 0xFF,
v_scaled >> 8,
v_scaled & 0xFF,
]
)
)
]

def construct_preset_pattern(
self, pattern: int, speed: int, brightness: int
) -> list[bytearray]:
"""The bytes to send for a preset pattern."""
return [
self.construct_message(
bytearray(
[
0xAA,
0x03,
0x04,
0x02,
pattern & 0xFF,
(brightness >> 8) & 0xFF,
brightness & 0xFF,
]
)
),
self.construct_message(bytearray([0xAA, 0x0C, 0x01, min(speed, 100)])),
]

def construct_custom_effect(
self, rgb_list: list[tuple[int, int, int]], speed: int, transition_type: str
) -> list[bytearray]:
"""The bytes to send for a custom effect."""
data_bytes = len(rgb_list) * 3 + 1
hue_message = bytearray(data_bytes + 3)
hue_message[0:4] = [0xAA, 0xDA, data_bytes, 0x01]
for [i, [r, g, b]] in enumerate(rgb_list):
h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255)
if v < 0.25:
h = 0xFE # black
elif s < 0.25:
h = 0xFF # white
else:
h = floor(h * 0xAF)
# necessary to satisfy both flake and ruff:
a = i * 3 + 4
b = a + 3
hue_message[a:b] = [i >> 8, i & 0xFF, h]
return [
*self.construct_motion(speed, 0),
self.construct_message(hue_message),
*self.construct_motion(speed, 2),
]

def construct_motion(self, speed: int, transition: int) -> list[bytearray]:
"""The bytes to send for motion speed and transition."""
return [
self.construct_message(
bytearray([0xAA, 0xD0, 0x04, transition, 0x64, speed, 0x01])
)
]

def construct_message(self, raw_bytes: bytearray) -> bytearray:
"""Calculate checksum of byte array and add to end."""
csum = sum(raw_bytes) & 0xFF
raw_bytes.append(csum)
return raw_bytes