diff --git a/flux_led/aiodevice.py b/flux_led/aiodevice.py index 98139874..acafe9b7 100644 --- a/flux_led/aiodevice.py +++ b/flux_led/aiodevice.py @@ -45,11 +45,12 @@ ProtocolLEDENET8Byte, ProtocolLEDENETAddressableA3, ProtocolLEDENETAddressableChristmas, + ProtocolLEDENETExtendedCustom, ProtocolLEDENETOriginal, RemoteConfig, ) from .scanner import FluxLEDDiscovery -from .timer import LedTimer +from .timer import LedTimer, LedTimerExtended from .utils import color_temp_to_white_levels, rgbw_brightness, rgbww_brightness _LOGGER = logging.getLogger(__name__) @@ -83,7 +84,7 @@ def __init__( self._get_time_future: asyncio.Future[bool] | None = None self._get_timers_lock: asyncio.Lock = asyncio.Lock() self._get_timers_future: asyncio.Future[bool] | None = None - self._timers: list[LedTimer] | None = None + self._timers: list[LedTimer] | list[LedTimerExtended] | None = None self._power_restore_future: asyncio.Future[bool] = loop.create_future() self._device_config_lock: asyncio.Lock = asyncio.Lock() self._device_config_future: asyncio.Future[bool] = loop.create_future() @@ -421,6 +422,47 @@ async def async_set_custom_pattern( self._generate_custom_patterm(rgb_list, speed, transition_type) ) + async def async_set_extended_custom_effect( + self, + pattern_id: int, + colors: list[tuple[int, int, int]], + speed: int = 50, + density: int = 50, + direction: int = 0x01, + option: int = 0x00, + ) -> None: + """Set an extended custom effect on the device. + + Only supported on devices using the extended protocol (e.g., 0xB6). + + Args: + pattern_id: Pattern ID (1-24 or 101-102). See ExtendedCustomEffectPattern. + colors: List of 1-8 RGB color tuples + speed: Animation speed 0-100 (default 50) + density: Pattern density 0-100 (default 50) + direction: Animation direction (0x01=L->R, 0x02=R->L) + option: Pattern-specific option (default 0) + """ + await self._async_send_msg( + self._generate_extended_custom_effect( + pattern_id, colors, speed, density, direction, option + ) + ) + + async def async_set_custom_segment_colors( + self, + segments: list[tuple[int, int, int] | None], + ) -> None: + """Set custom colors for each segment on the device. + + Only supported on devices using the extended protocol (e.g., 0xB6). + Sets static HSV colors for each of 20 segments on the light strip. + + Args: + segments: List of up to 20 segment colors. Each is (R, G, B) or None for off. + """ + await self._async_send_msg(self._generate_custom_segment_colors(segments)) + async def async_set_effect( self, effect: str, speed: int, brightness: int = 100 ) -> None: @@ -634,7 +676,7 @@ async def async_get_time(self) -> datetime | None: return None return self._last_time - async def async_get_timers(self) -> list[LedTimer] | None: + async def async_get_timers(self) -> list[LedTimer] | list[LedTimerExtended] | None: """Get the timers.""" assert self._protocol is not None if isinstance(self._protocol, ProtocolLEDENETOriginal): @@ -651,10 +693,24 @@ async def async_get_timers(self) -> list[LedTimer] | None: return None return self._timers - async def async_set_timers(self, timer_list: list[LedTimer]) -> None: + async def async_set_timers( + self, timer_list: list[LedTimer] | list[LedTimerExtended] + ) -> None: """Set the timers.""" assert self._protocol is not None - await self._async_send_msg(self._protocol.construct_set_timers(timer_list)) + if isinstance(self._protocol, ProtocolLEDENETExtendedCustom): + # 0xB6 devices set timers one at a time + commands = self._protocol.construct_set_timers(timer_list) # type: ignore[arg-type] + for cmd in commands: + await self._async_send_msg(cmd) + else: + await self._async_send_msg(self._protocol.construct_set_timers(timer_list)) # type: ignore[arg-type] + + async def async_set_timer(self, timer: LedTimerExtended) -> None: + """Set a single timer (for 0xB6 devices).""" + assert self._protocol is not None + if isinstance(self._protocol, ProtocolLEDENETExtendedCustom): + await self._async_send_msg(self._protocol.construct_set_timer(timer)) async def async_set_time(self, time: datetime | None = None) -> None: """Set the current time.""" @@ -706,7 +762,8 @@ def _async_data_recieved(self, data: bytes) -> None: ) self._async_process_message(msg) - def _async_process_state_response(self, msg: bytes) -> bool: + def _resolve_protocol_determination(self, msg: bytes) -> None: + """Resolve pending protocol determination if applicable.""" if ( self._determine_protocol_future and not self._determine_protocol_future.done() @@ -714,7 +771,6 @@ def _async_process_state_response(self, msg: bytes) -> bool: assert self._protocol is not None self._set_protocol_from_msg(msg, self._protocol.name) self._determine_protocol_future.set_result(True) - return self.process_state_response(msg) def _async_process_message(self, msg: bytes) -> None: """Process a full message (maybe reassembled).""" @@ -725,13 +781,22 @@ def _async_process_message(self, msg: bytes) -> None: if self._protocol.is_valid_outer_message(msg): msg = self._protocol.extract_inner_message(msg) - if self._protocol.is_valid_state_response(msg): - self._last_message["state"] = msg - self._async_process_state_response(msg) - self._process_state_futures() - elif self._protocol.is_valid_extended_state_response(msg): + # Check for extended state BEFORE regular state to properly handle 0xEA responses + # Extended state format (0xEA 0x81) was introduced in PR #428 for 0x35 v10 devices + # Some devices (like 0xB6) ONLY respond with extended state format, so we check it first + if self._protocol.is_valid_extended_state_response(msg): self._last_message["extended_state"] = msg + self._resolve_protocol_determination(msg) self.process_extended_state_response(msg) + # Extended state includes both state and power state information + # so we need to resolve both types of futures + self._process_state_futures() + self._process_power_futures() + elif self._protocol.is_valid_state_response(msg): + self._last_message["state"] = msg + self._resolve_protocol_determination(msg) + self.process_state_response(msg) + self._process_state_futures() elif self._protocol.is_valid_power_state_response(msg): self._last_message["power_state"] = msg self.process_power_state_response(msg) diff --git a/flux_led/base_device.py b/flux_led/base_device.py index e4072a94..edfc4195 100644 --- a/flux_led/base_device.py +++ b/flux_led/base_device.py @@ -49,6 +49,7 @@ STATE_RED, STATE_WARM_WHITE, STATIC_MODES, + ExtendedCustomEffectPattern, WhiteChannelType, ) from .models_db import ( @@ -56,6 +57,7 @@ HARDWARE_MAP, LEDENETHardware, LEDENETModel, + extract_model_version_from_state, get_model, is_known_model, ) @@ -72,6 +74,7 @@ EFFECT_LIST, EFFECT_LIST_DIMMABLE, EFFECT_LIST_LEGACY_CCT, + EXTENDED_CUSTOM_EFFECT_ID_NAME, ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME, ORIGINAL_ADDRESSABLE_EFFECT_NAME_ID, PresetPattern, @@ -90,6 +93,7 @@ PROTOCOL_LEDENET_ADDRESSABLE_CHRISTMAS, PROTOCOL_LEDENET_CCT, PROTOCOL_LEDENET_CCT_WRAPPED, + PROTOCOL_LEDENET_EXTENDED_CUSTOM, PROTOCOL_LEDENET_ORIGINAL, PROTOCOL_LEDENET_ORIGINAL_CCT, PROTOCOL_LEDENET_ORIGINAL_RGBW, @@ -110,6 +114,7 @@ ProtocolLEDENETAddressableChristmas, ProtocolLEDENETCCT, ProtocolLEDENETCCTWrapped, + ProtocolLEDENETExtendedCustom, ProtocolLEDENETOriginal, ProtocolLEDENETOriginalCCT, ProtocolLEDENETOriginalRGBW, @@ -143,6 +148,7 @@ class DeviceUnavailableException(RuntimeError): ProtocolLEDENET9ByteAutoOn, ProtocolLEDENET9ByteDimmableEffects, ProtocolLEDENET25Byte, + ProtocolLEDENETExtendedCustom, ProtocolLEDENETAddressableA1, ProtocolLEDENETAddressableA2, ProtocolLEDENETAddressableA3, @@ -187,6 +193,7 @@ class DeviceUnavailableException(RuntimeError): PROTOCOL_LEDENET_9BYTE_AUTO_ON: ProtocolLEDENET9ByteAutoOn, PROTOCOL_LEDENET_9BYTE_DIMMABLE_EFFECTS: ProtocolLEDENET9ByteDimmableEffects, PROTOCOL_LEDENET_25BYTE: ProtocolLEDENET25Byte, + PROTOCOL_LEDENET_EXTENDED_CUSTOM: ProtocolLEDENETExtendedCustom, PROTOCOL_LEDENET_ADDRESSABLE_A3: ProtocolLEDENETAddressableA3, PROTOCOL_LEDENET_ADDRESSABLE_A2: ProtocolLEDENETAddressableA2, PROTOCOL_LEDENET_ADDRESSABLE_A1: ProtocolLEDENETAddressableA1, @@ -638,6 +645,18 @@ def effect_list(self) -> list[str]: return [*effects, EFFECT_RANDOM, EFFECT_MUSIC] return [*effects, EFFECT_RANDOM] + @property + def supports_extended_custom_effects(self) -> bool: + """Return True if device supports extended custom effects.""" + return self.protocol == PROTOCOL_LEDENET_EXTENDED_CUSTOM + + @property + def extended_custom_effect_pattern_list(self) -> list[str] | None: + """Return available extended custom effect patterns, or None if not supported.""" + if not self.supports_extended_custom_effects: + return None + return [p.name.lower().replace("_", " ") for p in ExtendedCustomEffectPattern] + @property def effect(self) -> str | None: """Return the current effect.""" @@ -652,6 +671,16 @@ def _named_effect(self) -> str | None: mode = self.raw_state.mode pattern_code = self.preset_pattern_num protocol = self.protocol + # Devices with extended custom effects use different pattern names + if self.supports_extended_custom_effects and pattern_code == 0x25: + return EXTENDED_CUSTOM_EFFECT_ID_NAME.get(mode) + # For 0xB6 segment mode: preset_pattern=0x24 and mode=0x00 + if ( + self.supports_extended_custom_effects + and pattern_code == 0x24 + and mode == 0x00 + ): + return EXTENDED_CUSTOM_EFFECT_ID_NAME.get(mode) # Returns "Segments" if protocol in OLD_EFFECTS_PROTOCOLS: effect_id = (pattern_code << 8) + mode - 99 return ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME.get(effect_id) @@ -1252,9 +1281,9 @@ def _set_protocol_from_msg( full_msg: bytes, fallback_protocol: str, ) -> None: - self._model_num = full_msg[1] + model_num, version_num = extract_model_version_from_state(full_msg) + self._model_num = model_num self._model_data = get_model(self._model_num, fallback_protocol) - version_num = full_msg[10] if len(full_msg) > 10 else 1 self.setProtocol(self._model_data.protocol_for_version_num(version_num)) def _generate_preset_pattern( @@ -1296,6 +1325,79 @@ def _generate_custom_patterm( assert self._protocol is not None return self._protocol.construct_custom_effect(rgb_list, speed, transition_type) + def _generate_extended_custom_effect( + self, + pattern_id: int, + colors: list[tuple[int, int, int]], + speed: int = 50, + density: int = 50, + direction: int = 0x01, + option: int = 0x00, + ) -> bytearray: + """Generate the extended custom effect protocol bytes with validation. + + Only supported on devices using the extended protocol (e.g., 0xB6). + """ + # Validate pattern_id + valid_ids = set(range(1, 25)) | {101, 102} + if pattern_id not in valid_ids: + raise ValueError(f"Pattern ID must be 1-24 or 101-102, got {pattern_id}") + + # Truncate if more than 8 colors + if len(colors) > 8: + _LOGGER.warning( + "Too many colors in %s, truncating list to %s", len(colors), 8 + ) + colors = colors[:8] + + # Require at least one color + if len(colors) == 0: + raise ValueError("Surplife pattern requires at least one color") + + # Validate color tuples + for idx, color in enumerate(colors): + if len(color) != 3: + raise ValueError(f"Color {idx} must be (R, G, B) tuple") + for c in color: + if not 0 <= c <= 255: + raise ValueError(f"Color values must be 0-255, got {c}") + + assert self._protocol is not None + assert isinstance(self._protocol, ProtocolLEDENETExtendedCustom) + return self._protocol.construct_extended_custom_effect( + pattern_id, colors, speed, density, direction, option + ) + + def _generate_custom_segment_colors( + self, + segments: list[tuple[int, int, int] | None], + ) -> bytearray: + """Generate custom segment colors protocol bytes with validation. + + Only supported on devices using the extended protocol (e.g., 0xB6). + + Args: + segments: List of up to 20 segment colors. Each is (R, G, B) or None for off. + """ + # Truncate if more than 20 segments + if len(segments) > 20: + _LOGGER.warning("Too many segments (%s), truncating to 20", len(segments)) + segments = segments[:20] + + # Validate color tuples + for idx, color in enumerate(segments): + if color is None: + continue + if len(color) != 3: + raise ValueError(f"Segment {idx} must be (R, G, B) tuple or None") + for c in color: + if not 0 <= c <= 255: + raise ValueError(f"Color values must be 0-255, got {c}") + + assert self._protocol is not None + assert isinstance(self._protocol, ProtocolLEDENETExtendedCustom) + return self._protocol.construct_custom_segment_colors(segments) + def _effect_to_pattern(self, effect: str) -> int: """Convert an effect to a pattern code.""" protocol = self.protocol diff --git a/flux_led/const.py b/flux_led/const.py index ca122a66..c89f7166 100755 --- a/flux_led/const.py +++ b/flux_led/const.py @@ -38,6 +38,55 @@ class MultiColorEffects(Enum): BREATHING = 0x05 +class ExtendedCustomEffectPattern(Enum): + """Pattern IDs for extended custom effects (e.g., 0xB6 device).""" + + WAVE = 0x01 + METEOR = 0x02 + STREAMER = 0x03 + BUILDING_BLOCKS = 0x04 + FLOWING_WATER = 0x05 + CHASE = 0x06 + HORSE_RACING = 0x07 + CYCLE = 0x08 + BREATHE = 0x09 + JUMP = 0x0A + STROBE = 0x0B + TWINKLING_STARS = 0x0C + STARS_WINK = 0x0D + WARNING = 0x0E + COLLISION = 0x0F + FIREWORKS = 0x10 + COMET = 0x11 + GRADIENT_METEOR = 0x12 + VOLCANO = 0x13 + SUPERLUMINAL = 0x14 + RAINBOW_BRIDGE = 0x15 + GRADIENT_OVERLAY = 0x16 + STATIC_GRADIENT = 0x65 + STATIC_FILL = 0x66 + + +class ExtendedCustomEffectDirection(Enum): + """Direction for extended custom effect animations.""" + + LEFT_TO_RIGHT = 0x01 + RIGHT_TO_LEFT = 0x02 + + +class ExtendedCustomEffectOption(Enum): + """Option values for extended custom effects. + + The meaning varies by pattern: + - Some patterns: DEFAULT=static color, VARIANT_1=color change + - Rainbow patterns: DEFAULT=strobe, VARIANT_1=twinkle, VARIANT_2=breathe + """ + + DEFAULT = 0x00 + VARIANT_1 = 0x01 + VARIANT_2 = 0x02 + + DEFAULT_WHITE_CHANNEL_TYPE: Final = WhiteChannelType.WARM PRESET_MUSIC_MODE: Final = 0x62 @@ -161,3 +210,15 @@ class MultiColorEffects(Enum): PUSH_UPDATE_INTERVAL = 90 # seconds NEVER_TIME = -PUSH_UPDATE_INTERVAL + + +# Extended Timer Action Types (0xB6 devices) +TIMER_ACTION_ON: Final = 0x23 +TIMER_ACTION_OFF: Final = 0x24 +TIMER_ACTION_COLOR: Final = 0xA1 +TIMER_ACTION_SCENE_GRADIENT: Final = 0x29 +TIMER_ACTION_SCENE_SEGMENTS: Final = 0x6B + +# Extended Timer Effect Types +TIMER_EFFECT_GRADIENT: Final = 0x21 # e1 21 - gradient overlay +TIMER_EFFECT_SEGMENTS: Final = 0x22 # e1 22 - static segments/colorful diff --git a/flux_led/device.py b/flux_led/device.py index 181bc154..8f938996 100644 --- a/flux_led/device.py +++ b/flux_led/device.py @@ -276,7 +276,12 @@ def _determine_protocol(self) -> bytearray: # cannot process, recycle the connection self.close() continue - full_msg = rx + self._read_msg(protocol.state_response_length - read_bytes) + # Check for extended state format (0xEA 0x81) which needs 21 bytes total + if rx[0] == 0xEA and rx[1] == 0x81: + additional_bytes = 19 # 21 - 2 bytes already read + else: + additional_bytes = protocol.state_response_length - read_bytes + full_msg = rx + self._read_msg(additional_bytes) if not protocol.is_valid_state_response(full_msg): self.close() continue @@ -378,5 +383,58 @@ def setCustomPattern( retry=retry, ) + def setExtendedCustomEffect( + self, + pattern_id: int, + colors: list[tuple[int, int, int]], + speed: int = 50, + density: int = 50, + direction: int = 0x01, + option: int = 0x00, + retry: int = DEFAULT_RETRIES, + ) -> None: + """Set an extended custom effect on the device. + + Only supported on devices using the extended protocol (e.g., 0xB6). + + Args: + pattern_id: Pattern ID (1-24 or 101-102). See ExtendedCustomEffectPattern. + colors: List of 1-8 RGB color tuples, e.g., [(255, 0, 0), (0, 255, 0)] + speed: Animation speed 0-100 (default 50) + density: Pattern density 0-100 (default 50) + direction: Animation direction. See ExtendedCustomEffectDirection. + 0x01 = Left to Right (default) + 0x02 = Right to Left + option: Pattern-specific option (default 0) + retry: Number of retries on failure + """ + self._send_and_read_with_retry( + self._generate_extended_custom_effect( + pattern_id, colors, speed, density, direction, option + ), + 0, + retry=retry, + ) + + def setCustomSegmentColors( + self, + segments: list[tuple[int, int, int] | None], + retry: int = DEFAULT_RETRIES, + ) -> None: + """Set custom colors for each segment on the device. + + Only supported on devices using the extended protocol (e.g., 0xB6). + Sets static HSV colors for each of 20 segments on the light strip. + + Args: + segments: List of up to 20 segment colors. Each is (R, G, B) or None for off. + retry: Number of retries on failure + """ + self._send_and_read_with_retry( + self._generate_custom_segment_colors(segments), + 0, + retry=retry, + ) + def refreshState(self) -> None: return self.update_state() diff --git a/flux_led/fluxled.py b/flux_led/fluxled.py index 3f67b9e2..601369b9 100644 --- a/flux_led/fluxled.py +++ b/flux_led/fluxled.py @@ -45,14 +45,23 @@ import logging import sys from optparse import OptionGroup, OptionParser, Values -from typing import Any +from typing import Any, cast from .aio import AIOWifiLedBulb from .aioscanner import AIOBulbScanner -from .const import ATTR_ID, ATTR_IPADDR +from .const import ( + ATTR_ID, + ATTR_IPADDR, + TIMER_ACTION_COLOR, + TIMER_ACTION_OFF, + TIMER_ACTION_ON, + ExtendedCustomEffectDirection, + ExtendedCustomEffectPattern, +) from .pattern import PresetPattern +from .protocol import ProtocolLEDENETExtendedCustom from .scanner import FluxLEDDiscovery -from .timer import LedTimer +from .timer import LedTimer, LedTimerExtended from .utils import utils _LOGGER = logging.getLogger(__name__) @@ -107,6 +116,15 @@ def showUsageExamples() -> None: Set custom pattern 25% speed, red/green/blue, gradual change: %prog% 192.168.1.100 -C gradual 25 "red green (0,0,255)" +Set extended custom effect (0xB6 devices) - wave pattern, 80% speed: + %prog% 192.168.1.100 -C wave 80 "red green blue" + +Set extended effect with density, direction, and color change options: + %prog% 192.168.1.100 -C meteor 50 "purple orange" --density 75 --direction r2l --colorchange + +Set custom segment colors (0xB6 devices) - set each segment individually: + %prog% 192.168.1.100 -C segments 0 "red green blue off off red green blue" + Sync all bulb's clocks with this computer's: %prog% -sS --setclock @@ -355,35 +373,297 @@ def processSetTimerArgs(parser: OptionParser, args: Any) -> LedTimer: return timer -def processCustomArgs( - parser: OptionParser, args: Any -) -> tuple[Any, int, list[tuple[int, ...]]] | None: - if args[0] not in ["gradual", "jump", "strobe"]: - parser.error(f"bad pattern type: {args[0]}") - return None +def processSetTimerArgsExtended( + parser: OptionParser, args: list[str] +) -> LedTimerExtended: + """Process timer args for 0xB6 extended timer format. - speed = int(args[1]) + Supported modes: inactive, default (on), poweroff, color + Settings format: mode;time:HHMM;repeat:0123456;color:RRGGBB + """ + num = args[0] + mode = args[1].lower() if len(args) > 1 else "inactive" + settings = args[2] if len(args) > 2 else "" - # convert the string to a list of RGB tuples - # it should have space separated items of either - # color names, hex values, or byte triples - try: - color_list_str = args[2].strip() - str_list = color_list_str.split(" ") - color_list = [] - for s in str_list: - c = utils.color_object_to_tuple(s) - if c is not None: - color_list.append(c) - else: - raise Exception + if not num.isdigit() or int(num) > 6 or int(num) < 1: + parser.error("Timer number must be between 1 and 6") + + slot = int(num) - except Exception: + # parse settings + settings_dict: dict[str, str] = {} + if settings: + for s in settings.split(";"): + pair = s.split(":") + key = pair[0].strip().lower() + val = pair[1].strip().lower() if len(pair) > 1 else "" + settings_dict[key] = val + + if mode == "inactive": + return LedTimerExtended( + slot=slot, + active=False, + hour=0, + minute=0, + repeat_mask=0, + action_type=TIMER_ACTION_OFF, + ) + + if mode not in ["poweroff", "default", "color"]: parser.error( - "COLORLIST isn't formatted right. It should be a space separated list of RGB tuples, color names or web hex values" + f"Not a valid timer mode for this device: {mode}. " + "Supported: inactive, default, poweroff, color" ) - return args[0], speed, color_list + # validate time + if "time" not in settings_dict: + parser.error(f"This mode needs a time: {mode}") + time_str = settings_dict["time"] + if len(time_str) != 4 or not time_str.isdigit(): + parser.error("time must be 4 digits (HHMM)") + hour = int(time_str[0:2]) + minute = int(time_str[2:4]) + if hour > 23: + parser.error("timer hour can't be greater than 23") + if minute > 59: + parser.error("timer minute can't be greater than 59") + + # parse repeat mask + # Format: repeat:0123456 where 0=Sun, 1=Mon, ..., 6=Sat + repeat_mask = 0 + if "repeat" in settings_dict: + repeat_str = settings_dict["repeat"] + for c in repeat_str: + if c not in "0123456": + parser.error("repeat can only contain digits 0-6") + day = int(c) + # Map: 0=Sun->bit7, 1=Mon->bit1, 2=Tue->bit2, ..., 6=Sat->bit6 + if day == 0: + repeat_mask |= 0x80 # Sunday = bit 7 + else: + repeat_mask |= 1 << day # Mon=bit1, Tue=bit2, etc. + + # determine action type and build timer + if mode == "poweroff": + return LedTimerExtended( + slot=slot, + active=True, + hour=hour, + minute=minute, + repeat_mask=repeat_mask, + action_type=TIMER_ACTION_OFF, + ) + + if mode == "default": + return LedTimerExtended( + slot=slot, + active=True, + hour=hour, + minute=minute, + repeat_mask=repeat_mask, + action_type=TIMER_ACTION_ON, + ) + + if mode == "color": + if "color" not in settings_dict: + parser.error("color mode needs a color setting") + color_val = settings_dict["color"] + # If it looks like hex without # prefix (6 hex chars), add the prefix + if len(color_val) == 6 and all(c in "0123456789abcdef" for c in color_val): + color_val = "#" + color_val + rgb = utils.color_object_to_tuple(color_val) + if rgb is None: + parser.error(f"Invalid color value: {color_val}") + assert rgb is not None + # Convert RGB to HSV (0-255 scale) + r, g, b = rgb[0], rgb[1], rgb[2] + max_c = max(r, g, b) + min_c = min(r, g, b) + if max_c == 0: + hsv_s = 0 + hsv_h = 0 + else: + hsv_s = int((max_c - min_c) * 255 / max_c) + delta = max_c - min_c + if delta == 0: + hsv_h = 0 + elif max_c == r: + hsv_h = int(((g - b) / delta) * 255 / 6) % 256 + elif max_c == g: + hsv_h = int((2.0 + (b - r) / delta) * 255 / 6) % 256 + else: + hsv_h = int((4.0 + (r - g) / delta) * 255 / 6) % 256 + + # brightness from settings or default to 100% + brightness = 100 + if "brightness" in settings_dict: + brightness = int(settings_dict["brightness"]) + brightness = min(brightness, 100) + + return LedTimerExtended( + slot=slot, + active=True, + hour=hour, + minute=minute, + repeat_mask=repeat_mask, + action_type=TIMER_ACTION_COLOR, + color_hsv=(hsv_h, hsv_s, brightness), + ) + + # Should never reach here due to earlier validation + parser.error(f"Not a valid timer mode: {mode}") + raise SystemExit(1) # For type checker + + +def processCustomArgs( + parser: OptionParser, + args: Any, + density: int = 50, + direction: str = "l2r", + colorchange: bool = False, +) -> dict[str, Any] | None: + """Process custom pattern arguments. + + Supports both standard patterns (jump, gradual, strobe) and extended + patterns (wave, meteor, etc.) for devices that support them. + + Returns a dict with: + - mode: "standard", "extended", or "segments" + - For standard: type, speed, colors + - For extended: pattern_id, speed, density, colors, direction, option + - For segments: colors (list of up to 20 colors) + """ + # Build mapping of extended pattern names to IDs + extended_pattern_names = { + p.name.lower().replace("_", " "): p.value for p in ExtendedCustomEffectPattern + } + # Also add underscore versions and single word versions + for p in ExtendedCustomEffectPattern: + extended_pattern_names[p.name.lower()] = p.value + + pattern_type = args[0].lower() + speed = int(args[1]) + + # Check if this is a standard pattern + if pattern_type in ["gradual", "jump", "strobe"]: + # Standard custom pattern + try: + color_list_str = args[2].strip() + str_list = color_list_str.split(" ") + color_list: list[tuple[int, ...]] = [] + for s in str_list: + c = utils.color_object_to_tuple(s) + if c is not None: + color_list.append(c) + else: + raise ValueError(f"Invalid color: {s}") + except Exception: + parser.error( + "COLORLIST isn't formatted right. It should be a space separated list " + "of RGB tuples, color names or web hex values" + ) + return None + + return { + "mode": "standard", + "type": pattern_type, + "speed": speed, + "colors": color_list, + } + + # Check if this is "segments" mode + if pattern_type == "segments": + try: + color_list_str = args[2].strip() + str_list = color_list_str.split(" ") + segment_list: list[tuple[int, int, int] | None] = [] + for s in str_list: + s = s.strip().lower() + if not s: + continue + if s == "off": + segment_list.append(None) + else: + c = utils.color_object_to_tuple(s) + if c is not None and len(c) >= 3: + segment_list.append((c[0], c[1], c[2])) + else: + raise ValueError(f"Invalid color: {s}") + if len(segment_list) == 0: + parser.error("At least one segment color is required") + return None + if len(segment_list) > 20: + print("Warning: More than 20 segments provided, truncating to 20") + segment_list = segment_list[:20] + except Exception as e: + parser.error( + f"COLORLIST isn't formatted right: {e}. It should be a space-separated " + "list of color names, hex values, RGB triples, or 'off'" + ) + return None + + return { + "mode": "segments", + "colors": segment_list, + } + + # Check if this is an extended pattern name + if pattern_type in extended_pattern_names: + pattern_id = extended_pattern_names[pattern_type] + + # Parse color list + try: + color_list_str = args[2].strip() + str_list = color_list_str.split(" ") + ext_color_list: list[tuple[int, int, int]] = [] + for s in str_list: + c = utils.color_object_to_tuple(s) + if c is not None and len(c) >= 3: + ext_color_list.append((c[0], c[1], c[2])) + else: + raise ValueError(f"Invalid color: {s}") + if len(ext_color_list) == 0: + parser.error("At least one color is required") + return None + if len(ext_color_list) > 8: + print("Warning: More than 8 colors provided, truncating to 8") + ext_color_list = ext_color_list[:8] + except Exception as e: + parser.error( + f"COLORLIST isn't formatted right: {e}. It should be a space-separated " + "list of color names, hex values, or RGB triples" + ) + return None + + # Parse direction + if direction.lower() in ("l2r", "left", "ltr"): + dir_value = ExtendedCustomEffectDirection.LEFT_TO_RIGHT.value + elif direction.lower() in ("r2l", "right", "rtl"): + dir_value = ExtendedCustomEffectDirection.RIGHT_TO_LEFT.value + else: + parser.error(f"Invalid direction: {direction}. Use l2r or r2l") + return None + + # Option value + option_value = 0x01 if colorchange else 0x00 + + return { + "mode": "extended", + "pattern_id": pattern_id, + "speed": speed, + "density": density, + "colors": ext_color_list, + "direction": dir_value, + "option": option_value, + } + + # Unknown pattern type - show valid options + parser.error( + f"Unknown pattern type: {pattern_type}. Valid types include: " + f"jump, gradual, strobe, segments, wave, meteor, breathe, etc. " + f"Use --listpresets for the full list." + ) + return None def parseArgs() -> tuple[Values, Any]: @@ -427,7 +707,7 @@ def parseArgs() -> tuple[Values, Any]: action="store_true", dest="listpresets", default=False, - help="List preset codes", + help="List preset codes (including extended patterns for 0xB6 devices)", ) info_group.add_option( "--listcolors", @@ -527,11 +807,37 @@ def parseArgs() -> tuple[Values, Any]: default=None, nargs=3, help="Set custom pattern mode. " - + "TYPE should be jump, gradual, or strobe. SPEED is percent. " - + "COLORLIST is a space-separated list of color names, web hex values, or comma-separated RGB triples", + + "TYPE: jump, gradual, strobe (standard), or extended pattern names " + + "(wave, meteor, breathe, etc. for 0xB6 devices), or 'segments' for static colors. " + + "SPEED is percent (0-100). " + + "COLORLIST is a space-separated list of color names, hex values, or RGB triples. " + + "Use --density, --direction, --colorchange for extended pattern options.", ) parser.add_option_group(mode_group) + other_group.add_option( + "--density", + dest="density", + default=50, + type="int", + metavar="DENSITY", + help="Pattern density 0-100 for extended effects (default: 50)", + ) + other_group.add_option( + "--direction", + dest="direction", + default="l2r", + metavar="DIR", + help="Direction for extended effect: l2r (left to right) or r2l (right to left). Default: l2r", + ) + other_group.add_option( + "--colorchange", + action="store_true", + dest="colorchange", + default=False, + help="Enable color change option for extended effect", + ) + parser.add_option( "-i", "--info", @@ -611,10 +917,20 @@ def parseArgs() -> tuple[Values, Any]: sys.exit(0) if options.listpresets: + print("Standard preset patterns (-p option):") for c in range( PresetPattern.seven_color_cross_fade, PresetPattern.seven_color_jumping + 1 ): - print(f"{c:2} {PresetPattern.valtostr(c)}") + print(f" {c:2} {PresetPattern.valtostr(c)}") + print("\nStandard custom pattern types (-C option):") + print(" jump - Colors change instantly") + print(" gradual - Colors fade smoothly") + print(" strobe - Colors flash rapidly") + print("\nExtended custom patterns (-C option, 0xB6 devices only):") + for p in ExtendedCustomEffectPattern: + print(f" {p.name.lower().replace('_', ' '):<20} (ID: {p.value})") + print("\nSegment colors (-C segments, 0xB6 devices only):") + print(" segments - Set individual segment colors (up to 20)") sys.exit(0) if options.listcolors: @@ -623,11 +939,9 @@ def parseArgs() -> tuple[Values, Any]: print() sys.exit(0) - if options.settimer: - new_timer = processSetTimerArgs(parser, options.settimer) - options.new_timer = new_timer - else: - options.new_timer = None + # Timer processing is deferred to _async_run_commands since we need + # to know the device protocol first (0xB6 uses extended timer format) + options.new_timer = None mode_count = 0 if options.color: @@ -651,7 +965,13 @@ def parseArgs() -> tuple[Values, Any]: parser.error("options --on and --off are mutually exclusive") if options.custom: - options.custom = processCustomArgs(parser, options.custom) + options.custom = processCustomArgs( + parser, + options.custom, + density=options.density, + direction=options.direction, + colorchange=options.colorchange, + ) if options.color: options.color = utils.color_object_to_tuple(options.color) @@ -782,12 +1102,67 @@ def buf_in(str: str) -> None: ) elif options.custom is not None: - await bulb.async_set_custom_pattern( - options.custom[2], options.custom[1], options.custom[0] - ) - buf_in( - f"Setting custom pattern: {options.custom[0]}, Speed={options.custom[1]}%, {options.custom[2]}" - ) + custom = options.custom + mode = custom["mode"] + + if mode == "standard": + # Standard custom pattern (jump, gradual, strobe) + await bulb.async_set_custom_pattern( + custom["colors"], custom["speed"], custom["type"] + ) + buf_in( + f"Setting custom pattern: {custom['type']}, " + f"Speed={custom['speed']}%, {custom['colors']}" + ) + + elif mode == "extended": + # Extended custom effect (wave, meteor, etc.) + if not bulb.supports_extended_custom_effects: + raise ValueError( + f"Device {bulb.model} (model_num=0x{bulb.model_num:02X}) " + "does not support extended custom effects. " + "Use jump, gradual, or strobe for this device." + ) + pattern_id = custom["pattern_id"] + speed = custom["speed"] + density = custom["density"] + colors = custom["colors"] + direction = custom["direction"] + option = custom["option"] + + # Get pattern name for display + try: + pattern_name = ( + ExtendedCustomEffectPattern(pattern_id) + .name.lower() + .replace("_", " ") + ) + except ValueError: + pattern_name = f"pattern {pattern_id}" + dir_name = ( + "L->R" + if direction == ExtendedCustomEffectDirection.LEFT_TO_RIGHT.value + else "R->L" + ) + buf_in( + f"Setting extended effect: {pattern_name}, " + f"Speed={speed}%, Density={density}%, Direction={dir_name}, " + f"Colors={colors}" + ) + await bulb.async_set_extended_custom_effect( + pattern_id, colors, speed, density, direction, option + ) + + elif mode == "segments": + # Custom segment colors + if not bulb.supports_extended_custom_effects: + raise ValueError( + f"Device {bulb.model} (model_num=0x{bulb.model_num:02X}) " + "does not support custom segment colors. " + "This feature is only available on 0xB6 devices." + ) + buf_in(f"Setting custom segment colors: {len(custom['colors'])} segments") + await bulb.async_set_custom_segment_colors(custom["colors"]) elif options.preset is not None: buf_in( @@ -806,14 +1181,29 @@ def buf_in(str: str) -> None: buf_in("{} [{}] {} ({})".format(info["id"], info["ipaddr"], bulb, bulb.model)) if options.settimer: - empty_timers: list[LedTimer] = [] - timers = await bulb.async_get_timers() or empty_timers + # Check if device uses extended timer format (0xB6) + is_extended = isinstance(bulb._protocol, ProtocolLEDENETExtendedCustom) num = int(options.settimer[0]) - buf_in(f"New Timer ---- #{num}: {options.new_timer}") - if options.new_timer.isExpired(): - buf_in("[timer is already expired, will be deactivated]") - timers[num - 1] = options.new_timer - await bulb.async_set_timers(timers) + + if is_extended: + # Create a minimal parser for error handling + temp_parser = OptionParser() + ext_timer = processSetTimerArgsExtended(temp_parser, options.settimer) + buf_in(f"New Timer ---- #{num}: {ext_timer}") + await bulb.async_set_timer(ext_timer) + else: + temp_parser = OptionParser() + std_timer = processSetTimerArgs(temp_parser, options.settimer) + buf_in(f"New Timer ---- #{num}: {std_timer}") + if std_timer.isExpired(): + buf_in("[timer is already expired, will be deactivated]") + timers_result = await bulb.async_get_timers() + timers_list = cast("list[LedTimer]", timers_result) if timers_result else [] + if len(timers_list) < num: + # Extend list if needed + timers_list.extend([LedTimer() for _ in range(num - len(timers_list))]) + timers_list[num - 1] = std_timer + await bulb.async_set_timers(timers_list) if options.showtimers: show_timers = await bulb.async_get_timers() diff --git a/flux_led/models_db.py b/flux_led/models_db.py index dd55fb11..39369167 100755 --- a/flux_led/models_db.py +++ b/flux_led/models_db.py @@ -30,6 +30,10 @@ A2_PROTOCOL_TO_NUM, ADDRESSABLE_RGB_NUM_TO_WIRING, ADDRESSABLE_RGB_WIRING_TO_NUM, + LEDENET_EXTENDED_STATE_MODEL_POS, + LEDENET_EXTENDED_STATE_VERSION_POS, + LEDENET_STATE_MODEL_POS, + LEDENET_STATE_VERSION_POS, NEW_ADDRESSABLE_NUM_TO_OPERATING_MODE, NEW_ADDRESSABLE_NUM_TO_PROTOCOL, NEW_ADDRESSABLE_OPERATING_MODE_TO_NUM, @@ -47,6 +51,7 @@ PROTOCOL_LEDENET_ADDRESSABLE_CHRISTMAS, PROTOCOL_LEDENET_CCT, PROTOCOL_LEDENET_CCT_WRAPPED, + PROTOCOL_LEDENET_EXTENDED_CUSTOM, PROTOCOL_LEDENET_ORIGINAL, PROTOCOL_LEDENET_ORIGINAL_CCT, PROTOCOL_LEDENET_SOCKET, @@ -83,6 +88,7 @@ "ZG-LX": "", # Seen on floor lamp, v2 addressable, and Single channel controller "ZG-LX-UART": "", # Seen on UK xmas lights 0x33, fairy controller, and lytworx "ZG-BL-PWM": "", # Seen on 40w Flood Light + "ZG-BL-UFO": "", # Seen on Surplife outdoor permanent lighting 0xB6 "ZG-ZW2": "", # seen on 0x97 socket "ZGIR44": "44 Key IR", "IR_ZG": "IR", @@ -343,6 +349,7 @@ class LEDENETModel: channel_map: dict[str, str] # Used to remap channels microphone: bool device_config: LEDENETDeviceConfigurationOptions + supports_extended_custom_effects: bool = False def protocol_for_version_num(self, version_num: int) -> str: protocol = self.protocols[-1].protocol @@ -638,6 +645,15 @@ def protocol_for_version_num(self, version_num: int) -> str: auto_on=False, dimmable_effects=False, ), + LEDENETHardware( + model="AK001-ZJ21413", # Surplife outdoor permanent LED lighting + chip=LEDENETChip.BL602, + remote_rf=False, + remote_24g=True, # has remote access enabled + remote_24g_controls=False, + auto_on=True, + dimmable_effects=True, + ), ] HARDWARE_MAP: dict[str, LEDENETHardware] = {model.model: model for model in HARDWARE} @@ -1296,6 +1312,35 @@ def protocol_for_version_num(self, version_num: int) -> str: microphone=True, # confirmed with mocks to be true device_config=NEW_ADDRESABLE_DEVICE_CONFIG, ), + LEDENETModel( + model_num=0xB6, + models=["AK001-ZJ21413"], + description="Surplife Outdoor Permanent Lighting", + # NOTE: This device ONLY responds with extended state format (0xEA 0x81) + # introduced in PR #428, unlike 0x35 which can respond with both + # standard (0x81) and extended (0xEA 0x81) formats + # + # CUSTOM PATTERNS: This device supports 24 unique pattern types (wave, + # meteor, jump, strobe, comet, etc.) with customizable colors, speed, and + # density via ExtendedCustomEffectPattern and async_set_extended_custom_effect(). + # The manufacturer's app shows 45 "scenes" (pre-configured colors) and 24 + # "customizable patterns" (user-chosen colors), but they share the same + # pattern IDs - scenes are just preset color combinations of the patterns. + # Pattern IDs: 1-22 (animated), 101-102 (static). When active, preset_pattern + # byte = 0x25 (37) and byte 8 = actual pattern ID. + # + # STANDARD PRESET PATTERNS: NOT SUPPORTED by this device hardware. Standard + # preset pattern commands (0x61...) and custom effect commands (0x51...) do + # not work and will never work on this device. + always_writes_white_and_colors=False, + protocols=[MinVersionProtocol(0, PROTOCOL_LEDENET_EXTENDED_CUSTOM)], + # Device has single white LED (not separate warm/cool), so use RGB_W not RGB_CCT + mode_to_color_mode={0x01: COLOR_MODES_RGB_W, 0x17: COLOR_MODES_RGB_W}, + color_modes=COLOR_MODES_RGB_W, + channel_map={}, + microphone=True, + device_config=IMMUTABLE_DEVICE_CONFIG, + ), LEDENETModel( model_num=0xD1, models=[], @@ -1347,6 +1392,33 @@ def protocol_for_version_num(self, version_num: int) -> str: MODEL_MAP: dict[int, LEDENETModel] = {model.model_num: model for model in MODELS} +def extract_model_version_from_state(full_msg: bytes) -> tuple[int, int]: + """Extract model number and version number from a state message. + + Handles both standard state format (0x81) and extended state format (0xEA 0x81). + + Returns: + Tuple of (model_num, version_num), with version_num defaulting to 1 if not present. + """ + if len(full_msg) >= 20 and full_msg[0] == 0xEA and full_msg[1] == 0x81: + # Extended state format + model_num = full_msg[LEDENET_EXTENDED_STATE_MODEL_POS] + version_num = ( + full_msg[LEDENET_EXTENDED_STATE_VERSION_POS] + if len(full_msg) > LEDENET_EXTENDED_STATE_VERSION_POS + else 1 + ) + else: + # Standard state format + model_num = full_msg[LEDENET_STATE_MODEL_POS] + version_num = ( + full_msg[LEDENET_STATE_VERSION_POS] + if len(full_msg) > LEDENET_STATE_VERSION_POS + else 1 + ) + return model_num, version_num + + def get_model(model_num: int, fallback_protocol: str | None = None) -> LEDENETModel: """Return the LEDNETModel for the model_num.""" return MODEL_MAP.get( diff --git a/flux_led/pattern.py b/flux_led/pattern.py index 28d7575a..d8abca2a 100644 --- a/flux_led/pattern.py +++ b/flux_led/pattern.py @@ -201,6 +201,40 @@ v: k for k, v in ASSESSABLE_MULTI_COLOR_ID_NAME.items() } +# Extended custom effect pattern names for 0xB6 (Surplife) devices +# These map to ExtendedCustomEffectPattern enum values +EXTENDED_CUSTOM_EFFECT_ID_NAME = { + 0x00: "Segments", # Custom segment colors mode + 0x01: "Wave", + 0x02: "Meteor", + 0x03: "Streamer", + 0x04: "Building Blocks", + 0x05: "Flowing Water", + 0x06: "Chase", + 0x07: "Horse Racing", + 0x08: "Cycle", + 0x09: "Breathe", + 0x0A: "Jump", + 0x0B: "Strobe", + 0x0C: "Twinkling Stars", + 0x0D: "Stars Wink", + 0x0E: "Warning", + 0x0F: "Collision", + 0x10: "Fireworks", + 0x11: "Comet", + 0x12: "Gradient Meteor", + 0x13: "Volcano", + 0x14: "Superluminal", + 0x15: "Rainbow Bridge", + 0x16: "Gradient Overlay", + 0x65: "Static Gradient", + 0x66: "Static Fill", + 0x6E: "Solid Color", # Mode when solid color is set +} +EXTENDED_CUSTOM_EFFECT_NAME_ID = { + v: k for k, v in EXTENDED_CUSTOM_EFFECT_ID_NAME.items() +} + ORIGINAL_ADDRESSABLE_EFFECT_ID_NAME = { 1: "Circulate all modes", 2: "7 colors change gradually", diff --git a/flux_led/protocol.py b/flux_led/protocol.py index 3bd1e797..22dc74bb 100755 --- a/flux_led/protocol.py +++ b/flux_led/protocol.py @@ -27,7 +27,7 @@ LevelWriteModeData, MultiColorEffects, ) -from .timer import LedTimer +from .timer import LedTimer, LedTimerExtended from .utils import ( scaled_color_temp_to_white_levels, utils, @@ -97,6 +97,7 @@ class PowerRestoreStates: PROTOCOL_LEDENET_CCT_WRAPPED = "LEDENET_CCT_WRAPPED" PROTOCOL_LEDENET_ADDRESSABLE_CHRISTMAS = "LEDENET_CHRISTMAS" PROTOCOL_LEDENET_25BYTE = "LEDENET_25_BYTE" +PROTOCOL_LEDENET_EXTENDED_CUSTOM = "LEDENET_EXTENDED_CUSTOM" TRANSITION_BYTES = { TRANSITION_JUMP: 0x3B, @@ -109,6 +110,7 @@ class PowerRestoreStates: LEDENET_POWER_RESTORE_RESPONSE_LEN = 7 LEDENET_ORIGINAL_STATE_RESPONSE_LEN = 11 LEDENET_STATE_RESPONSE_LEN = 14 +LEDENET_EXTENDED_STATE_RESPONSE_LEN = 21 LEDENET_POWER_RESPONSE_LEN = 4 LEDENET_ADDRESSABLE_STATE_RESPONSE_LEN = 25 LEDENET_A1_DEVICE_CONFIG_RESPONSE_LEN = 12 @@ -119,6 +121,15 @@ class PowerRestoreStates: LEDENET_TIMERS_9BYTE_RESPONSE_LEN = 94 LEDENET_TIMERS_SOCKET_RESPONSE_LEN = 100 +# Byte positions for parsing state messages +# Standard state format (0x81): model at position 1, version at position 10 +LEDENET_STATE_MODEL_POS = 1 +LEDENET_STATE_VERSION_POS = 10 +# Extended state format (0xEA 0x81): model at position 4, version at position 5 +# Extended state format was introduced in PR #428 for devices like 0x35 v10 and 0xB6 +LEDENET_EXTENDED_STATE_MODEL_POS = 4 +LEDENET_EXTENDED_STATE_VERSION_POS = 5 + MSG_ORIGINAL_POWER_STATE = "original_power_state" MSG_ORIGINAL_STATE = "original_state" MSG_POWER_RESTORE_STATE = "power_restore_state" @@ -126,6 +137,7 @@ class PowerRestoreStates: MSG_STATE = "state" MSG_TIME = "time" MSG_TIMERS = "timers" +MSG_TIMERS_EXTENDED = "timers_extended" MSG_MUSIC_MODE_STATE = "music_mode_state" MSG_ADDRESSABLE_STATE = "addressable_state" MSG_DEVICE_CONFIG = "device_config" @@ -161,6 +173,7 @@ class PowerRestoreStates: (0x63,): MSG_A1_DEVICE_CONFIG, (0x72,): MSG_MUSIC_MODE_STATE, (0x2B,): MSG_REMOTE_CONFIG, + (0xE0, 0x06): MSG_TIMERS_EXTENDED, # 0xB6 device timer response } MSG_LENGTHS = { @@ -1021,14 +1034,35 @@ def _is_start_of_power_state_response(self, data: bytes) -> bool: """Check if a message is the start of a state response.""" return _message_type_from_start_of_msg(data) == MSG_POWER_STATE + def is_valid_extended_state_response(self, raw_state: bytes) -> bool: + """Check if this is an extended state response (0xEA 0x81 format). + + Some devices (like 0xB6) only respond with extended state format. + This allows probing to recognize these responses. + """ + return len(raw_state) >= 20 and raw_state[0] == 0xEA and raw_state[1] == 0x81 + def is_valid_state_response(self, raw_state: bytes) -> bool: """Check if a state response is valid.""" + # Check for extended state format (0xEA 0x81) first + if self.is_valid_extended_state_response(raw_state): + return True + # Check for standard state format (0x81) if len(raw_state) != self.state_response_length: return False if raw_state[0] != 129: return False return self.is_checksum_correct(raw_state) + def extended_state_to_state(self, raw_state: bytes) -> bytes: + """Convert an extended state response to a standard state response. + + This is overridden by ProtocolLEDENET25Byte with the actual conversion logic. + Default implementation returns raw_state unchanged for protocols that don't + use extended state. + """ + return raw_state + def construct_state_change(self, turn_on: int) -> bytearray: """ The bytes to send for a state change request. @@ -1593,6 +1627,472 @@ def construct_levels_change( ] +class ProtocolLEDENETExtendedCustom(ProtocolLEDENET25Byte): + """Protocol for devices with extended state format (0xEA 0x81) and custom effects. + + Used by devices like Surplife Outdoor Permanent Lighting (0xB6) with RGBW LEDs. + This protocol: + - ONLY responds with extended state format (0xEA 0x81, 20+ bytes) + - Supports extended custom effect commands (0xE1 0x21, 0xE1 0x22) + - Uses HSV+W color format: [H/2, S, V, 0x00, W] (5 bytes per color) + - Parent's 0xE0 command works for RGB but NOT for white/CCT + - Uses 0xE1 0x21 STATIC_FILL (0x66) for solid RGBW colors + + Extended state response format (0xEA 0x81, 21+ bytes): + pos 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ... + EA 81 01 00 MN VN PW PP MD SP ?? HH SS VV WT WB ... + | | | | | | | | | | | + | | | | | | | | | | white brightness (0-100) + | | | | | | | | | white temp (0-100, 0xFF=off) + | | | | | | | | value (0-100) + | | | | | | | saturation (0-100) + | | | | | | hue/2 (0-180) + | | | | | speed (0-100) + | | | | mode/pattern ID (when PP=0x25) + | | | preset_pattern (0x25=custom effect) + | | power state (0x23=ON, 0x24=OFF) + | version number + model number (0xB6) + + RGBW color format in 0xE1 0x21 command: + [H/2, S, V, 0x00, W] + | | | | | + | | | | white LED brightness (0-255) + | | | unused in 0xB6 (always 0x00) + | | value/brightness (0-100) + | saturation (0-100) + hue/2 (0-180, divide actual hue by 2) + """ + + @property + def name(self) -> str: + """The name of the protocol.""" + return PROTOCOL_LEDENET_EXTENDED_CUSTOM + + @property + def state_response_length(self) -> int: + """The length of the query response. + + Extended state format is 21 bytes (0xEA 0x81 + 19 bytes). + """ + return LEDENET_EXTENDED_STATE_RESPONSE_LEN + + def is_valid_state_response(self, raw_state: bytes) -> bool: + """Check if a state response is valid. + + This protocol ONLY accepts extended state format (0xEA 0x81). + """ + return self.is_valid_extended_state_response(raw_state) + + def extended_state_to_state(self, raw_state: bytes) -> bytes: + """Convert extended state to standard state format. + + For this protocol, extended state mode (position 8) always contains + the custom effect pattern ID when preset_pattern is 0x25. + No model lookup needed since this protocol always supports extended effects. + """ + if len(raw_state) < 20: + return b"" + + model_num = raw_state[4] + version_number = raw_state[5] + power_state = raw_state[6] + preset_pattern = raw_state[7] + speed = raw_state[9] + + hue = raw_state[11] + saturation = raw_state[12] + value = raw_state[13] + + white_temp = raw_state[14] + white_brightness = raw_state[15] + + if white_temp > 100 or white_brightness == 0: + cool_white = 0 + warm_white = 0 + else: + levels = scaled_color_temp_to_white_levels(white_temp, white_brightness) + cool_white = levels.cool_white + warm_white = levels.warm_white + + # Convert HSV to RGB + h = (hue * 2) / 360 + s = saturation / 100 + v = value / 100 + r_f, g_f, b_f = colorsys.hsv_to_rgb(h, s, v) + red = min(int(max(0, r_f) * 255), 255) + green = min(int(max(0, g_f) * 255), 255) + blue = min(int(max(0, b_f) * 255), 255) + + # For this protocol, mode always contains the effect pattern ID + # when in custom pattern mode (preset_pattern == 0x25) + mode = raw_state[8] if preset_pattern == 0x25 else 0 + color_mode = 0 + check_sum = 0 + + return bytes( + ( + raw_state[1], # Head (0x81) + model_num, + power_state, + preset_pattern, + mode, + speed, + red, + green, + blue, + warm_white, + version_number, + cool_white, + color_mode, + check_sum, + ) + ) + + def named_raw_state(self, raw_state: bytes) -> LEDENETRawState: + """Convert raw_state to a namedtuple. + + For extended state format (0xEA 0x81), convert to standard 14-byte format first. + If already in standard format (14 bytes starting with 0x81), use directly. + """ + # Check if this is extended state format (0xEA 0x81) + if len(raw_state) >= 20 and raw_state[0] == 0xEA and raw_state[1] == 0x81: + converted_state = self.extended_state_to_state(raw_state) + return LEDENETRawState(*converted_state) + # Already standard 14-byte format + return LEDENETRawState(*raw_state) + + def _rgb_to_hsv_bytes_rgbw( + self, r: int, g: int, b: int, white: int = 0 + ) -> list[int]: + """Convert RGBW (0-255) to 5-byte HSVW format for extended effect commands. + + Output format: + [H/2, S, V, 0x00, W] + 0 1 2 3 4 + | | | | white LED brightness (0-255) + | | | unused (always 0x00) + | | value/brightness (0-100) + | saturation (0-100) + hue divided by 2 (0-180) + + Args: + r: Red component (0-255) + g: Green component (0-255) + b: Blue component (0-255) + white: White LED brightness (0-255) + + Returns: + 5-byte list [H/2, S, V, 0x00, W] + """ + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + return [int(h * 180), int(s * 100), int(v * 100), 0x00, white] + + def construct_levels_change( + self, + persist: int, + red: int | None, + green: int | None, + blue: int | None, + warm_white: int | None, + cool_white: int | None, + write_mode: LevelWriteMode | int, + ) -> list[bytearray]: + """Construct level change using 0xE1 0x21 STATIC_FILL command. + + The parent's 0xE0 wrapped command works for RGB but not for CCT/white. + This override uses extended custom effect command (0xE1 0x21) with + STATIC_FILL pattern (0x66) which supports both RGB and white. + + Inner message format (before wrapping): + pos 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 + E1 21 00 64 66 00 01 32 32 00 00 00 00 00 00 01 HH SS VV 00 WW + | | | | | | | | | | | | + | | | | | | | | | | | white (0-255) + | | | | | | | | | | unused + | | | | | | | | | value (0-100) + | | | | | | | | saturation (0-100) + | | | | | | | hue/2 (0-180) + | | | | | | color count (1) + | | | | | reserved (6 bytes, all 0x00) + | | | | speed (0x32 = 50) + | | | density (0x32 = 50) + | | direction (0x01 = L->R) + | option (0x00 = default) + pattern ID (0x66 = STATIC_FILL) + + Note: Device has single white LED, so warm_white and cool_white + are combined into one brightness value. + """ + # STATIC_FILL pattern ID + STATIC_FILL = 0x66 + + r = red or 0 + g = green or 0 + b = blue or 0 + + # Combine warm and cool white into single white brightness + # (device only has one white LED) + w = min((warm_white or 0) + (cool_white or 0), 255) + + # Build extended custom effect command with RGBW support + msg = bytearray( + [ + 0xE1, + 0x21, + 0x00, # Command type + 0x64, # Constant (100) + STATIC_FILL, # Pattern ID + 0x00, # Option + 0x01, # Direction (L->R) + 0x32, # Density (50) + 0x32, # Speed (50) + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, # Reserved (6 bytes) + 0x01, # Color count (1) + ] + ) + + # Add RGBW color as HSV with white in 5th byte + msg.extend(self._rgb_to_hsv_bytes_rgbw(r, g, b, w)) + + return [ + self.construct_wrapped_message( + msg, inner_pre_constructed=True, version=0x02 + ) + ] + + def _rgb_to_hsv_bytes(self, r: int, g: int, b: int) -> list[int]: + """Convert RGB (0-255) to 5-byte HSV format [H/2, S, V, 0, 0]. + + This format is used by extended commands (0xE1 0x21, 0xE1 0x22). + """ + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + return [int(h * 180), int(s * 100), int(v * 100), 0x00, 0x00] + + def construct_extended_custom_effect( + self, + pattern_id: int, + colors: list[tuple[int, int, int]], + speed: int = 50, + density: int = 50, + direction: int = 0x01, + option: int = 0x00, + ) -> bytearray: + """Construct an extended custom effect command. + + Protocol format: + 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ... + e1 21 00 64 PP OO DD NS SP 00 00 00 00 00 00 CC [H S V 00 00] x N + | | | | | | colors (5 bytes each) + | | | | | color count + | | | | speed (0-100) + | | | density (0-100) + | | direction (01=L->R, 02=R->L) + | option (00=default, 01=color change) + pattern_id (1-24 or 101-102) + + Args: + pattern_id: Pattern ID 1-24 or 101-102 + colors: List of 1-8 RGB color tuples (0-255 per channel) + speed: Animation speed 0-100 (default 50) + density: Pattern density 0-100 (default 50) + direction: 0x01=L->R, 0x02=R->L (default L->R) + option: Pattern-specific option (default 0) + + Returns: + Wrapped command bytearray + """ + # Clamp values to valid range + speed = max(0, min(100, speed)) + density = max(0, min(100, density)) + + # Convert Enum to value if needed + if hasattr(pattern_id, "value"): + pattern_id = pattern_id.value + if hasattr(option, "value"): + option = option.value + if hasattr(direction, "value"): + direction = direction.value + + # Build inner message + msg = bytearray( + [ + 0xE1, + 0x21, + 0x00, # Command type + 0x64, # Constant (100) + pattern_id, + option, + direction, + density, + speed, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, # Reserved (6 bytes) + len(colors), # Color count + ] + ) + + # Add colors as HSV (5 bytes each) + for r, g, b in colors: + msg.extend(self._rgb_to_hsv_bytes(r, g, b)) + + return self.construct_wrapped_message( + msg, inner_pre_constructed=True, version=0x02 + ) + + def construct_custom_segment_colors( + self, + segments: list[tuple[int, int, int] | None], + ) -> bytearray: + """Construct a custom segment colors command (0xE1 0x22). + + Sets static HSV colors for each of 20 segments on the light strip. + Used by devices like AK001-ZJ21413 (model 0xB6) under the "colorful" menu. + + Protocol format: + e1 22 00 00 00 00 14 [H/2 S V 00 00] x 20 + + Args: + segments: List of up to 20 segment colors. Each segment is either: + - None or (0, 0, 0) for off + - (R, G, B) tuple with values 0-255 + + Returns: + Wrapped command bytearray + """ + # Build inner message: header + 20 segments + msg = bytearray([0xE1, 0x22, 0x00, 0x00, 0x00, 0x00, 0x14]) + + for i in range(20): + segment = segments[i] if i < len(segments) else None + if segment and segment != (0, 0, 0): + msg.extend(self._rgb_to_hsv_bytes(*segment)) + else: + msg.extend([0x00, 0x00, 0x00, 0x00, 0x00]) # Off + + return self.construct_wrapped_message( + msg, inner_pre_constructed=True, version=0x02 + ) + + # --- Timer Support for 0xB6 devices --- + # This protocol uses a different timer format than other protocols: + # - Query: e0 06 (wrapped) + # - Response: e0 06 + variable length slot data + # - Set: e0 05 SS f0 HH MM 00 RR ... (wrapped) + + def construct_get_timers(self) -> bytearray: + """Construct a get timers request. + + 0xB6 devices use the wrapped e0 06 command for timer queries. + """ + inner = bytearray([0xE0, 0x06]) + return self.construct_wrapped_message( + inner, inner_pre_constructed=True, version=0x01 + ) + + def construct_set_timer(self, timer: LedTimerExtended) -> bytearray: + """Construct a set timer command for a single timer. + + 0xB6 devices set timers one at a time using e0 05 SS ... + + Args: + timer: The LedTimerExtended object to set + + Returns: + Wrapped command bytearray + """ + inner = bytearray([0xE0, 0x05, timer.slot]) + inner.extend(timer.to_bytes()) + return self.construct_wrapped_message( + inner, inner_pre_constructed=True, version=0x01 + ) + + def construct_set_timers( # type: ignore[override] + self, timer_list: list[LedTimerExtended] + ) -> list[bytearray]: + """Construct set timer commands for multiple timers. + + 0xB6 devices set timers one at a time, so this returns a list + of commands (one per timer). + + Args: + timer_list: List of LedTimerExtended objects to set + + Returns: + List of wrapped command bytearrays + """ + return [self.construct_set_timer(timer) for timer in timer_list] + + def is_valid_timers_response(self, msg: bytes) -> bool: + """Check if a message is a valid timers response. + + 0xB6 timer responses start with e0 06. + """ + if len(msg) < 2: + return False + return msg[0] == 0xE0 and msg[1] == 0x06 + + def parse_get_timers(self, msg: bytes) -> list[LedTimerExtended]: # type: ignore[override] + """Parse timer response into list of LedTimerExtended objects. + + Response format: + - Empty: e0 06 (2 bytes) + - With timers: e0 06 [slot1] [slot2] ... [slot6] + + Each slot is either: + - 7 bytes if empty/inactive + - 21 bytes if simple timer (ON/OFF/color) + - Variable (48+) bytes if scene timer + """ + if len(msg) <= 2: + # Empty response or just header - no timers configured + return [] + + timers: list[LedTimerExtended] = [] + offset = 2 # Skip e0 06 header + + while offset < len(msg): + # Check if we have enough bytes for minimum slot (7 bytes) + if offset + 7 > len(msg): + break + + timer, consumed = LedTimerExtended.from_bytes(msg, offset) + timers.append(timer) + offset += consumed + + return timers + + @property + def timer_count(self) -> int: + """Number of timer slots supported.""" + return 6 + + def expected_timers_response_length(self, data: bytes) -> int: + """Calculate expected timer response length. + + For 0xB6 devices, the timer response has variable length + based on the inner message length encoded in the wrapped message. + """ + # Timer responses are wrapped, so we extract from the wrapper + if ( + len(data) >= OUTER_MESSAGE_WRAPPER_START_LEN + and data[0] == OUTER_MESSAGE_FIRST_BYTE + ): + inner_msg_len = (data[8] << 8) + data[9] + return OUTER_MESSAGE_WRAPPER_START_LEN + inner_msg_len + CHECKSUM_LEN + # Fallback + return len(data) + + class ProtocolLEDENETAddressableBase(ProtocolLEDENET9Byte): """Base class for addressable protocols.""" diff --git a/flux_led/timer.py b/flux_led/timer.py index c80f6022..049a6685 100644 --- a/flux_led/timer.py +++ b/flux_led/timer.py @@ -1,10 +1,24 @@ from __future__ import annotations import datetime - +from dataclasses import dataclass, field + +from .const import ( + TIMER_ACTION_COLOR, + TIMER_ACTION_OFF, + TIMER_ACTION_ON, + TIMER_ACTION_SCENE_GRADIENT, + TIMER_ACTION_SCENE_SEGMENTS, + TIMER_EFFECT_GRADIENT, + TIMER_EFFECT_SEGMENTS, + ExtendedCustomEffectPattern, +) from .pattern import PresetPattern from .utils import utils +# Type alias for HSV color tuple (hue 0-180, saturation 0-100, value 0-100) +HSVColor = tuple[int, int, int] + class BuiltInTimer: sunrise = 0xA1 @@ -365,3 +379,267 @@ def __str__(self) -> str: txt += f"{type} (Duration:{self.duration} minutes, Brightness: {utils.byteToPercent(self.brightness_start)}% -> {utils.byteToPercent(self.brightness_end)}%)" return txt + + +@dataclass +class LedTimerExtended: + """Extended timer for 0xB6 devices with scene/color support. + + Timer format (from packet analysis): + - Simple ON/OFF: 21 bytes (slot + 20 bytes data) + - Scene timer: variable (slot + header + effect data + Nx5 bytes colors) + + Repeat mask format (different from LedTimer): + - bit 0: reserved (always 0 for repeat mode) + - bit 1-7: Mon-Sun + """ + + # Day constants (same as LedTimer) + Mo = 0x02 + Tu = 0x04 + We = 0x08 + Th = 0x10 + Fr = 0x20 + Sa = 0x40 + Su = 0x80 + Everyday = Mo | Tu | We | Th | Fr | Sa | Su + Weekdays = Mo | Tu | We | Th | Fr + Weekend = Sa | Su + + slot: int = 1 + active: bool = True + hour: int = 0 + minute: int = 0 + repeat_mask: int = 0 + action_type: int = TIMER_ACTION_OFF + + # For color action (0xa1) - HSV tuple (hue 0-180, saturation 0-100, value 0-100) + color_hsv: HSVColor | None = None + + # For scene actions (0x29=gradient, 0x6b=segments) + pattern: ExtendedCustomEffectPattern | None = None + speed: int = 50 + colors: list[HSVColor] = field(default_factory=list) + + @property + def is_scene(self) -> bool: + """Return True if this is a scene timer.""" + return self.action_type in ( + TIMER_ACTION_SCENE_GRADIENT, + TIMER_ACTION_SCENE_SEGMENTS, + ) + + @property + def is_color(self) -> bool: + """Return True if this is a color timer.""" + return self.action_type == TIMER_ACTION_COLOR + + @property + def is_on(self) -> bool: + """Return True if timer turns on the device.""" + return self.action_type in ( + TIMER_ACTION_ON, + TIMER_ACTION_COLOR, + TIMER_ACTION_SCENE_GRADIENT, + TIMER_ACTION_SCENE_SEGMENTS, + ) + + @property + def repeat_days(self) -> list[str]: + """Return list of repeat day names.""" + days = [] + day_map = [ + (self.Mo, "Mon"), + (self.Tu, "Tue"), + (self.We, "Wed"), + (self.Th, "Thu"), + (self.Fr, "Fri"), + (self.Sa, "Sat"), + (self.Su, "Sun"), + ] + for mask, name in day_map: + if self.repeat_mask & mask: + days.append(name) + return days + + def to_bytes(self) -> bytes: + """Serialize timer to bytes for SET command (without slot number).""" + if not self.active: + return bytes(20) + + if self.is_scene: + return self._to_bytes_scene() + if self.is_color: + return self._to_bytes_color() + return self._to_bytes_simple() + + def _to_bytes_simple(self) -> bytes: + """Serialize simple ON/OFF timer.""" + data = bytearray(20) + data[0] = 0xF0 # active flag + data[1] = self.hour + data[2] = self.minute + data[3] = 0x00 # seconds + data[4] = self.repeat_mask + data[5:10] = [0x0E, 0xE0, 0x01, 0x00, self.action_type] + return bytes(data) + + def _to_bytes_color(self) -> bytes: + """Serialize color timer.""" + data = bytearray(20) + data[0] = 0xF0 # active flag + data[1] = self.hour + data[2] = self.minute + data[3] = 0x00 # seconds + data[4] = self.repeat_mask + data[5:10] = [0x0E, 0xE0, 0x01, 0x00, TIMER_ACTION_COLOR] + if self.color_hsv: + data[10], data[11], data[12] = self.color_hsv + return bytes(data) + + def _to_bytes_scene(self) -> bytes: + """Serialize scene timer.""" + is_gradient = self.action_type == TIMER_ACTION_SCENE_GRADIENT + effect_type = TIMER_EFFECT_GRADIENT if is_gradient else TIMER_EFFECT_SEGMENTS + pattern_id = self.pattern.value if self.pattern else 0x16 + + data = bytearray() + data.extend([0xF0, self.hour, self.minute, 0x00, self.repeat_mask]) + data.extend([self.action_type, 0xE1, effect_type]) + + if is_gradient: + # Gradient header (14 bytes + num_colors) + data.extend( + [ + 0x00, + 100, + pattern_id, # sub-cmd, brightness, pattern + 0x00, + 0x01, + self.speed, + 0x50, # option, direction, speed, transition + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, # padding + len(self.colors), + ] + ) + else: + # Segments header (4 bytes + num_colors) + data.extend([0x00, 0x00, 0x00, 0x00, len(self.colors)]) + + for h, s, v in self.colors: + data.extend([h, s, v, 0x00, 0x00]) + return bytes(data) + + @classmethod + def from_bytes(cls, data: bytes, offset: int = 0) -> tuple[LedTimerExtended, int]: + """Parse timer from response bytes. Returns (timer, bytes_consumed).""" + if offset + 7 > len(data): + slot = data[offset] if offset < len(data) else 1 + return cls(slot=slot, active=False), len(data) - offset + + slot = data[offset] + flags = data[offset + 1] + if flags == 0x00: + return cls(slot=slot, active=False), 7 + + hour = data[offset + 2] + minute = data[offset + 3] + if hour > 23 or minute > 59: + return cls(slot=slot, active=False), 7 + + timer = cls( + slot=slot, + active=True, + hour=hour, + minute=minute, + repeat_mask=data[offset + 5], + ) + + action_indicator = data[offset + 6] + + if action_indicator == 0x0E: + # Simple or color timer + if offset + 11 <= len(data): + timer.action_type = data[offset + 10] + if timer.action_type == TIMER_ACTION_COLOR and offset + 14 <= len(data): + timer.color_hsv = ( + data[offset + 11], + data[offset + 12], + data[offset + 13], + ) + return timer, min(21, len(data) - offset) + + # Scene timer - look for e1 marker + if offset + 9 <= len(data) and data[offset + 7] == 0xE1: + effect_type = data[offset + 8] + is_gradient = effect_type == TIMER_EFFECT_GRADIENT + + timer.action_type = ( + TIMER_ACTION_SCENE_GRADIENT + if is_gradient + else TIMER_ACTION_SCENE_SEGMENTS + ) + + if is_gradient: + if offset + 23 > len(data): + return timer, min(21, len(data) - offset) + num_colors_offset, color_start = offset + 22, offset + 23 + pattern_id = data[offset + 11] if offset + 12 <= len(data) else 0x16 + for p in ExtendedCustomEffectPattern: + if p.value == pattern_id: + timer.pattern = p + break + timer.speed = data[offset + 14] if offset + 15 <= len(data) else 50 + else: + if offset + 14 > len(data): + return timer, min(21, len(data) - offset) + num_colors_offset, color_start = offset + 13, offset + 14 + + num_colors = data[num_colors_offset] + timer.colors = [] + for i in range(num_colors): + c_off = color_start + i * 5 + if c_off + 3 > len(data): + break + timer.colors.append((data[c_off], data[c_off + 1], data[c_off + 2])) + + return timer, min( + (color_start - offset) + num_colors * 5, len(data) - offset + ) + + return timer, min(21, len(data) - offset) + + def __str__(self) -> str: + """Return human-readable string representation.""" + if not self.active: + return f"Timer {self.slot}: Unset" + + on_off = "[ON ]" if self.is_on else "[OFF]" + time_str = f"{self.hour:02}:{self.minute:02}" + repeat_str = ",".join(self.repeat_days) if self.repeat_mask else "Once" + + if self.action_type == TIMER_ACTION_ON: + action_str = "Turn On" + elif self.action_type == TIMER_ACTION_OFF: + action_str = "Turn Off" + elif self.action_type == TIMER_ACTION_COLOR and self.color_hsv: + h, s, v = self.color_hsv + action_str = f"Color (H:{h} S:{s} V:{v})" + elif self.action_type == TIMER_ACTION_SCENE_GRADIENT: + name = ( + self.pattern.name.replace("_", " ").title() + if self.pattern + else "Unknown" + ) + action_str = f"Scene: {name} ({len(self.colors)} colors)" + elif self.action_type == TIMER_ACTION_SCENE_SEGMENTS: + action_str = f"Colorful ({len(self.colors)} colors)" + else: + action_str = f"Unknown action: 0x{self.action_type:02x}" + + return f"Timer {self.slot}: {on_off} {time_str} {repeat_str} {action_str}" diff --git a/tests/test_aio.py b/tests/test_aio.py index 6b0c1b57..481847d9 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import colorsys import contextlib import datetime import json @@ -14,6 +15,8 @@ except ImportError: from unittest.mock import MagicMock as AsyncMock +from optparse import OptionParser + import pytest from flux_led import DeviceUnavailableException, aiodevice, aioscanner @@ -22,6 +25,7 @@ from flux_led.aioscanner import AIOBulbScanner, LEDENETDiscovery from flux_led.const import ( COLOR_MODE_CCT, + COLOR_MODE_DIM, COLOR_MODE_RGB, COLOR_MODE_RGBW, COLOR_MODE_RGBWW, @@ -29,22 +33,36 @@ MAX_TEMP, MIN_TEMP, PUSH_UPDATE_INTERVAL, + TIMER_ACTION_COLOR, + TIMER_ACTION_OFF, + TIMER_ACTION_ON, + TIMER_ACTION_SCENE_GRADIENT, + TIMER_ACTION_SCENE_SEGMENTS, + ExtendedCustomEffectDirection, + ExtendedCustomEffectOption, + ExtendedCustomEffectPattern, MultiColorEffects, WhiteChannelType, ) +from flux_led.fluxled import processSetTimerArgs, processSetTimerArgsExtended +from flux_led.models_db import extract_model_version_from_state from flux_led.protocol import ( + LEDENET_EXTENDED_STATE_RESPONSE_LEN, PROTOCOL_LEDENET_8BYTE_AUTO_ON, PROTOCOL_LEDENET_8BYTE_DIMMABLE_EFFECTS, PROTOCOL_LEDENET_9BYTE, PROTOCOL_LEDENET_25BYTE, PROTOCOL_LEDENET_ADDRESSABLE_CHRISTMAS, + PROTOCOL_LEDENET_EXTENDED_CUSTOM, PROTOCOL_LEDENET_ORIGINAL, LEDENETRawState, PowerRestoreState, PowerRestoreStates, + ProtocolLEDENET8Byte, ProtocolLEDENET25Byte, ProtocolLEDENETCCT, ProtocolLEDENETCCTWrapped, + ProtocolLEDENETExtendedCustom, RemoteConfig, ) from flux_led.scanner import ( @@ -53,7 +71,7 @@ is_legacy_device, merge_discoveries, ) -from flux_led.timer import LedTimer +from flux_led.timer import LedTimer, LedTimerExtended IP_ADDRESS = "127.0.0.1" MODEL_NUM_HEX = "0x35" @@ -4303,3 +4321,1849 @@ def _updated_callback(*args, **kwargs): ) ) assert light.white_active is True + + +@pytest.mark.asyncio +async def test_setup_0xB6_surplife(mock_aio_protocol): + """Test setup of 0xB6 Surplife device with extended state.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + # 0xB6 ONLY responds with extended state format (0xEA 0x81) per models_db.py:1313 + # This tests the extended state code paths in aiodevice.py and base_device.py + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, # Extended state header + 0x01, + 0x00, # Reserved + 0xB6, # Model at position 4 (LEDENET_EXTENDED_STATE_MODEL_POS) + 0x01, # Version at position 5 (LEDENET_EXTENDED_STATE_VERSION_POS) + 0x23, + 0x61, # Power on, mode + 0x24, + 0x64, + 0x0F, # Settings + 0x00, + 0x00, + 0x00, # RGB off + 0x64, + 0x64, # WW/CW values + 0x00, + 0x00, + 0x00, + 0x00, # Padding + 0x83, # Checksum + ) + ) + ) + await task + + assert light.model_num == 0xB6 + assert light.version_num == 1 + assert light.protocol == PROTOCOL_LEDENET_EXTENDED_CUSTOM + assert "Surplife" in light.model + assert light.color_modes == {COLOR_MODE_RGB, COLOR_MODE_DIM} + assert light.supports_extended_custom_effects is True + assert light.microphone is True + + +def test_protocol_extended_state_validation_0xB6(): + """Test protocol methods correctly handle extended state for 0xB6 device.""" + extended_state = bytes( + ( + 0xEA, + 0x81, # Extended state header + 0x01, + 0x00, # Reserved + 0xB6, # Model + 0x01, # Version + 0x23, + 0x61, # Power on, mode + 0x24, + 0x64, + 0x0F, # Settings + 0x00, + 0x00, + 0x00, # RGB off + 0x64, + 0x64, # WW/CW values + 0x00, + 0x00, + 0x00, + 0x00, # Padding + 0x83, # Checksum + ) + ) + + # Test ProtocolLEDENET8Byte validates extended state correctly + protocol_8byte = ProtocolLEDENET8Byte() + assert protocol_8byte.is_valid_extended_state_response(extended_state) + # This covers line 1040-1041: is_valid_state_response should return True for extended state + assert protocol_8byte.is_valid_state_response(extended_state) + + # Test ProtocolLEDENET25Byte validates extended state correctly + protocol_25byte = ProtocolLEDENET25Byte() + assert protocol_25byte.is_valid_extended_state_response(extended_state) + # This covers line 1465-1466: is_valid_state_response should return True for extended state + assert protocol_25byte.is_valid_state_response(extended_state) + + # Test extended_state_to_state default implementation (line 1055) + # ProtocolLEDENET8Byte uses the default implementation which just returns raw_state + assert protocol_8byte.extended_state_to_state(extended_state) == extended_state + + # Test ProtocolLEDENETExtendedCustom - dedicated protocol for 0xB6 + protocol_extended = ProtocolLEDENETExtendedCustom() + assert protocol_extended.name == PROTOCOL_LEDENET_EXTENDED_CUSTOM + # This protocol ONLY accepts extended state format + assert protocol_extended.is_valid_state_response(extended_state) is True + + # Test that ProtocolLEDENETExtendedCustom rejects standard 14-byte state + # This is a minimal standard state (without valid checksum) - just for format testing + standard_state = bytes( + ( + 0x81, # Standard state header + 0xB6, # Model + 0x23, # Power on + 0x61, # Mode + 0x00, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x01, + 0x00, + 0x00, + 0x00, + ) + ) + # ProtocolLEDENETExtendedCustom should reject standard state (only accepts 0xEA 0x81) + assert protocol_extended.is_valid_state_response(standard_state) is False + + +# Extended Custom Effect Tests (for devices with supports_extended_custom_effects=True) + + +def test_extended_custom_effect_pattern_enum_values(): + """Test ExtendedCustomEffectPattern enum has expected values.""" + assert ExtendedCustomEffectPattern.WAVE.value == 0x01 + assert ExtendedCustomEffectPattern.METEOR.value == 0x02 + assert ExtendedCustomEffectPattern.STREAMER.value == 0x03 + assert ExtendedCustomEffectPattern.BUILDING_BLOCKS.value == 0x04 + assert ExtendedCustomEffectPattern.BREATHE.value == 0x09 + assert ExtendedCustomEffectPattern.STATIC_GRADIENT.value == 0x65 + assert ExtendedCustomEffectPattern.STATIC_FILL.value == 0x66 + + +def test_extended_custom_effect_direction_enum_values(): + """Test ExtendedCustomEffectDirection enum has expected values.""" + assert ExtendedCustomEffectDirection.LEFT_TO_RIGHT.value == 0x01 + assert ExtendedCustomEffectDirection.RIGHT_TO_LEFT.value == 0x02 + + +def test_extended_custom_effect_option_enum_values(): + """Test ExtendedCustomEffectOption enum has expected values.""" + assert ExtendedCustomEffectOption.DEFAULT.value == 0x00 + assert ExtendedCustomEffectOption.VARIANT_1.value == 0x01 + assert ExtendedCustomEffectOption.VARIANT_2.value == 0x02 + + +def test_construct_extended_custom_effect_single_color(): + """Test constructing an extended custom effect with single color.""" + proto = ProtocolLEDENETExtendedCustom() + + # Single red color, pattern Wave, default settings + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=[(255, 0, 0)], + speed=50, + density=50, + direction=0x01, + option=0x00, + ) + + # Result should be a wrapped message + assert isinstance(result, bytearray) + assert len(result) > 0 + + # Check the wrapper header (b0 b1 b2 b3) + assert result[0] == 0xB0 + assert result[1] == 0xB1 + assert result[2] == 0xB2 + assert result[3] == 0xB3 + + +def test_construct_extended_custom_effect_multiple_colors(): + """Test constructing an extended custom effect with multiple colors.""" + proto = ProtocolLEDENETExtendedCustom() + + # Three colors: red, green, blue + colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)] + result = proto.construct_extended_custom_effect( + pattern_id=2, # Meteor + colors=colors, + speed=80, + density=100, + direction=0x02, # Right to Left + option=0x01, # Color change + ) + + assert isinstance(result, bytearray) + assert len(result) > 0 + + +def test_construct_extended_custom_effect_color_order(): + """Test that colors are stored in input order.""" + proto = ProtocolLEDENETExtendedCustom() + + # Two distinct colors + colors = [(255, 0, 0), (0, 0, 255)] # Red, Blue + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=colors, + speed=50, + density=50, + ) + + # The message structure after wrapper: + # Inner message starts after wrapper header + length bytes + # Find the color data section (after the 16-byte header) + # Colors are 5 bytes each (H, S, V, 0, 0) in input order + + # Red (255, 0, 0) in HSV: H=0, S=100, V=100 -> stored as (0, 100, 100) + # Blue (0, 0, 255) in HSV: H=240, S=100, V=100 -> stored as (120, 100, 100) + + # Red should come first in the message (same order as input) + assert isinstance(result, bytearray) + + +def test_construct_extended_custom_effect_hsv_conversion(): + """Test RGB to HSV conversion accuracy.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test with a known color: pure green + colors = [(0, 255, 0)] + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=colors, + ) + + # Green in HSV: H=120, S=100%, V=100% + # Stored as: H/2=60, S=100, V=100 + # Verify the message was constructed + assert isinstance(result, bytearray) + assert len(result) > 0 + + +def test_construct_extended_custom_effect_speed_clamping(): + """Test that speed is clamped to 0-100.""" + proto = ProtocolLEDENETExtendedCustom() + + # Speed > 100 should be clamped + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=[(255, 0, 0)], + speed=150, + ) + assert isinstance(result, bytearray) + + # Speed < 0 should be clamped + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=[(255, 0, 0)], + speed=-10, + ) + assert isinstance(result, bytearray) + + +def test_construct_extended_custom_effect_density_clamping(): + """Test that density is clamped to 0-100.""" + proto = ProtocolLEDENETExtendedCustom() + + # Density > 100 should be clamped + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=[(255, 0, 0)], + density=200, + ) + assert isinstance(result, bytearray) + + +def test_construct_extended_custom_effect_max_colors(): + """Test constructing an effect with maximum 8 colors.""" + proto = ProtocolLEDENETExtendedCustom() + + # 8 colors (maximum) + colors = [ + (255, 0, 0), + (255, 128, 0), + (255, 255, 0), + (0, 255, 0), + (0, 255, 255), + (0, 0, 255), + (128, 0, 255), + (255, 0, 255), + ] + result = proto.construct_extended_custom_effect( + pattern_id=1, + colors=colors, + ) + + assert isinstance(result, bytearray) + # Each color is 5 bytes, 8 colors = 40 bytes for colors + # Plus 16 bytes header = 56 bytes inner message + # Plus wrapper overhead + + +def test_construct_extended_custom_effect_with_enums(): + """Test using enum values for parameters.""" + proto = ProtocolLEDENETExtendedCustom() + + result = proto.construct_extended_custom_effect( + pattern_id=ExtendedCustomEffectPattern.WAVE, + colors=[(255, 0, 0)], + speed=50, + density=50, + direction=ExtendedCustomEffectDirection.RIGHT_TO_LEFT, + option=ExtendedCustomEffectOption.VARIANT_1, + ) + + assert isinstance(result, bytearray) + + +def test_construct_extended_custom_effect_with_variant_2(): + """Test using VARIANT_2 option (e.g., breathe mode for rainbow patterns).""" + proto = ProtocolLEDENETExtendedCustom() + + # Rainbow colors with VARIANT_2 option + colors = [ + (255, 0, 0), # Red + (255, 255, 0), # Yellow + (0, 255, 0), # Green + (0, 255, 255), # Cyan + (0, 0, 255), # Blue + (255, 0, 255), # Magenta + ] + result = proto.construct_extended_custom_effect( + pattern_id=12, # Twinkling stars + colors=colors, + speed=60, + density=100, + direction=ExtendedCustomEffectDirection.LEFT_TO_RIGHT, + option=ExtendedCustomEffectOption.VARIANT_2, + ) + + assert isinstance(result, bytearray) + assert len(result) > 0 + + +def test_extended_custom_effect_hsv_values(): + """Test specific HSV value calculations.""" + # Test the HSV conversion formula used in the protocol + # RGB (255, 0, 0) -> HSV (0, 100%, 100%) -> stored as (0, 100, 100) + r, g, b = 255, 0, 0 + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + hsv_h = int(h * 180) + hsv_s = int(s * 100) + hsv_v = int(v * 100) + + assert hsv_h == 0 # Red hue + assert hsv_s == 100 # Full saturation + assert hsv_v == 100 # Full value + + # RGB (0, 0, 255) -> HSV (240, 100%, 100%) -> stored as (120, 100, 100) + r, g, b = 0, 0, 255 + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + hsv_h = int(h * 180) + hsv_s = int(s * 100) + hsv_v = int(v * 100) + + assert hsv_h == 120 # Blue hue (240/2) + assert hsv_s == 100 + assert hsv_v == 100 + + # RGB (0, 255, 0) -> HSV (120, 100%, 100%) -> stored as (60, 100, 100) + r, g, b = 0, 255, 0 + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + hsv_h = int(h * 180) + hsv_s = int(s * 100) + hsv_v = int(v * 100) + + assert hsv_h == 60 # Green hue (120/2) + assert hsv_s == 100 + assert hsv_v == 100 + + +# Tests for _generate_extended_custom_effect validation (base_device.py lines 1335-1360) + + +@pytest.mark.asyncio +async def test_generate_extended_custom_effect_validation(mock_aio_protocol): + """Test validation in _generate_extended_custom_effect.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + # Setup 0xB6 device with extended state + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + assert light.protocol == PROTOCOL_LEDENET_EXTENDED_CUSTOM + + # Test invalid pattern_id (0 is not valid) + with pytest.raises(ValueError, match="Pattern ID must be 1-24 or 101-102"): + light._generate_extended_custom_effect(0, [(255, 0, 0)]) + + # Test invalid pattern_id (25 is not valid) + with pytest.raises(ValueError, match="Pattern ID must be 1-24 or 101-102"): + light._generate_extended_custom_effect(25, [(255, 0, 0)]) + + # Test empty colors list + with pytest.raises(ValueError, match="at least one color"): + light._generate_extended_custom_effect(1, []) + + # Test invalid color tuple (not 3 elements) + with pytest.raises(ValueError, match=r"must be .* tuple"): + light._generate_extended_custom_effect(1, [(255, 0)]) + + # Test color values out of range (> 255) + with pytest.raises(ValueError, match="must be 0-255"): + light._generate_extended_custom_effect(1, [(256, 0, 0)]) + + # Test color values out of range (< 0) + with pytest.raises(ValueError, match="must be 0-255"): + light._generate_extended_custom_effect(1, [(-1, 0, 0)]) + + # Test valid pattern_id 101 (STATIC_GRADIENT) + result = light._generate_extended_custom_effect(101, [(255, 0, 0)]) + assert isinstance(result, bytearray) + + # Test valid pattern_id 102 (STATIC_FILL) + result = light._generate_extended_custom_effect(102, [(255, 0, 0)]) + assert isinstance(result, bytearray) + + +@pytest.mark.asyncio +async def test_generate_extended_custom_effect_truncate_colors( + mock_aio_protocol, caplog: pytest.LogCaptureFixture +): + """Test that too many colors (>8) are truncated with warning.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + # 10 colors (more than 8 max) + colors = [(i * 25, 0, 0) for i in range(10)] + + with caplog.at_level(logging.WARNING): + result = light._generate_extended_custom_effect(1, colors) + + assert isinstance(result, bytearray) + assert "truncating" in caplog.text.lower() + + +# Tests for _generate_custom_segment_colors validation (base_device.py lines 1376-1392) + + +@pytest.mark.asyncio +async def test_generate_custom_segment_colors_validation(mock_aio_protocol): + """Test validation in _generate_custom_segment_colors.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + # Test invalid color tuple (not 3 elements) + with pytest.raises(ValueError, match=r"must be .* tuple"): + light._generate_custom_segment_colors([(255, 0)]) + + # Test color values out of range (> 255) + with pytest.raises(ValueError, match="must be 0-255"): + light._generate_custom_segment_colors([(256, 0, 0)]) + + # Test valid segments with None + result = light._generate_custom_segment_colors([None, (255, 0, 0), None]) + assert isinstance(result, bytearray) + + +@pytest.mark.asyncio +async def test_generate_custom_segment_colors_truncate( + mock_aio_protocol, caplog: pytest.LogCaptureFixture +): + """Test that too many segments (>20) are truncated with warning.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + # 25 segments (more than 20 max) + segments = [(i * 10, 0, 0) for i in range(25)] + + with caplog.at_level(logging.WARNING): + result = light._generate_custom_segment_colors(segments) + + assert isinstance(result, bytearray) + assert "truncating" in caplog.text.lower() + + +# Tests for construct_levels_change (protocol.py lines 1826-1861) + + +def test_protocol_construct_levels_change_0xB6(): + """Test construct_levels_change uses STATIC_FILL for 0xB6 protocol.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test with RGB values + result = proto.construct_levels_change( + persist=1, + red=255, + green=0, + blue=0, + warm_white=0, + cool_white=0, + write_mode=0, + ) + + assert len(result) == 1 + msg = result[0] + assert isinstance(msg, bytearray) + # Check wrapper header + assert msg[0] == 0xB0 + assert msg[1] == 0xB1 + assert msg[2] == 0xB2 + assert msg[3] == 0xB3 + + +def test_protocol_construct_levels_change_with_white(): + """Test construct_levels_change combines warm and cool white.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test with white values + result = proto.construct_levels_change( + persist=1, + red=0, + green=0, + blue=0, + warm_white=100, + cool_white=50, + write_mode=0, + ) + + assert len(result) == 1 + msg = result[0] + assert isinstance(msg, bytearray) + + +def test_protocol_construct_levels_change_white_clamping(): + """Test that combined white is clamped to 255.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test with white values that exceed 255 when combined + result = proto.construct_levels_change( + persist=1, + red=0, + green=0, + blue=0, + warm_white=200, + cool_white=200, # Total would be 400, should clamp to 255 + write_mode=0, + ) + + assert len(result) == 1 + assert isinstance(result[0], bytearray) + + +def test_protocol_extended_state_to_state_white_off(): + """Test extended_state_to_state when white is off (white_brightness=0).""" + proto = ProtocolLEDENETExtendedCustom() + + # Extended state with white_brightness=0 (position 15) + extended_state = bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, # Model + 0x01, # Version + 0x23, # Power on + 0x61, # Mode + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0xFF, # white_temp (255 > 100, triggers branch) + 0x00, # white_brightness = 0 (triggers branch) + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + + result = proto.extended_state_to_state(extended_state) + assert len(result) == 14 + # cool_white and warm_white should be 0 + assert result[9] == 0 # warm_white + assert result[11] == 0 # cool_white + + +def test_protocol_rgb_to_hsv_bytes_rgbw(): + """Test _rgb_to_hsv_bytes_rgbw conversion.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test pure red with white + result = proto._rgb_to_hsv_bytes_rgbw(255, 0, 0, 100) + assert len(result) == 5 + assert result[0] == 0 # Hue (red = 0) + assert result[1] == 100 # Saturation + assert result[2] == 100 # Value + assert result[3] == 0x00 # Unused + assert result[4] == 100 # White + + # Test pure green + result = proto._rgb_to_hsv_bytes_rgbw(0, 255, 0, 0) + assert result[0] == 60 # Hue (green = 120/2) + + # Test pure blue + result = proto._rgb_to_hsv_bytes_rgbw(0, 0, 255, 255) + assert result[0] == 120 # Hue (blue = 240/2) + assert result[4] == 255 # White + + +# Tests for construct_custom_segment_colors (protocol.py lines 1971-1980) + + +def test_protocol_construct_custom_segment_colors(): + """Test construct_custom_segment_colors command format.""" + proto = ProtocolLEDENETExtendedCustom() + + # Test with a few segments + segments = [(255, 0, 0), None, (0, 255, 0)] + result = proto.construct_custom_segment_colors(segments) + + assert isinstance(result, bytearray) + # Check wrapper header + assert result[0] == 0xB0 + assert result[1] == 0xB1 + assert result[2] == 0xB2 + assert result[3] == 0xB3 + + +def test_protocol_construct_custom_segment_colors_all_off(): + """Test construct_custom_segment_colors with all segments off.""" + proto = ProtocolLEDENETExtendedCustom() + + # All None segments + segments = [None] * 10 + result = proto.construct_custom_segment_colors(segments) + + assert isinstance(result, bytearray) + + +def test_protocol_construct_custom_segment_colors_zero_tuple(): + """Test that (0,0,0) is treated as off.""" + proto = ProtocolLEDENETExtendedCustom() + + # Mix of None and (0,0,0) + segments = [None, (0, 0, 0), (255, 0, 0)] + result = proto.construct_custom_segment_colors(segments) + + assert isinstance(result, bytearray) + + +# Tests for async API methods + + +@pytest.mark.asyncio +async def test_async_set_extended_custom_effect_0xB6(mock_aio_protocol): + """Test async_set_extended_custom_effect sends correct bytes.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + transport, _protocol = await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + transport.reset_mock() + + await light.async_set_extended_custom_effect( + pattern_id=1, + colors=[(255, 0, 0), (0, 255, 0)], + speed=50, + density=50, + ) + + assert transport.write.called + written_data = transport.write.call_args[0][0] + # Verify it's a wrapped message + assert written_data[0] == 0xB0 + assert written_data[1] == 0xB1 + + +@pytest.mark.asyncio +async def test_async_set_custom_segment_colors_0xB6(mock_aio_protocol): + """Test async_set_custom_segment_colors sends correct bytes.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + transport, _protocol = await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + transport.reset_mock() + + await light.async_set_custom_segment_colors( + segments=[(255, 0, 0), None, (0, 0, 255)] + ) + + assert transport.write.called + written_data = transport.write.call_args[0][0] + # Verify it's a wrapped message + assert written_data[0] == 0xB0 + assert written_data[1] == 0xB1 + + +# Tests for extended_custom_effect_pattern_list property (base_device.py line 658) + + +@pytest.mark.asyncio +async def test_extended_custom_effect_pattern_list_0xB6(mock_aio_protocol): + """Test extended_custom_effect_pattern_list returns list for 0xB6 device.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + # Test extended_custom_effect_pattern_list returns a list + pattern_list = light.extended_custom_effect_pattern_list + assert pattern_list is not None + assert isinstance(pattern_list, list) + assert len(pattern_list) > 0 + # Check some expected patterns + assert "wave" in pattern_list + assert "meteor" in pattern_list + assert "breathe" in pattern_list + + +@pytest.mark.asyncio +async def test_extended_custom_effect_pattern_list_non_0xB6(mock_aio_protocol): + """Test extended_custom_effect_pattern_list returns None for non-0xB6 device.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + # Standard 0x25 device (not extended custom) + light._aio_protocol.data_received( + b"\x81\x25\x23\x61\x05\x10\xb6\x00\x98\x19\x04\x25\x0f\xde" + ) + await task + + # Should return None for non-extended devices + assert light.extended_custom_effect_pattern_list is None + + +# Tests for _named_effect with extended custom effects (base_device.py line 676) + + +@pytest.mark.asyncio +async def test_named_effect_extended_custom_0xB6(mock_aio_protocol): + """Test _named_effect returns extended effect name for 0xB6 in custom mode.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + # 0xB6 device with preset_pattern=0x25 (custom effect mode) and mode=0x01 (Wave) + # Extended state: preset_pattern at pos 7, mode at pos 8 + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, # Model + 0x01, # Version + 0x23, # Power on + 0x25, # preset_pattern = 0x25 (custom effect mode) + 0x01, # mode = Wave pattern ID + 0x64, # speed + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + # Effect should be "Wave" from EXTENDED_CUSTOM_EFFECT_ID_NAME + assert light.effect == "Wave" + + +@pytest.mark.asyncio +async def test_named_effect_extended_custom_meteor(mock_aio_protocol): + """Test _named_effect returns Meteor for mode=0x02.""" + light = AIOWifiLedBulb("192.168.1.166") + + def _updated_callback(*args, **kwargs): + pass + + task = asyncio.create_task(light.async_setup(_updated_callback)) + await mock_aio_protocol() + + # 0xB6 device with mode=0x02 (Meteor) + light._aio_protocol.data_received( + bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, + 0x01, + 0x23, + 0x25, # preset_pattern = 0x25 + 0x02, # mode = Meteor + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + ) + await task + + assert light.effect == "Meteor" + + +# Tests for extract_model_version_from_state (models_db.py) + + +def test_extract_model_version_from_extended_state(): + """Test extract_model_version_from_state with extended state format.""" + # Extended state format (0xEA 0x81) + extended_state = bytes( + ( + 0xEA, + 0x81, + 0x01, + 0x00, + 0xB6, # Model at position 4 + 0x05, # Version at position 5 + 0x23, + 0x61, + 0x24, + 0x64, + 0x0F, + 0x00, + 0x00, + 0x00, + 0x64, + 0x64, + 0x00, + 0x00, + 0x00, + 0x00, + 0x83, + ) + ) + + model_num, version_num = extract_model_version_from_state(extended_state) + assert model_num == 0xB6 + assert version_num == 5 + + +def test_extract_model_version_from_standard_state(): + """Test extract_model_version_from_state with standard state format.""" + # Standard state format (0x81) + standard_state = bytes( + ( + 0x81, + 0x25, # Model at position 1 + 0x23, + 0x61, + 0x05, + 0x10, + 0xB6, + 0x00, + 0x98, + 0x19, + 0x04, # Version at position 10 + 0x25, + 0x0F, + 0xDE, + ) + ) + + model_num, version_num = extract_model_version_from_state(standard_state) + assert model_num == 0x25 + assert version_num == 4 + + +def test_extract_model_version_short_standard_state(): + """Test extract_model_version_from_state with short standard state (no version).""" + # Short standard state without version byte + short_state = bytes((0x81, 0x33, 0x23, 0x61, 0x05, 0x10, 0xB6, 0x00, 0x98, 0x19)) + + model_num, version_num = extract_model_version_from_state(short_state) + assert model_num == 0x33 + assert version_num == 1 # Default when not present + + +# Tests for protocol edge cases + + +def test_protocol_extended_custom_state_response_length(): + """Test state_response_length property returns correct value.""" + proto = ProtocolLEDENETExtendedCustom() + assert proto.state_response_length == LEDENET_EXTENDED_STATE_RESPONSE_LEN + assert proto.state_response_length == 21 + + +def test_protocol_extended_state_to_state_short_input(): + """Test extended_state_to_state returns empty bytes for short input.""" + proto = ProtocolLEDENETExtendedCustom() + + # Input too short (< 20 bytes) + short_state = bytes((0xEA, 0x81, 0x01, 0x00, 0xB6)) + result = proto.extended_state_to_state(short_state) + assert result == b"" + + +def test_protocol_named_raw_state_standard_format(): + """Test named_raw_state with standard 14-byte format.""" + proto = ProtocolLEDENETExtendedCustom() + + # Standard 14-byte state (should pass through directly) + standard_state = bytes( + ( + 0x81, + 0xB6, + 0x23, + 0x61, + 0x00, + 0x64, + 0xFF, + 0x00, + 0x00, + 0x00, + 0x01, + 0x00, + 0x00, + 0x00, + ) + ) + result = proto.named_raw_state(standard_state) + assert result.head == 0x81 + assert result.model_num == 0xB6 + + +# Tests for 0xB6 Extended Timer Support + + +def test_protocol_construct_get_timers_0xB6(): + """Test timer query construction for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + msg = proto.construct_get_timers() + + # Should be wrapped message with inner content [0xE0, 0x06] + assert msg[0:6] == bytes([0xB0, 0xB1, 0xB2, 0xB3, 0x00, 0x01]) + # Version byte at position 6 + assert msg[6] == 0x01 + # Inner message length at positions 8-9 (should be 2) + assert msg[8] == 0x00 + assert msg[9] == 0x02 + # Inner message starts at position 10 + assert msg[10] == 0xE0 + assert msg[11] == 0x06 + + +def test_protocol_is_valid_timers_response_0xB6(): + """Test timer response validation for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + # Valid response starts with e0 06 + assert proto.is_valid_timers_response(bytes([0xE0, 0x06])) is True + assert proto.is_valid_timers_response(bytes([0xE0, 0x06, 0x01, 0x02])) is True + + # Invalid responses + assert proto.is_valid_timers_response(bytes([0xE0])) is False + assert proto.is_valid_timers_response(bytes([0x01, 0x22])) is False + assert proto.is_valid_timers_response(bytes([])) is False + + +def test_protocol_parse_get_timers_empty_0xB6(): + """Test parsing empty timer response for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + # Empty response (no timers configured) + response = bytes([0xE0, 0x06]) + timers = proto.parse_get_timers(response) + + assert len(timers) == 0 + + +def test_protocol_parse_get_timers_single_0xB6(): + """Test parsing single timer response for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + # Response with one timer: 13:25 OFF, no repeat + # Format: e0 06 [slot1: 21 bytes] [empty slots 2-6: 7 bytes each] + response = bytes.fromhex( + "e006" # Header + "01f00d1900000ee001002400000000000000000000" # Slot 1: 13:25 OFF + "0264640000690003000000000000040000000000000500000000000006000000000000" # Slots 2-6 empty + ) + timers = proto.parse_get_timers(response) + + assert len(timers) == 6 + + # First timer should be active + timer1 = timers[0] + assert timer1.slot == 1 + assert timer1.active is True + assert timer1.hour == 13 + assert timer1.minute == 25 + assert timer1.repeat_mask == 0 + assert timer1.action_type == 0x24 # OFF + + # Remaining timers should be inactive + for i in range(1, 6): + assert timers[i].active is False + + +def test_protocol_parse_get_timers_multiple_0xB6(): + """Test parsing multiple timer response for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + # Response with three timers (from actual packet capture) + # Format: e0 06 [slot1: 21 bytes] [slot2: 21 bytes] [slot3: 21 bytes] [slots 4-6: 7 bytes each] + response = bytes.fromhex( + "e006" # Header + "01f00d1900000ee001002400000000000000000000" # Slot 1: 13:25 OFF + "02f00f2300000ee001002400000000000000000000" # Slot 2: 15:35 OFF + "03f00d2100000ee001002400000000000000000000" # Slot 3: 13:33 OFF + "04000000000000" # Slot 4 empty + "05000000000000" # Slot 5 empty + "06000000000000" # Slot 6 empty + ) + timers = proto.parse_get_timers(response) + + assert len(timers) == 6 + + # Check active timers + assert timers[0].slot == 1 + assert timers[0].hour == 13 + assert timers[0].minute == 25 + assert timers[0].active is True + + assert timers[1].slot == 2 + assert timers[1].hour == 15 + assert timers[1].minute == 35 + assert timers[1].active is True + + assert timers[2].slot == 3 + assert timers[2].hour == 13 + assert timers[2].minute == 33 + assert timers[2].active is True + + # Check inactive timers + assert timers[3].active is False + assert timers[4].active is False + assert timers[5].active is False + + +def test_protocol_parse_get_timers_with_color_0xB6(): + """Test parsing timer with color action for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + # Build a timer response with color action (0xa1) + # Slot 1: 17:15, Sun+Tue repeat (0x84), color action with HSV + inner_slot1 = bytes.fromhex("01f0110f00840ee00100a17be432000000000000") + inner_empty = bytes.fromhex( + "02000000000000030000000000000400000000000005000000000000060000000000" + ) + + response = bytes([0xE0, 0x06]) + inner_slot1 + inner_empty + timers = proto.parse_get_timers(response) + + # Check first timer + timer1 = timers[0] + assert timer1.slot == 1 + assert timer1.hour == 17 + assert timer1.minute == 15 + assert timer1.repeat_mask == 0x84 # Sun + Tue + assert timer1.action_type == 0xA1 # Color + assert timer1.color_hsv == (0x7B, 0xE4, 0x32) # (123, 228, 50) + + +def test_led_timer_extended_simple_on(): + """Test LedTimerExtended for simple ON action.""" + timer = LedTimerExtended( + slot=1, + active=True, + hour=8, + minute=30, + repeat_mask=LedTimerExtended.Weekdays, + action_type=TIMER_ACTION_ON, + ) + + assert timer.is_on is True + assert timer.is_scene is False + assert timer.is_color is False + assert timer.repeat_days == ["Mon", "Tue", "Wed", "Thu", "Fri"] + + # Test serialization + data = timer.to_bytes() + assert data[0] == 0xF0 # Active flag + assert data[1] == 8 # Hour + assert data[2] == 30 # Minute + assert data[9] == 0x23 # ON action + + +def test_led_timer_extended_simple_off(): + """Test LedTimerExtended for simple OFF action.""" + timer = LedTimerExtended( + slot=2, + active=True, + hour=22, + minute=0, + repeat_mask=0, # No repeat + action_type=TIMER_ACTION_OFF, + ) + + assert timer.is_on is False + assert timer.repeat_days == [] + + data = timer.to_bytes() + assert data[9] == 0x24 # OFF action + + +def test_led_timer_extended_color(): + """Test LedTimerExtended for color action.""" + timer = LedTimerExtended( + slot=1, + active=True, + hour=17, + minute=15, + repeat_mask=0x84, # Sun + Tue + action_type=TIMER_ACTION_COLOR, + color_hsv=(123, 228, 50), + ) + + assert timer.is_on is True + assert timer.is_color is True + assert timer.repeat_days == ["Tue", "Sun"] + + data = timer.to_bytes() + assert data[9] == 0xA1 # Color action + assert data[10] == 123 # Hue + assert data[11] == 228 # Saturation + assert data[12] == 50 # Brightness + + +def test_led_timer_extended_inactive(): + """Test LedTimerExtended for inactive timer.""" + timer = LedTimerExtended( + slot=3, + active=False, + ) + + data = timer.to_bytes() + # Inactive timer should be all zeros + assert data == bytes(20) + + +def test_led_timer_extended_from_bytes_simple(): + """Test LedTimerExtended.from_bytes for simple timer.""" + # Simple OFF timer: slot 1, 13:25, no repeat (21 bytes) + data = bytes.fromhex("01f00d1900000ee001002400000000000000000000") + + timer, consumed = LedTimerExtended.from_bytes(data, 0) + + assert consumed == 21 + assert timer.slot == 1 + assert timer.active is True + assert timer.hour == 13 + assert timer.minute == 25 + assert timer.repeat_mask == 0 + assert timer.action_type == 0x24 # OFF + + +def test_led_timer_extended_from_bytes_empty(): + """Test LedTimerExtended.from_bytes for empty slot.""" + # Empty slot + data = bytes.fromhex("0300000000000000") + + timer, consumed = LedTimerExtended.from_bytes(data, 0) + + assert consumed == 7 + assert timer.slot == 3 + assert timer.active is False + + +def test_led_timer_extended_str(): + """Test LedTimerExtended string representation.""" + timer_off = LedTimerExtended( + slot=1, active=True, hour=22, minute=0, action_type=TIMER_ACTION_OFF + ) + assert "OFF" in str(timer_off) + assert "22:00" in str(timer_off) + + timer_on = LedTimerExtended( + slot=2, + active=True, + hour=8, + minute=30, + repeat_mask=LedTimerExtended.Weekdays, + action_type=TIMER_ACTION_ON, + ) + assert "ON" in str(timer_on) + assert "Mon" in str(timer_on) + + timer_inactive = LedTimerExtended(slot=3, active=False) + assert "Unset" in str(timer_inactive) + + +def test_protocol_construct_set_timer_0xB6(): + """Test timer set construction for 0xB6 device.""" + proto = ProtocolLEDENETExtendedCustom() + + timer = LedTimerExtended( + slot=1, + active=True, + hour=13, + minute=25, + repeat_mask=0, + action_type=TIMER_ACTION_OFF, + ) + + msg = proto.construct_set_timer(timer) + + # Should be wrapped message + assert msg[0:6] == bytes([0xB0, 0xB1, 0xB2, 0xB3, 0x00, 0x01]) + # Version byte + assert msg[6] == 0x01 + # Inner message starts at position 10 + assert msg[10] == 0xE0 # Extended command + assert msg[11] == 0x05 # Set timer command + assert msg[12] == 0x01 # Slot number + + +def test_led_timer_extended_scene_gradient(): + """Test LedTimerExtended for scene gradient action.""" + timer = LedTimerExtended( + slot=1, + active=True, + hour=18, + minute=30, + repeat_mask=LedTimerExtended.Everyday, + action_type=TIMER_ACTION_SCENE_GRADIENT, + pattern=ExtendedCustomEffectPattern.WAVE, + speed=80, + colors=[(100, 100, 100), (50, 50, 50)], + ) + + assert timer.is_on is True + assert timer.is_scene is True + assert timer.is_color is False + + # Test serialization + data = timer.to_bytes() + assert data[0] == 0xF0 # Active flag + assert data[1] == 18 # Hour + assert data[2] == 30 # Minute + assert data[5] == TIMER_ACTION_SCENE_GRADIENT + assert data[6] == 0xE1 # Effect marker + assert data[7] == 0x21 # Gradient effect type + + # Test __str__ + s = str(timer) + assert "Scene: Wave" in s + assert "2 colors" in s + + +def test_led_timer_extended_scene_segments(): + """Test LedTimerExtended for scene segments (colorful) action.""" + timer = LedTimerExtended( + slot=2, + active=True, + hour=20, + minute=0, + repeat_mask=LedTimerExtended.Weekend, + action_type=TIMER_ACTION_SCENE_SEGMENTS, + colors=[(180, 100, 100), (0, 100, 100), (60, 100, 100)], + ) + + assert timer.is_scene is True + + # Test serialization + data = timer.to_bytes() + assert data[5] == TIMER_ACTION_SCENE_SEGMENTS + assert data[6] == 0xE1 # Effect marker + assert data[7] == 0x22 # Segments effect type + + # Test __str__ + s = str(timer) + assert "Colorful" in s + assert "3 colors" in s + + +def test_led_timer_extended_from_bytes_scene_gradient(): + """Test LedTimerExtended.from_bytes for scene gradient timer.""" + # Scene gradient timer from real device data: + # slot=4, 18:38, repeat=0x0f, action=0x29, e1 21 header, 5 colors + data = bytes.fromhex( + "04f0122600f629e12100500300016450000000000000050c646400001e646400005a646400006ce464000096646400" + ) + + timer, _consumed = LedTimerExtended.from_bytes(data, 0) + + assert timer.slot == 4 + assert timer.active is True + assert timer.hour == 18 + assert timer.minute == 38 + assert timer.repeat_mask == 0xF6 # All days except bit 0 and 3 + assert timer.action_type == TIMER_ACTION_SCENE_GRADIENT + assert len(timer.colors) == 5 + # First color: 0c 64 64 = (12, 100, 100) + assert timer.colors[0] == (0x0C, 0x64, 0x64) + + +def test_led_timer_extended_from_bytes_scene_segments(): + """Test LedTimerExtended.from_bytes for scene segments timer.""" + # Scene segments timer: slot=1, 12:00, e1 22 header, 3 colors + # Construct a minimal segments timer + data = bytearray() + data.append(0x01) # slot + data.append(0xF0) # flags + data.append(12) # hour + data.append(0) # minute + data.append(0) # seconds + data.append(0) # repeat + data.append(0x6B) # action (segments) + data.append(0xE1) # effect marker + data.append(0x22) # segments type + data.extend([0, 0, 0, 0]) # header padding + data.append(3) # num colors + # 3 colors (5 bytes each) + data.extend([100, 100, 50, 0, 0]) + data.extend([50, 80, 60, 0, 0]) + data.extend([150, 90, 70, 0, 0]) + + timer, _consumed = LedTimerExtended.from_bytes(bytes(data), 0) + + assert timer.slot == 1 + assert timer.active is True + assert timer.action_type == TIMER_ACTION_SCENE_SEGMENTS + assert len(timer.colors) == 3 + assert timer.colors[0] == (100, 100, 50) + assert timer.colors[1] == (50, 80, 60) + assert timer.colors[2] == (150, 90, 70) + + +def test_led_timer_extended_str_unknown_action(): + """Test LedTimerExtended.__str__ for unknown action type.""" + timer = LedTimerExtended( + slot=1, + active=True, + hour=10, + minute=30, + action_type=0xFF, # Unknown action + ) + + s = str(timer) + assert "Unknown action: 0xff" in s + + +def test_led_timer_extended_from_bytes_truncated(): + """Test LedTimerExtended.from_bytes handles truncated data.""" + # Only 5 bytes - not enough for a valid timer + data = bytes([0x01, 0xF0, 12, 30, 0]) + _timer, consumed = LedTimerExtended.from_bytes(data, 0) + assert consumed == 5 # Returns what's available + + +# ============================================================================= +# CLI Timer Parsing Tests +# ============================================================================= + + +def test_cli_process_set_timer_args_extended_inactive(): + """Test CLI parsing for inactive extended timer.""" + parser = OptionParser() + timer = processSetTimerArgsExtended(parser, ["1", "inactive", ""]) + assert timer.slot == 1 + assert timer.active is False + assert timer.action_type == TIMER_ACTION_OFF + + +def test_cli_process_set_timer_args_extended_poweroff(): + """Test CLI parsing for poweroff extended timer.""" + parser = OptionParser() + timer = processSetTimerArgsExtended( + parser, ["2", "poweroff", "time:1430;repeat:12345"] + ) + assert timer.slot == 2 + assert timer.active is True + assert timer.hour == 14 + assert timer.minute == 30 + assert timer.action_type == TIMER_ACTION_OFF + # repeat 12345 = Mon|Tue|Wed|Thu|Fri = bits 1,2,3,4,5 = 0x3E + assert timer.repeat_mask == 0x3E + + +def test_cli_process_set_timer_args_extended_default_on(): + """Test CLI parsing for default (on) extended timer.""" + parser = OptionParser() + timer = processSetTimerArgsExtended(parser, ["3", "default", "time:0830;repeat:06"]) + assert timer.slot == 3 + assert timer.active is True + assert timer.hour == 8 + assert timer.minute == 30 + assert timer.action_type == TIMER_ACTION_ON + # repeat 06 = Sun|Sat = bits 7,6 = 0x80|0x40 = 0xC0 + assert timer.repeat_mask == 0xC0 + + +def test_cli_process_set_timer_args_extended_color(): + """Test CLI parsing for color extended timer.""" + parser = OptionParser() + # Use color name instead of hex code (hex needs # prefix) + timer = processSetTimerArgsExtended( + parser, ["4", "color", "time:2100;repeat:0123456;color:red"] + ) + assert timer.slot == 4 + assert timer.active is True + assert timer.hour == 21 + assert timer.minute == 0 + assert timer.action_type == TIMER_ACTION_COLOR + assert timer.color_hsv is not None + # red should have hue=0 + assert timer.color_hsv[0] == 0 # hue + + +def test_cli_process_set_timer_args_extended_color_with_brightness(): + """Test CLI parsing for color extended timer with brightness.""" + parser = OptionParser() + # Use hex with # prefix + timer = processSetTimerArgsExtended( + parser, ["5", "color", "time:1200;repeat:1;color:#00ff00;brightness:50"] + ) + assert timer.slot == 5 + assert timer.active is True + assert timer.action_type == TIMER_ACTION_COLOR + assert timer.color_hsv is not None + # Check brightness is 50 + assert timer.color_hsv[2] == 50 + + +def test_cli_process_set_timer_args_extended_weekend_repeat(): + """Test CLI parsing for weekend repeat (0=Sun, 6=Sat).""" + parser = OptionParser() + timer = processSetTimerArgsExtended( + parser, ["1", "poweroff", "time:2200;repeat:06"] + ) + # repeat 06 = Sun|Sat = bit7 | bit6 = 0x80 | 0x40 = 0xC0 + assert timer.repeat_mask == 0xC0 + + +def test_cli_process_set_timer_args_extended_everyday_repeat(): + """Test CLI parsing for everyday repeat (0123456).""" + parser = OptionParser() + timer = processSetTimerArgsExtended( + parser, ["1", "default", "time:0700;repeat:0123456"] + ) + # repeat 0123456 = Sun|Mon|Tue|Wed|Thu|Fri|Sat + # bit7=Sun + bits1-6 = 0x80 | 0x7E = 0xFE + assert timer.repeat_mask == 0xFE + + +def test_cli_process_set_timer_args_standard_inactive(): + """Test CLI parsing for inactive standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs(parser, ["1", "inactive", ""]) + assert timer.isActive() is False + + +def test_cli_process_set_timer_args_standard_poweroff(): + """Test CLI parsing for poweroff standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs(parser, ["2", "poweroff", "time:1430;repeat:12345"]) + assert timer.isActive() is True + assert timer.hour == 14 + assert timer.minute == 30 + # Check it's a turn-off timer + assert timer.turn_on is False + + +def test_cli_process_set_timer_args_standard_default(): + """Test CLI parsing for default (on) standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs(parser, ["3", "default", "time:0830;repeat:06"]) + assert timer.isActive() is True + assert timer.hour == 8 + assert timer.minute == 30 + + +def test_cli_process_set_timer_args_standard_color(): + """Test CLI parsing for color standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs( + parser, ["4", "color", "time:2100;repeat:0123456;color:255,0,0"] + ) + assert timer.isActive() is True + assert timer.hour == 21 + assert timer.minute == 0 + assert timer.red == 255 + assert timer.green == 0 + assert timer.blue == 0 + + +def test_cli_process_set_timer_args_standard_warmwhite(): + """Test CLI parsing for warmwhite standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs( + parser, ["5", "warmwhite", "time:2200;repeat:12345;level:75"] + ) + assert timer.isActive() is True + assert timer.hour == 22 + assert timer.minute == 0 + # 75% is converted to byte: int((75 * 255) / 100) = 191 + assert timer.warmth_level == 191 + + +def test_cli_process_set_timer_args_standard_preset(): + """Test CLI parsing for preset standard timer.""" + parser = OptionParser() + timer = processSetTimerArgs( + parser, ["6", "preset", "time:1800;repeat:06;code:37;speed:50"] + ) + assert timer.isActive() is True + assert timer.hour == 18 + assert timer.minute == 0 + assert timer.pattern_code == 37 + + +# ============================================================================= +# Timer Display Formatting Tests (__str__) +# ============================================================================= + + +def test_led_timer_standard_str_inactive(): + """Test LedTimer.__str__ for inactive timer.""" + timer = LedTimer() + timer.setActive(False) + assert str(timer) == "Unset" + + +def test_led_timer_standard_str_on(): + """Test LedTimer.__str__ for turn-on timer.""" + timer = LedTimer() + timer.setActive(True) + timer.setTime(8, 30) + timer.setModeDefault() + timer.setRepeatMask(LedTimer.Weekdays) + + s = str(timer) + assert "[ON ]" in s + assert "08:30" in s + assert "Mo" in s + assert "Tu" in s + assert "Fr" in s + + +def test_led_timer_standard_str_off(): + """Test LedTimer.__str__ for turn-off timer.""" + timer = LedTimer() + timer.setActive(True) + timer.setTime(22, 0) + timer.setModeTurnOff() + timer.setRepeatMask(LedTimer.Everyday) + + s = str(timer) + assert "[OFF]" in s + assert "22:00" in s + + +def test_led_timer_standard_str_once(): + """Test LedTimer.__str__ for one-time timer.""" + timer = LedTimer() + timer.setActive(True) + timer.setTime(14, 30) + timer.setModeDefault() + timer.setDate(2025, 12, 25) + + s = str(timer) + assert "[ON ]" in s + assert "14:30" in s + assert "Once" in s + assert "2025-12-25" in s + + +def test_led_timer_standard_str_color(): + """Test LedTimer.__str__ for color timer.""" + timer = LedTimer() + timer.setActive(True) + timer.setTime(19, 0) + timer.setModeColor(255, 0, 0) + timer.setRepeatMask(LedTimer.Weekend) + + s = str(timer) + assert "[ON ]" in s + assert "19:00" in s + assert "Sa" in s + assert "Su" in s diff --git a/tests/test_sync.py b/tests/test_sync.py index 56badc11..9f591eb5 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -33,6 +33,7 @@ PROTOCOL_LEDENET_ADDRESSABLE_A1, PROTOCOL_LEDENET_ADDRESSABLE_A2, PROTOCOL_LEDENET_ADDRESSABLE_A3, + PROTOCOL_LEDENET_EXTENDED_CUSTOM, PROTOCOL_LEDENET_ORIGINAL, PROTOCOL_LEDENET_ORIGINAL_CCT, PROTOCOL_LEDENET_SOCKET, @@ -1422,6 +1423,112 @@ def read_data(expected): light.set_effect("colorloop", 50, 50) self.assertEqual(mock_send.call_args, mock.call(bytearray(b"8%\x102\x9f"))) + @patch("flux_led.WifiLedBulb._send_msg") + @patch("flux_led.WifiLedBulb._read_msg") + @patch("flux_led.WifiLedBulb.connect") + def test_0xB6_surplife_sync(self, mock_connect, mock_read, mock_send): + """Test 0xB6 Surplife device with sync API.""" + calls = 0 + + def read_data(expected): + nonlocal calls + calls += 1 + if calls == 1: + self.assertEqual(expected, 2) + # 0xB6 device returns extended state format (0xEA 0x81) + return bytearray(b"\xea\x81") + if calls == 2: + # Extended state: 21 bytes total, first 2 already read + # Format: EA 81 01 00 B6 01 23 61 00 64 0F 00 00 00 64 64 00 00 00 00 CS + self.assertEqual(expected, 19) + return bytearray( + b"\x01\x00\xb6\x01\x23\x61\x00\x64\x0f\x00\x00\x00\x64\x64\x00\x00\x00\x00\x83" + ) + + mock_read.side_effect = read_data + light = flux_led.WifiLedBulb("192.168.1.164") + + assert light.color_modes == {COLOR_MODE_RGB, COLOR_MODE_DIM} + self.assertEqual(light.protocol, PROTOCOL_LEDENET_EXTENDED_CUSTOM) + self.assertEqual(light.model_num, 0xB6) + self.assertEqual(light.version_num, 1) + assert "Surplife" in light.model + assert light.supports_extended_custom_effects is True + self.assertEqual(light.microphone, True) + + @patch("flux_led.WifiLedBulb._send_msg") + @patch("flux_led.WifiLedBulb._read_msg") + @patch("flux_led.WifiLedBulb.connect") + def test_0xB6_setExtendedCustomEffect(self, mock_connect, mock_read, mock_send): + """Test sync setExtendedCustomEffect for 0xB6 device.""" + calls = 0 + + def read_data(expected): + nonlocal calls + calls += 1 + if calls == 1: + self.assertEqual(expected, 2) + return bytearray(b"\xea\x81") + if calls == 2: + self.assertEqual(expected, 19) + return bytearray( + b"\x01\x00\xb6\x01\x23\x61\x00\x64\x0f\x00\x00\x00\x64\x64\x00\x00\x00\x00\x83" + ) + + mock_read.side_effect = read_data + light = flux_led.WifiLedBulb("192.168.1.164") + + self.assertEqual(light.protocol, PROTOCOL_LEDENET_EXTENDED_CUSTOM) + + # Call setExtendedCustomEffect + light.setExtendedCustomEffect( + pattern_id=1, + colors=[(255, 0, 0), (0, 255, 0)], + speed=50, + density=50, + ) + + # Verify _send_msg was called + assert mock_send.called + sent_data = mock_send.call_args[0][0] + # Verify it's a wrapped message + assert sent_data[0] == 0xB0 + assert sent_data[1] == 0xB1 + + @patch("flux_led.WifiLedBulb._send_msg") + @patch("flux_led.WifiLedBulb._read_msg") + @patch("flux_led.WifiLedBulb.connect") + def test_0xB6_setCustomSegmentColors(self, mock_connect, mock_read, mock_send): + """Test sync setCustomSegmentColors for 0xB6 device.""" + calls = 0 + + def read_data(expected): + nonlocal calls + calls += 1 + if calls == 1: + self.assertEqual(expected, 2) + return bytearray(b"\xea\x81") + if calls == 2: + self.assertEqual(expected, 19) + return bytearray( + b"\x01\x00\xb6\x01\x23\x61\x00\x64\x0f\x00\x00\x00\x64\x64\x00\x00\x00\x00\x83" + ) + + mock_read.side_effect = read_data + light = flux_led.WifiLedBulb("192.168.1.164") + + self.assertEqual(light.protocol, PROTOCOL_LEDENET_EXTENDED_CUSTOM) + + # Call setCustomSegmentColors + light.setCustomSegmentColors(segments=[(255, 0, 0), None, (0, 0, 255)]) + + # Verify _send_msg was called + assert mock_send.called + sent_data = mock_send.call_args[0][0] + # Verify it's a wrapped message + assert sent_data[0] == 0xB0 + assert sent_data[1] == 0xB1 + @patch("flux_led.WifiLedBulb._send_msg") @patch("flux_led.WifiLedBulb._read_msg") @patch("flux_led.WifiLedBulb.connect")