Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 77 additions & 12 deletions flux_led/aiodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@
ProtocolLEDENET8Byte,
ProtocolLEDENETAddressableA3,
ProtocolLEDENETAddressableChristmas,
ProtocolLEDENETExtendedCustom,
ProtocolLEDENETOriginal,
RemoteConfig,
)
from .scanner import FluxLEDDiscovery
from .timer import LedTimer
from .timer import LedTimer, LedTimerExtended
from .utils import color_temp_to_white_levels, rgbw_brightness, rgbww_brightness

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(
self._get_time_future: asyncio.Future[bool] | None = None
self._get_timers_lock: asyncio.Lock = asyncio.Lock()
self._get_timers_future: asyncio.Future[bool] | None = None
self._timers: list[LedTimer] | None = None
self._timers: list[LedTimer] | list[LedTimerExtended] | None = None
self._power_restore_future: asyncio.Future[bool] = loop.create_future()
self._device_config_lock: asyncio.Lock = asyncio.Lock()
self._device_config_future: asyncio.Future[bool] = loop.create_future()
Expand Down Expand Up @@ -421,6 +422,47 @@ async def async_set_custom_pattern(
self._generate_custom_patterm(rgb_list, speed, transition_type)
)

async def async_set_extended_custom_effect(
self,
pattern_id: int,
colors: list[tuple[int, int, int]],
speed: int = 50,
density: int = 50,
direction: int = 0x01,
option: int = 0x00,
) -> None:
"""Set an extended custom effect on the device.

Only supported on devices using the extended protocol (e.g., 0xB6).

Args:
pattern_id: Pattern ID (1-24 or 101-102). See ExtendedCustomEffectPattern.
colors: List of 1-8 RGB color tuples
speed: Animation speed 0-100 (default 50)
density: Pattern density 0-100 (default 50)
direction: Animation direction (0x01=L->R, 0x02=R->L)
option: Pattern-specific option (default 0)
"""
await self._async_send_msg(
self._generate_extended_custom_effect(
pattern_id, colors, speed, density, direction, option
)
)

async def async_set_custom_segment_colors(
self,
segments: list[tuple[int, int, int] | None],
) -> None:
"""Set custom colors for each segment on the device.

Only supported on devices using the extended protocol (e.g., 0xB6).
Sets static HSV colors for each of 20 segments on the light strip.

Args:
segments: List of up to 20 segment colors. Each is (R, G, B) or None for off.
"""
await self._async_send_msg(self._generate_custom_segment_colors(segments))

async def async_set_effect(
self, effect: str, speed: int, brightness: int = 100
) -> None:
Expand Down Expand Up @@ -634,7 +676,7 @@ async def async_get_time(self) -> datetime | None:
return None
return self._last_time

async def async_get_timers(self) -> list[LedTimer] | None:
async def async_get_timers(self) -> list[LedTimer] | list[LedTimerExtended] | None:
"""Get the timers."""
assert self._protocol is not None
if isinstance(self._protocol, ProtocolLEDENETOriginal):
Expand All @@ -651,10 +693,24 @@ async def async_get_timers(self) -> list[LedTimer] | None:
return None
return self._timers

async def async_set_timers(self, timer_list: list[LedTimer]) -> None:
async def async_set_timers(
self, timer_list: list[LedTimer] | list[LedTimerExtended]
) -> None:
"""Set the timers."""
assert self._protocol is not None
await self._async_send_msg(self._protocol.construct_set_timers(timer_list))
if isinstance(self._protocol, ProtocolLEDENETExtendedCustom):
# 0xB6 devices set timers one at a time
commands = self._protocol.construct_set_timers(timer_list) # type: ignore[arg-type]
for cmd in commands:
await self._async_send_msg(cmd)
else:
await self._async_send_msg(self._protocol.construct_set_timers(timer_list)) # type: ignore[arg-type]

async def async_set_timer(self, timer: LedTimerExtended) -> None:
"""Set a single timer (for 0xB6 devices)."""
assert self._protocol is not None
if isinstance(self._protocol, ProtocolLEDENETExtendedCustom):
await self._async_send_msg(self._protocol.construct_set_timer(timer))

async def async_set_time(self, time: datetime | None = None) -> None:
"""Set the current time."""
Expand Down Expand Up @@ -706,15 +762,15 @@ def _async_data_recieved(self, data: bytes) -> None:
)
self._async_process_message(msg)

def _async_process_state_response(self, msg: bytes) -> bool:
def _resolve_protocol_determination(self, msg: bytes) -> None:
"""Resolve pending protocol determination if applicable."""
if (
self._determine_protocol_future
and not self._determine_protocol_future.done()
):
assert self._protocol is not None
self._set_protocol_from_msg(msg, self._protocol.name)
self._determine_protocol_future.set_result(True)
return self.process_state_response(msg)

