Skip to content

Add LightEffectModule for dynamic light effects on SMART bulbs #887

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 2, 2024
38 changes: 18 additions & 20 deletions kasa/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@
title: str,
category: Feature.Category | None = None,
verbose: bool = False,
indent: str = "\t",
):
"""Print out a listing of features and their values."""
if category is not None:
Expand All @@ -598,13 +599,13 @@
echo(f"[bold]{title}[/bold]")
for _, feat in features.items():
try:
echo(f"\t{feat}")
echo(f"{indent}{feat}")
if verbose:
echo(f"\t\tType: {feat.type}")
echo(f"\t\tCategory: {feat.category}")
echo(f"\t\tIcon: {feat.icon}")
echo(f"{indent}\tType: {feat.type}")
echo(f"{indent}\tCategory: {feat.category}")
echo(f"{indent}\tIcon: {feat.icon}")

Check warning on line 606 in kasa/cli.py

View check run for this annotation

Codecov / codecov/patch

kasa/cli.py#L604-L606

Added lines #L604 - L606 were not covered by tests
except Exception as ex:
echo(f"\t{feat.name} ({feat.id}): got exception (%s)" % ex)
echo(f"{indent}{feat.name} ({feat.id}): [red]got exception ({ex})[/red]")

Check warning on line 608 in kasa/cli.py

View check run for this annotation

Codecov / codecov/patch

kasa/cli.py#L608

Added line #L608 was not covered by tests


def _echo_all_features(features, *, verbose=False, title_prefix=None):
Expand Down Expand Up @@ -1219,22 +1220,15 @@
echo(f"Targeting child device {child}")
dev = dev.get_child_device(child)
if not name:

def _print_features(dev):
for name, feat in dev.features.items():
try:
unit = f" {feat.unit}" if feat.unit else ""
echo(f"\t{feat.name} ({name}): {feat.value}{unit}")
except Exception as ex:
echo(f"\t{feat.name} ({name}): [red]{ex}[/red]")

echo("[bold]== Features ==[/bold]")
_print_features(dev)
_echo_features(dev.features, "\n[bold]== Features ==[/bold]\n", indent="")

if dev.children:
for child_dev in dev.children:
echo(f"[bold]== Child {child_dev.alias} ==")
_print_features(child_dev)
_echo_features(
child_dev.features,
f"\n[bold]== Child {child_dev.alias} ==\n",
indent="",
)

return

Expand All @@ -1249,9 +1243,13 @@
echo(f"{feat.name} ({name}): {feat.value}{unit}")
return feat.value

echo(f"Setting {name} to {value}")
value = ast.literal_eval(value)
return await dev.features[name].set_value(value)
echo(f"Changing {name} from {feat.value} to {value}")
response = await dev.features[name].set_value(value)
await dev.update()
echo(f"New state: {feat.value}")

return response


if __name__ == "__main__":
Expand Down
7 changes: 7 additions & 0 deletions kasa/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,16 @@ async def set_value(self, value):
def __repr__(self):
try:
value = self.value
choices = self.choices
except Exception as ex:
return f"Unable to read value ({self.id}): {ex}"

if self.type == Feature.Type.Choice:
if not isinstance(choices, list) or value not in choices:
return f"Value {value} is not a valid choice ({self.id}): {choices}"
value = " ".join(
[f"*{choice}*" if choice == value else choice for choice in choices]
)
if self.precision_hint is not None and value is not None:
value = round(self.value, self.precision_hint)

