Skip to content

Implement wifi interface for tapodevice #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kasa/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ async def scan(dev):

@wifi.command()
@click.argument("ssid")
@click.option("--keytype", prompt=True)
@click.option("--password", prompt=True, hide_input=True)
@click.option("--keytype", default=3)
@pass_dev
async def join(dev: SmartDevice, ssid, password, keytype):
async def join(dev: SmartDevice, ssid: str, password: str, keytype: str):
"""Join the given wifi network."""
echo(f"Asking the device to connect to {ssid}..")
res = await dev.wifi_join(ssid, password, keytype=keytype)
Expand Down
7 changes: 5 additions & 2 deletions kasa/smartdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class WifiNetwork:
channel: Optional[int] = None
rssi: Optional[int] = None

# For SMART devices
signal_level: Optional[int] = None


def merge(d, u):
"""Update dict recursively."""
Expand Down Expand Up @@ -687,7 +690,7 @@ async def _scan(target):

return [WifiNetwork(**x) for x in info["ap_list"]]

async def wifi_join(self, ssid, password, keytype=3): # noqa: D202
async def wifi_join(self, ssid: str, password: str, keytype: str = "3"): # noqa: D202
"""Join the given wifi network.

If joining the network fails, the device will return to AP mode after a while.
Expand All @@ -696,7 +699,7 @@ async def wifi_join(self, ssid, password, keytype=3): # noqa: D202
async def _join(target, payload):
return await self._query_helper(target, "set_stainfo", payload)

payload = {"ssid": ssid, "password": password, "key_type": keytype}
payload = {"ssid": ssid, "password": password, "key_type": int(keytype)}
try:
return await _join("netif", payload)
except SmartDeviceException as ex:
Expand Down
80 changes: 77 additions & 3 deletions kasa/tapo/tapodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import base64
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Set, cast
from typing import Any, Dict, List, Optional, Set, cast

from ..aestransport import AesTransport
from ..deviceconfig import DeviceConfig
from ..emeterstatus import EmeterStatus
from ..exceptions import AuthenticationException
from ..exceptions import AuthenticationException, SmartDeviceException
from ..modules import Emeter
from ..protocol import TPLinkProtocol
from ..smartdevice import SmartDevice
from ..smartdevice import SmartDevice, WifiNetwork
from ..smartprotocol import SmartProtocol

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -247,3 +247,77 @@ def emeter_this_month(self) -> Optional[float]:
def emeter_today(self) -> Optional[float]:
"""Get the emeter value for today."""
return self._convert_energy_data(self._energy.get("today_energy"), 1 / 1000)

async def wifi_scan(self) -> List[WifiNetwork]:
"""Scan for available wifi networks."""

def _net_for_scan_info(res):
return WifiNetwork(
ssid=base64.b64decode(res["ssid"]).decode(),
cipher_type=res["cipher_type"],
key_type=res["key_type"],
channel=res["channel"],
signal_level=res["signal_level"],
bssid=res["bssid"],
)

async def _query_networks(networks=None, start_index=0):
_LOGGER.debug("Querying networks using start_index=%s", start_index)
if networks is None:
networks = []

resp = await self.protocol.query(
{"get_wireless_scan_info": {"start_index": start_index}}
)
network_list = [
_net_for_scan_info(net)
for net in resp["get_wireless_scan_info"]["ap_list"]
]
networks.extend(network_list)

if resp["get_wireless_scan_info"]["sum"] > start_index + 10:
return await _query_networks(networks, start_index=start_index + 10)

return networks

return await _query_networks()

async def wifi_join(self, ssid: str, password: str, keytype: str = "wpa2_psk"):
"""Join the given wifi network.

This method returns nothing as the device tries to activate the new
settings immediately instead of responding to the request.

If joining the network fails, the device will return to the previous state
after some delay.
"""
if not self.credentials:
raise AuthenticationException("Device requires authentication.")

payload = {
"account": {
"username": base64.b64encode(
self.credentials.username.encode()
).decode(),
"password": base64.b64encode(
self.credentials.password.encode()
).decode(),
},
"wireless": {
"key_type": keytype,
"password": base64.b64encode(password.encode()).decode(),
"ssid": base64.b64encode(ssid.encode()).decode(),
},
"time": self.internal_state["time"],
}

# The device does not respond to the request but changes the settings
# immediately which causes us to timeout.
# Thus, We limit retries and suppress the raised exception as useless.
try:
return await self.protocol.query({"set_qs_info": payload}, retry_count=0)
except SmartDeviceException as ex:
if ex.error_code: # Re-raise on device-reported errors
raise

_LOGGER.debug("Received an expected for wifi join, but this is expected")