def _async_process_message(self, msg: bytes) -> None:
"""Process a full message (maybe reassembled)."""
Expand All @@ -725,13 +781,22 @@ def _async_process_message(self, msg: bytes) -> None:
if self._protocol.is_valid_outer_message(msg):
msg = self._protocol.extract_inner_message(msg)

if self._protocol.is_valid_state_response(msg):
self._last_message["state"] = msg
self._async_process_state_response(msg)
self._process_state_futures()
elif self._protocol.is_valid_extended_state_response(msg):
# Check for extended state BEFORE regular state to properly handle 0xEA responses
# Extended state format (0xEA 0x81) was introduced in PR #428 for 0x35 v10 devices
# Some devices (like 0xB6) ONLY respond with extended state format, so we check it first
if self._protocol.is_valid_extended_state_response(msg):
self._last_message["extended_state"] = msg
self._resolve_protocol_determination(msg)
self.process_extended_state_response(msg)
# Extended state includes both state and power state information
# so we need to resolve both types of futures
self._process_state_futures()
self._process_power_futures()
elif self._protocol.is_valid_state_response(msg):
self._last_message["state"] = msg
self._resolve_protocol_determination(msg)
self.process_state_response(msg)
self._process_state_futures()
elif self._protocol.is_valid_power_state_response(msg):
self._last_message["power_state"] = msg
self.process_power_state_response(msg)
Expand Down
106 changes: 104 additions & 2 deletions flux_led/base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
STATE_RED,
STATE_WARM_WHITE,
STATIC_MODES,
ExtendedCustomEffectPattern,
WhiteChannelType,
)
from .models_db import (
BASE_MODE_MAP,
HARDWARE_MAP,
LEDENETHardware,
LEDENETModel,
extract_model_version_from_state,
get_model,
is_known_model,
)
Expand All @@ -72,6 +74,7 @@
EFFECT_LIST,
EFFECT_LIST_DIMMABLE,
EFFECT_LIST_LEGACY_CCT,
EXTENDED_CUSTOM_EFFECT_ID_NAME,
ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME,
ORIGINAL_ADDRESSABLE_EFFECT_NAME_ID,
PresetPattern,
Expand All @@ -90,6 +93,7 @@
PROTOCOL_LEDENET_ADDRESSABLE_CHRISTMAS,
PROTOCOL_LEDENET_CCT,
PROTOCOL_LEDENET_CCT_WRAPPED,
PROTOCOL_LEDENET_EXTENDED_CUSTOM,
PROTOCOL_LEDENET_ORIGINAL,
PROTOCOL_LEDENET_ORIGINAL_CCT,
PROTOCOL_LEDENET_ORIGINAL_RGBW,
Expand All @@ -110,6 +114,7 @@
ProtocolLEDENETAddressableChristmas,
ProtocolLEDENETCCT,
ProtocolLEDENETCCTWrapped,
ProtocolLEDENETExtendedCustom,
ProtocolLEDENETOriginal,
ProtocolLEDENETOriginalCCT,
ProtocolLEDENETOriginalRGBW,
Expand Down Expand Up @@ -143,6 +148,7 @@ class DeviceUnavailableException(RuntimeError):
ProtocolLEDENET9ByteAutoOn,
ProtocolLEDENET9ByteDimmableEffects,
ProtocolLEDENET25Byte,
ProtocolLEDENETExtendedCustom,
ProtocolLEDENETAddressableA1,
ProtocolLEDENETAddressableA2,
ProtocolLEDENETAddressableA3,
Expand Down Expand Up @@ -187,6 +193,7 @@ class DeviceUnavailableException(RuntimeError):
PROTOCOL_LEDENET_9BYTE_AUTO_ON: ProtocolLEDENET9ByteAutoOn,
PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS: ProtocolLEDENET9ByteDimmableEffects,
PROTOCOL_LEDENET_25BYTE: ProtocolLEDENET25Byte,
PROTOCOL_LEDENET_EXTENDED_CUSTOM: ProtocolLEDENETExtendedCustom,
PROTOCOL_LEDENET_ADDRESSABLE_A3: ProtocolLEDENETAddressableA3,
PROTOCOL_LEDENET_ADDRESSABLE_A2: ProtocolLEDENETAddressableA2,
PROTOCOL_LEDENET_ADDRESSABLE_A1: ProtocolLEDENETAddressableA1,
Expand Down Expand Up @@ -638,6 +645,18 @@ def effect_list(self) -> list[str]:
return [*effects, EFFECT_RANDOM, EFFECT_MUSIC]
return [*effects, EFFECT_RANDOM]