Expand Down
2 changes: 2 additions & 0 deletions kasa/smart/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .frostprotection import FrostProtectionModule
from .humidity import HumiditySensor
from .ledmodule import LedModule
from .lighteffectmodule import LightEffectModule
from .lighttransitionmodule import LightTransitionModule
from .reportmodule import ReportModule
from .temperature import TemperatureSensor
Expand All @@ -39,6 +40,7 @@
"FanModule",
"Firmware",
"CloudModule",
"LightEffectModule",
"LightTransitionModule",
"ColorTemperatureModule",
"ColorModule",
Expand Down
112 changes: 112 additions & 0 deletions kasa/smart/modules/lighteffectmodule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Module for light effects."""

from __future__ import annotations

import base64
import copy
from typing import TYPE_CHECKING, Any

from ...feature import Feature
from ..smartmodule import SmartModule

if TYPE_CHECKING:
from ..smartdevice import SmartDevice


class LightEffectModule(SmartModule):
"""Implementation of dynamic light effects."""

REQUIRED_COMPONENT = "light_effect"
QUERY_GETTER_NAME = "get_dynamic_light_effect_rules"
AVAILABLE_BULB_EFFECTS = {
"L1": "Party",
"L2": "Relax",
}
LIGHT_EFFECTS_OFF = "Off"

def __init__(self, device: SmartDevice, module: str):
super().__init__(device, module)
self._scenes_names_to_id: dict[str, str] = {}

def _initialize_features(self):
"""Initialize features."""
device = self._device
self._add_feature(
Feature(
device,
"Light effect",
container=self,
attribute_getter="effect",
attribute_setter="set_effect",
category=Feature.Category.Config,
type=Feature.Type.Choice,
choices_getter="effect_list",
)
)

def _initialize_effects(self) -> dict[str, dict[str, Any]]:
"""Return built-in effects."""
# Copy the effects so scene name updates do not update the underlying dict.
effects = copy.deepcopy(
{effect["id"]: effect for effect in self.data["rule_list"]}
)
for effect in effects.values():
if not effect["scene_name"]:
# If the name has not been edited scene_name will be an empty string
effect["scene_name"] = self.AVAILABLE_BULB_EFFECTS[effect["id"]]
else:
# Otherwise it will be b64 encoded
effect["scene_name"] = base64.b64decode(effect["scene_name"]).decode()
self._scenes_names_to_id = {
effect["scene_name"]: effect["id"] for effect in effects.values()
}
return effects

@property
def effect_list(self) -> list[str] | None:
"""Return built-in effects list.

Example:
['Party', 'Relax', ...]
"""
effects = [self.LIGHT_EFFECTS_OFF]
effects.extend(
[effect["scene_name"] for effect in self._initialize_effects().values()]
)
return effects

@property
def effect(self) -> str:
"""Return effect name."""
# get_dynamic_light_effect_rules also has an enable property and current_rule_id
# property that could be used here as an alternative
if self._device._info["dynamic_light_effect_enable"]:
return self._initialize_effects()[
self._device._info["dynamic_light_effect_id"]
]["scene_name"]
return self.LIGHT_EFFECTS_OFF

async def set_effect(
self,
effect: str,
) -> None:
"""Set an effect for the device.

The device doesn't store an active effect while not enabled so store locally.
"""
if effect != self.LIGHT_EFFECTS_OFF and effect not in self._scenes_names_to_id:
raise ValueError(
f"Cannot set light effect to {effect}, possible values "
f"are: {self.LIGHT_EFFECTS_OFF} "
f"{' '.join(self._scenes_names_to_id.keys())}"
)
enable = effect != self.LIGHT_EFFECTS_OFF
params: dict[str, bool | str] = {"enable": enable}
if enable:
effect_id = self._scenes_names_to_id[effect]
params["id"] = effect_id
return await self.call("set_dynamic_light_effect_rule_enable", params)

def query(self) -> dict:
"""Query to execute during the update cycle."""
return {self.QUERY_GETTER_NAME: {"start_index": 0}}
58 changes: 5 additions & 53 deletions kasa/smart/smartdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@
# same issue, homekit perhaps?
WALL_SWITCH_PARENT_ONLY_MODULES = [DeviceModule, TimeModule, Firmware, CloudModule]

AVAILABLE_BULB_EFFECTS = {
"L1": "Party",
"L2": "Relax",
}


# Device must go last as the other interfaces also inherit Device
# and python needs a consistent method resolution order.
Expand Down Expand Up @@ -683,44 +678,6 @@ def valid_temperature_range(self) -> ColorTempRange:
ColorTemperatureModule, self.modules["ColorTemperatureModule"]
).valid_temperature_range

@property
def has_effects(self) -> bool:
"""Return True if the device supports effects."""
return "dynamic_light_effect_enable" in self._info

@property
def effect(self) -> dict:
"""Return effect state.

