From 06dbc7d7cee2de2811a770792afbef081d8a2a83 Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Wed, 4 Dec 2024 14:04:20 +0100 Subject: [PATCH 1/7] Add powerprotection module --- kasa/smart/modules/__init__.py | 2 + kasa/smart/modules/powerprotection.py | 98 +++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 kasa/smart/modules/powerprotection.py diff --git a/kasa/smart/modules/__init__.py b/kasa/smart/modules/__init__.py index 99820cfaf..073b58b88 100644 --- a/kasa/smart/modules/__init__.py +++ b/kasa/smart/modules/__init__.py @@ -24,6 +24,7 @@ from .lightstripeffect import LightStripEffect from .lighttransition import LightTransition from .motionsensor import MotionSensor +from .powerprotection import PowerProtection from .reportmode import ReportMode from .temperaturecontrol import TemperatureControl from .temperaturesensor import TemperatureSensor @@ -64,4 +65,5 @@ "FrostProtection", "Thermostat", "SmartLightEffect", + "PowerProtection", ] diff --git a/kasa/smart/modules/powerprotection.py b/kasa/smart/modules/powerprotection.py new file mode 100644 index 000000000..5e371b892 --- /dev/null +++ b/kasa/smart/modules/powerprotection.py @@ -0,0 +1,98 @@ +"""Power protection module.""" + +from __future__ import annotations + +from ...feature import Feature +from ..smartmodule import SmartModule + + +class PowerProtection(SmartModule): + """Implementation for power_protection.""" + + REQUIRED_COMPONENT = "power_protection" + + def _initialize_features(self) -> None: + """Initialize features after the initial update.""" + self._add_feature( + Feature( + device=self._device, + id="overloaded", + name="Overloaded", + container=self, + attribute_getter="overloaded", + type=Feature.Type.BinarySensor, + category=Feature.Category.Info, + ) + ) + self._add_feature( + Feature( + device=self._device, + id="power_protection_enabled", + name="Power protection enabled", + container=self, + attribute_getter="enabled", + attribute_setter="set_enabled", + type=Feature.Type.Switch, + category=Feature.Category.Config, + ) + ) + self._add_feature( + Feature( + device=self._device, + id="power_protection_threshold", + name="Power protection threshold", + container=self, + attribute_getter="protection_threshold", + attribute_setter="set_protection_threshold", + unit_getter=lambda: "W", + type=Feature.Type.Number, + range_getter="protection_threshold_range", + category=Feature.Category.Config, + ) + ) + + def query(self) -> dict: + """Query to execute during the update cycle.""" + return {"get_protection_power": None, "get_max_power": None} + + @property + def overloaded(self) -> bool: + """Return True is power protection has been triggered. + + This value remains True until the device is turned on again. + """ + return self._device.sys_info["power_protection_status"] == "overloaded" + + @property + def enabled(self) -> bool: + """Return True if child protection is enabled.""" + return self.data["get_protection_power"]["enabled"] + + async def set_enabled(self, enabled: bool) -> dict: + """Set child protection.""" + params = {**self.data["get_protection_power"], "enabled": enabled} + return await self.call("set_protection_power", params) + + @property + def protection_threshold_range(self) -> tuple[int, int]: + """Return threshold range.""" + return 0, self.data["get_max_power"]["max_power"] + + @property + def protection_threshold(self) -> int: + """Return protection threshold in watts.""" + # If never configured, there is no value set. + return self.data["get_protection_power"].get("protection_power", 0) + + async def set_protection_threshold(self, threshold: int) -> dict: + """Set protection threshold.""" + if threshold < 0 or threshold > self.protection_threshold_range[1]: + raise ValueError( + "Threshold out of range: %s (%s)", threshold, self.protection_threshold + ) + + params = { + **self.data["get_protection_power"], + "protection_power": threshold, + } + return await self.call("set_protection_power", params) From bd30f4bb1fe29f9e743576f99ae9f7dd03e695fe Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 5 Dec 2024 17:30:59 +0100 Subject: [PATCH 2/7] fix tests --- tests/fakeprotocol_smart.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/fakeprotocol_smart.py b/tests/fakeprotocol_smart.py index 448729ca7..1647b7ba0 100644 --- a/tests/fakeprotocol_smart.py +++ b/tests/fakeprotocol_smart.py @@ -151,6 +151,14 @@ def credentials_hash(self): "energy_monitoring", {"igain": 10861, "vgain": 118657}, ), + "get_protection_power": ( + "power_protection", + {"enabled": False, "protection_power": 0}, + ), + "get_max_power": ( + "power_protection", + {"max_power": 3904}, + ), } async def send(self, request: str): From 51d112e6f846fa270a1d6a2325f746ca0b617176 Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 5 Dec 2024 17:52:45 +0100 Subject: [PATCH 3/7] Add PowerProtection to Module --- kasa/module.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kasa/module.py b/kasa/module.py index 2b2e65f93..9a34c859a 100644 --- a/kasa/module.py +++ b/kasa/module.py @@ -150,6 +150,9 @@ class Module(ABC): "ChildProtection" ) TriggerLogs: Final[ModuleName[smart.TriggerLogs]] = ModuleName("TriggerLogs") + PowerProtection: Final[ModuleName[smart.PowerProtection]] = ModuleName( + "PowerProtection" + ) # SMARTCAM only modules Camera: Final[ModuleName[smartcam.Camera]] = ModuleName("Camera") From c0f5d5b5941a19e839ab61f57bf46acd8a182ab7 Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 5 Dec 2024 17:52:48 +0100 Subject: [PATCH 4/7] Add tests --- tests/smart/modules/test_powerprotection.py | 76 +++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 tests/smart/modules/test_powerprotection.py diff --git a/tests/smart/modules/test_powerprotection.py b/tests/smart/modules/test_powerprotection.py new file mode 100644 index 000000000..8db897211 --- /dev/null +++ b/tests/smart/modules/test_powerprotection.py @@ -0,0 +1,76 @@ +import pytest +from pytest_mock import MockerFixture + +from kasa import Module, SmartDevice +from kasa.smart.modules import PowerProtection + +from ...device_fixtures import parametrize + +powerprotection = parametrize( + "has powerprotection", + component_filter="power_protection", + protocol_filter={"SMART"}, +) + + +def _skip_on_unavailable(dev: SmartDevice): + if Module.PowerProtection not in dev.modules: + pytest.skip(f"No powerprotection module on {dev}, maybe a strip parent?") + + +@powerprotection +@pytest.mark.parametrize( + ("feature", "prop_name", "type"), + [ + ("overloaded", "overloaded", bool | None), + ("power_protection_enabled", "enabled", bool), + ("power_protection_threshold", "protection_threshold", int), + ], +) +async def test_features(dev, feature, prop_name, type): + """Test that features are registered and work as expected.""" + _skip_on_unavailable(dev) + + powerprot: PowerProtection = dev.modules[Module.PowerProtection] + + prop = getattr(powerprot, prop_name) + assert isinstance(prop, type) + + feat = dev.features[feature] + assert feat.value == prop + assert isinstance(feat.value, type) + + +@powerprotection +async def test_set_enable(dev: SmartDevice, mocker: MockerFixture): + """Test enable.""" + _skip_on_unavailable(dev) + + powerprot: PowerProtection = dev.modules[Module.PowerProtection] + + call_spy = mocker.spy(powerprot, "call") + await powerprot.set_enabled(True) + params = { + "enabled": mocker.ANY, + "protection_power": mocker.ANY, + } + call_spy.assert_called_with("set_protection_power", params) + + +@powerprotection +async def test_set_threshold(dev: SmartDevice, mocker: MockerFixture): + """Test enable.""" + _skip_on_unavailable(dev) + + powerprot: PowerProtection = dev.modules[Module.PowerProtection] + + call_spy = mocker.spy(powerprot, "call") + await powerprot.set_protection_threshold(123) + params = { + "enabled": mocker.ANY, + "protection_power": 123, + } + call_spy.assert_called_with("set_protection_power", params) + + with pytest.raises(ValueError, match="Threshold out of range"): + await powerprot.set_protection_threshold(-10) From f3ebb02b1017121c54ece2c027d0729f6abb3bc5 Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 5 Dec 2024 17:59:50 +0100 Subject: [PATCH 5/7] Add _check_supported --- kasa/smart/modules/powerprotection.py | 7 +++++++ tests/smart/modules/test_powerprotection.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/kasa/smart/modules/powerprotection.py b/kasa/smart/modules/powerprotection.py index 5e371b892..095bfbe12 100644 --- a/kasa/smart/modules/powerprotection.py +++ b/kasa/smart/modules/powerprotection.py @@ -96,3 +96,10 @@ async def set_protection_threshold(self, threshold: int) -> dict: "protection_power": threshold, } return await self.call("set_protection_power", params) + + async def _check_supported(self) -> bool: + """Return True if module is supported. + + This is needed, as strips like P304M report the status only for children. + """ + return "power_protection_status" in self._device.sys_info diff --git a/tests/smart/modules/test_powerprotection.py b/tests/smart/modules/test_powerprotection.py index 8db897211..10e94edef 100644 --- a/tests/smart/modules/test_powerprotection.py +++ b/tests/smart/modules/test_powerprotection.py @@ -22,7 +22,7 @@ def _skip_on_unavailable(dev: SmartDevice): @pytest.mark.parametrize( ("feature", "prop_name", "type"), [ - ("overloaded", "overloaded", bool | None), + ("overloaded", "overloaded", bool), ("power_protection_enabled", "enabled", bool), ("power_protection_threshold", "protection_threshold", int), ], From 86e1f5ae383fe562fe657a604d6ad19fea770922 Mon Sep 17 00:00:00 2001 From: Steven B <51370195+sdb9696@users.noreply.github.com> Date: Thu, 23 Jan 2025 17:34:00 +0000 Subject: [PATCH 6/7] Apply suggestions from code review --- kasa/module.py | 16 +++-- kasa/smart/modules/powerprotection.py | 57 +++++++++------- tests/smart/modules/test_powerprotection.py | 74 +++++++++++++-------- 3 files changed, 94 insertions(+), 53 deletions(-) diff --git a/kasa/module.py b/kasa/module.py index 0e098a1c0..a21e69e41 100644 --- a/kasa/module.py +++ b/kasa/module.py @@ -81,6 +81,9 @@ class FeatureAttribute: """Class for annotating attributes bound to feature.""" + def __init__(self, feature_name: str | None = None) -> None: + self.feature_name = feature_name + def __repr__(self) -> str: return "FeatureAttribute" @@ -237,7 +240,7 @@ def __repr__(self) -> str: ) -def _is_bound_feature(attribute: property | Callable) -> bool: +def _get_feature_attribute(attribute: property | Callable) -> FeatureAttribute | None: """Check if an attribute is bound to a feature with FeatureAttribute.""" if isinstance(attribute, property): hints = get_type_hints(attribute.fget, include_extras=True) @@ -248,9 +251,9 @@ def _is_bound_feature(attribute: property | Callable) -> bool: metadata = hints["return"].__metadata__ for meta in metadata: if isinstance(meta, FeatureAttribute): - return True + return meta - return False + return None @cache @@ -277,12 +280,17 @@ def _get_bound_feature( f"module {module.__class__.__name__}" ) - if not _is_bound_feature(attribute_callable): + if not (fa := _get_feature_attribute(attribute_callable)): raise KasaException( f"Attribute {attribute_name} of module {module.__class__.__name__}" " is not bound to a feature" ) + # If a feature_name was passed to the FeatureAttribute use that to check + # for the feature. Otherwise check the getters and setters in the features + if fa.feature_name: + return module._all_features.get(fa.feature_name) + check = {attribute_name, attribute_callable} for feature in module._all_features.values(): if (getter := feature.attribute_getter) and getter in check: diff --git a/kasa/smart/modules/powerprotection.py b/kasa/smart/modules/powerprotection.py index 095bfbe12..b4d9c9884 100644 --- a/kasa/smart/modules/powerprotection.py +++ b/kasa/smart/modules/powerprotection.py @@ -2,7 +2,10 @@ from __future__ import annotations +from typing import Annotated + from ...feature import Feature +from ...module import FeatureAttribute from ..smartmodule import SmartModule @@ -24,36 +27,28 @@ def _initialize_features(self) -> None: category=Feature.Category.Info, ) ) - self._add_feature( - Feature( - device=self._device, - id="power_protection_enabled", - name="Power protection enabled", - container=self, - attribute_getter="enabled", - attribute_setter="set_enabled", - type=Feature.Type.Switch, - category=Feature.Category.Config, - ) - ) self._add_feature( Feature( device=self._device, id="power_protection_threshold", name="Power protection threshold", container=self, - attribute_getter="protection_threshold", - attribute_setter="set_protection_threshold", + attribute_getter=lambda x: self.protection_threshold + if self.enabled + else 0, + attribute_setter=lambda x: self.set_enabled(False) + if x == 0 + else self.set_enabled(True, threshold=x), unit_getter=lambda: "W", type=Feature.Type.Number, - range_getter="protection_threshold_range", + range_getter=lambda: (0, self._max_power), category=Feature.Category.Config, ) ) def query(self) -> dict: """Query to execute during the update cycle.""" - return {"get_protection_power": None, "get_max_power": None} + return {"get_protection_power": {}, "get_max_power": {}} @property def overloaded(self) -> bool: @@ -68,25 +63,41 @@ def enabled(self) -> bool: """Return True if child protection is enabled.""" return self.data["get_protection_power"]["enabled"] - async def set_enabled(self, enabled: bool) -> dict: - """Set child protection.""" + async def set_enabled(self, enabled: bool, *, threshold: int | None = None) -> dict: + """Set child protection. + + If power protection has never been enabled before the threshold will + be 0 so if threshold is not provided it will be set to half the max. + """ + if threshold is None and enabled and self.protection_threshold == 0: + threshold = int(self._max_power / 2) + + if threshold and (threshold < 0 or threshold > self._max_power): + raise ValueError( + "Threshold out of range: %s (%s)", threshold, self.protection_threshold + ) + params = {**self.data["get_protection_power"], "enabled": enabled} + if threshold is not None: + params["protection_power"] = threshold return await self.call("set_protection_power", params) @property - def protection_threshold_range(self) -> tuple[int, int]: - """Return threshold range.""" - return 0, self.data["get_max_power"]["max_power"] + def _max_power(self) -> int: + """Return max power.""" + return self.data["get_max_power"]["max_power"] @property - def protection_threshold(self) -> int: + def protection_threshold( + self, + ) -> Annotated[int, FeatureAttribute("power_protection_threshold")]: """Return protection threshold in watts.""" # If never configured, there is no value set. return self.data["get_protection_power"].get("protection_power", 0) async def set_protection_threshold(self, threshold: int) -> dict: """Set protection threshold.""" - if threshold < 0 or threshold > self.protection_threshold_range[1]: + if threshold < 0 or threshold > self._max_power: raise ValueError( "Threshold out of range: %s (%s)", threshold, self.protection_threshold ) diff --git a/tests/smart/modules/test_powerprotection.py b/tests/smart/modules/test_powerprotection.py index 10e94edef..7f03c0e9a 100644 --- a/tests/smart/modules/test_powerprotection.py +++ b/tests/smart/modules/test_powerprotection.py @@ -2,9 +2,8 @@ from pytest_mock import MockerFixture from kasa import Module, SmartDevice -from kasa.smart.modules import PowerProtection -from ...device_fixtures import parametrize +from ...device_fixtures import get_parent_and_child_modules, parametrize powerprotection = parametrize( "has powerprotection", @@ -13,30 +12,24 @@ ) -def _skip_on_unavailable(dev: SmartDevice): - if Module.PowerProtection not in dev.modules: - pytest.skip(f"No powerprotection module on {dev}, maybe a strip parent?") - - @powerprotection @pytest.mark.parametrize( ("feature", "prop_name", "type"), [ ("overloaded", "overloaded", bool), - ("power_protection_enabled", "enabled", bool), ("power_protection_threshold", "protection_threshold", int), ], ) async def test_features(dev, feature, prop_name, type): """Test that features are registered and work as expected.""" - _skip_on_unavailable(dev) - - powerprot: PowerProtection = dev.modules[Module.PowerProtection] + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot + device = powerprot._device prop = getattr(powerprot, prop_name) assert isinstance(prop, type) - feat = dev.features[feature] + feat = device.features[feature] assert feat.value == prop assert isinstance(feat.value, type) @@ -44,25 +37,54 @@ async def test_features(dev, feature, prop_name, type): @powerprotection async def test_set_enable(dev: SmartDevice, mocker: MockerFixture): """Test enable.""" - _skip_on_unavailable(dev) - - powerprot: PowerProtection = dev.modules[Module.PowerProtection] - - call_spy = mocker.spy(powerprot, "call") - await powerprot.set_enabled(True) - params = { - "enabled": mocker.ANY, - "protection_power": mocker.ANY, - } - call_spy.assert_called_with("set_protection_power", params) + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot + device = powerprot._device + + original_enabled = powerprot.enabled + original_threshold = powerprot.protection_threshold + + try: + # Simple enable with an existing threshold + call_spy = mocker.spy(powerprot, "call") + await powerprot.set_enabled(True) + params = { + "enabled": True, + "protection_power": mocker.ANY, + } + call_spy.assert_called_with("set_protection_power", params) + + # Enable with no threshold param when 0 + call_spy.reset_mock() + await powerprot.set_protection_threshold(0) + await device.update() + await powerprot.set_enabled(True) + params = { + "enabled": True, + "protection_power": int(powerprot._max_power / 2), + } + call_spy.assert_called_with("set_protection_power", params) + + # Enable false should not update the threshold + call_spy.reset_mock() + await powerprot.set_protection_threshold(0) + await device.update() + await powerprot.set_enabled(False) + params = { + "enabled": False, + "protection_power": 0, + } + call_spy.assert_called_with("set_protection_power", params) + + finally: + await powerprot.set_enabled(original_enabled, threshold=original_threshold) @powerprotection async def test_set_threshold(dev: SmartDevice, mocker: MockerFixture): """Test enable.""" - _skip_on_unavailable(dev) - - powerprot: PowerProtection = dev.modules[Module.PowerProtection] + powerprot = next(get_parent_and_child_modules(dev, Module.PowerProtection)) + assert powerprot call_spy = mocker.spy(powerprot, "call") await powerprot.set_protection_threshold(123) From 7bce95885c9d2cc89c91c76997934023d609c4f5 Mon Sep 17 00:00:00 2001 From: Steven B <51370195+sdb9696@users.noreply.github.com> Date: Sat, 25 Jan 2025 18:22:20 +0000 Subject: [PATCH 7/7] Replace lambda with def --- kasa/smart/modules/powerprotection.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/kasa/smart/modules/powerprotection.py b/kasa/smart/modules/powerprotection.py index b4d9c9884..ff7e726d5 100644 --- a/kasa/smart/modules/powerprotection.py +++ b/kasa/smart/modules/powerprotection.py @@ -33,12 +33,8 @@ def _initialize_features(self) -> None: id="power_protection_threshold", name="Power protection threshold", container=self, - attribute_getter=lambda x: self.protection_threshold - if self.enabled - else 0, - attribute_setter=lambda x: self.set_enabled(False) - if x == 0 - else self.set_enabled(True, threshold=x), + attribute_getter="_threshold_or_zero", + attribute_setter="_set_threshold_auto_enable", unit_getter=lambda: "W", type=Feature.Type.Number, range_getter=lambda: (0, self._max_power), @@ -64,7 +60,7 @@ def enabled(self) -> bool: return self.data["get_protection_power"]["enabled"] async def set_enabled(self, enabled: bool, *, threshold: int | None = None) -> dict: - """Set child protection. + """Set power protection enabled. If power protection has never been enabled before the threshold will be 0 so if threshold is not provided it will be set to half the max. @@ -82,6 +78,18 @@ async def set_enabled(self, enabled: bool, *, threshold: int | None = None) -> d params["protection_power"] = threshold return await self.call("set_protection_power", params) + async def _set_threshold_auto_enable(self, threshold: int) -> dict: + """Set power protection and enable.""" + if threshold == 0: + return await self.set_enabled(False) + else: + return await self.set_enabled(True, threshold=threshold) + + @property + def _threshold_or_zero(self) -> int: + """Get power protection threshold. 0 if not enabled.""" + return self.protection_threshold if self.enabled else 0 + @property def _max_power(self) -> int: """Return max power."""