@property
def supports_extended_custom_effects(self) -> bool:
"""Return True if device supports extended custom effects."""
return self.protocol == PROTOCOL_LEDENET_EXTENDED_CUSTOM

@property
def extended_custom_effect_pattern_list(self) -> list[str] | None:
"""Return available extended custom effect patterns, or None if not supported."""
if not self.supports_extended_custom_effects:
return None
return [p.name.lower().replace("_", " ") for p in ExtendedCustomEffectPattern]

@property
def effect(self) -> str | None:
"""Return the current effect."""
Expand All @@ -652,6 +671,16 @@ def _named_effect(self) -> str | None:
mode = self.raw_state.mode
pattern_code = self.preset_pattern_num
protocol = self.protocol
# Devices with extended custom effects use different pattern names
if self.supports_extended_custom_effects and pattern_code == 0x25:
return EXTENDED_CUSTOM_EFFECT_ID_NAME.get(mode)
# For 0xB6 segment mode: preset_pattern=0x24 and mode=0x00
if (
self.supports_extended_custom_effects
and pattern_code == 0x24
and mode == 0x00
):
return EXTENDED_CUSTOM_EFFECT_ID_NAME.get(mode) # Returns "Segments"
if protocol in OLD_EFFECTS_PROTOCOLS:
effect_id = (pattern_code << 8) + mode - 99
return ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME.get(effect_id)
Expand Down Expand Up @@ -1252,9 +1281,9 @@ def _set_protocol_from_msg(
full_msg: bytes,
fallback_protocol: str,
) -> None:
self._model_num = full_msg[1]
model_num, version_num = extract_model_version_from_state(full_msg)
self._model_num = model_num
self._model_data = get_model(self._model_num, fallback_protocol)
version_num = full_msg[10] if len(full_msg) > 10 else 1
self.setProtocol(self._model_data.protocol_for_version_num(version_num))

def _generate_preset_pattern(
Expand Down Expand Up @@ -1296,6 +1325,79 @@ def _generate_custom_patterm(
assert self._protocol is not None
return self._protocol.construct_custom_effect(rgb_list, speed, transition_type)

def _generate_extended_custom_effect(
self,
pattern_id: int,
colors: list[tuple[int, int, int]],
speed: int = 50,
density: int = 50,
direction: int = 0x01,
option: int = 0x00,
) -> bytearray:
"""Generate the extended custom effect protocol bytes with validation.

Only supported on devices using the extended protocol (e.g., 0xB6).
"""
# Validate pattern_id
valid_ids = set(range(1, 25)) | {101, 102}
if pattern_id not in valid_ids:
raise ValueError(f"Pattern ID must be 1-24 or 101-102, got {pattern_id}")

# Truncate if more than 8 colors
if len(colors) > 8:
_LOGGER.warning(
"Too many colors in %s, truncating list to %s", len(colors), 8
)
colors = colors[:8]

# Require at least one color
if len(colors) == 0:
raise ValueError("Surplife pattern requires at least one color")

# Validate color tuples
for idx, color in enumerate(colors):
if len(color) != 3:
raise ValueError(f"Color {idx} must be (R, G, B) tuple")
for c in color:
if not 0 <= c <= 255:
raise ValueError(f"Color values must be 0-255, got {c}")

assert self._protocol is not None
assert isinstance(self._protocol, ProtocolLEDENETExtendedCustom)
return self._protocol.construct_extended_custom_effect(
pattern_id, colors, speed, density, direction, option
)

def _generate_custom_segment_colors(
self,
segments: list[tuple[int, int, int] | None],
) -> bytearray:
"""Generate custom segment colors protocol bytes with validation.

Only supported on devices using the extended protocol (e.g., 0xB6).

Args:
segments: List of up to 20 segment colors. Each is (R, G, B) or None for off.
"""
# Truncate if more than 20 segments
if len(segments) > 20:
_LOGGER.warning("Too many segments (%s), truncating to 20", len(segments))
segments = segments[:20]

# Validate color tuples
for idx, color in enumerate(segments):
if color is None:
continue
if len(color) != 3:
raise ValueError(f"Segment {idx} must be (R, G, B) tuple or None")
for c in color:
if not 0 <= c <= 255:
raise ValueError(f"Color values must be 0-255, got {c}")

assert self._protocol is not None
assert isinstance(self._protocol, ProtocolLEDENETExtendedCustom)
return self._protocol.construct_custom_segment_colors(segments)

def _effect_to_pattern(self, effect: str) -> int:
"""Convert an effect to a pattern code."""
protocol = self.protocol
Expand Down
Loading