This follows the format used by SmartLightStrip.

Example:
{'brightness': 50,
'custom': 0,
'enable': 0,
'id': '',
'name': ''}
"""
# If no effect is active, dynamic_light_effect_id does not appear in info
current_effect = self._info.get("dynamic_light_effect_id", "")
data = {
"brightness": self.brightness,
"enable": current_effect != "",
"id": current_effect,
"name": AVAILABLE_BULB_EFFECTS.get(current_effect, ""),
}

return data

@property
def effect_list(self) -> list[str] | None:
"""Return built-in effects list.

Example:
['Party', 'Relax', ...]
"""
return list(AVAILABLE_BULB_EFFECTS.keys()) if self.has_effects else None

@property
def hsv(self) -> HSV:
"""Return the current HSV state of the bulb.
Expand Down Expand Up @@ -807,17 +764,12 @@ async def set_brightness(
brightness
)

async def set_effect(
self,
effect: str,
*,
brightness: int | None = None,
transition: int | None = None,
) -> None:
"""Set an effect on the device."""
raise NotImplementedError()

@property
def presets(self) -> list[BulbPreset]:
"""Return a list of available bulb setting presets."""
return []

@property
def has_effects(self) -> bool:
"""Return True if the device supports effects."""
return "LightEffectModule" in self.modules
16 changes: 16 additions & 0 deletions kasa/tests/fakeprotocol_smart.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ def _handle_control_child(self, params: dict):
"Method %s not implemented for children" % child_method
)

def _set_light_effect(self, info, params):
"""Set or remove values as per the device behaviour."""
info["get_device_info"]["dynamic_light_effect_enable"] = params["enable"]
info["get_dynamic_light_effect_rules"]["enable"] = params["enable"]
if params["enable"]:
info["get_device_info"]["dynamic_light_effect_id"] = params["id"]
info["get_dynamic_light_effect_rules"]["current_rule_id"] = params["enable"]
else:
if "dynamic_light_effect_id" in info["get_device_info"]:
del info["get_device_info"]["dynamic_light_effect_id"]
if "current_rule_id" in info["get_dynamic_light_effect_rules"]:
del info["get_dynamic_light_effect_rules"]["current_rule_id"]

def _send_request(self, request_dict: dict):
method = request_dict["method"]
params = request_dict["params"]
Expand Down Expand Up @@ -223,6 +236,9 @@ def _send_request(self, request_dict: dict):
return retval
elif method == "set_qs_info":
return {"error_code": 0}
elif method == "set_dynamic_light_effect_rule_enable":
self._set_light_effect(info, params)
return {"error_code": 0}
elif method[:4] == "set_":
target_method = f"get_{method[4:]}"
info[target_method].update(params)
Expand Down
42 changes: 42 additions & 0 deletions kasa/tests/smart/modules/test_light_effect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

from itertools import chain
from typing import cast

import pytest
from pytest_mock import MockerFixture

from kasa import Device, Feature
from kasa.smart.modules import LightEffectModule
from kasa.tests.device_fixtures import parametrize

light_effect = parametrize(
"has light effect", component_filter="light_effect", protocol_filter={"SMART"}
)


@light_effect
async def test_light_effect(dev: Device, mocker: MockerFixture):
"""Test light effect."""
light_effect = cast(LightEffectModule, dev.modules.get("LightEffectModule"))
assert light_effect

feature = light_effect._module_features["light_effect"]
assert feature.type == Feature.Type.Choice

call = mocker.spy(light_effect, "call")
assert feature.choices == light_effect.effect_list
assert feature.choices
for effect in chain(reversed(feature.choices), feature.choices):
await light_effect.set_effect(effect)
enable = effect != LightEffectModule.LIGHT_EFFECTS_OFF
params: dict[str, bool | str] = {"enable": enable}
if enable:
params["id"] = light_effect._scenes_names_to_id[effect]
call.assert_called_with("set_dynamic_light_effect_rule_enable", params)
await dev.update()
assert light_effect.effect == effect
assert feature.value == effect

with pytest.raises(ValueError):
await light_effect.set_effect("foobar")
Loading
Loading