-
-
Notifications
You must be signed in to change notification settings - Fork 226
Add event listener to smartcam #1388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
56261e6
99e8a2f
6b41170
d201e59
f4857ef
8b4f3ed
3eb85b8
ed98d58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
"""Module for cli listen commands.""" | ||
|
||
import asyncio | ||
from contextlib import suppress | ||
from typing import cast | ||
|
||
import asyncclick as click | ||
|
||
from kasa import ( | ||
Credentials, | ||
Device, | ||
) | ||
from kasa.eventtype import EventType | ||
|
||
from .common import echo, error, pass_dev_or_child | ||
|
||
|
||
async def wait_on_keyboard_interrupt(msg: str): | ||
"""Non loop blocking get input.""" | ||
echo(msg + ", press Ctrl-C to cancel\n") | ||
|
||
with suppress(asyncio.CancelledError): | ||
await asyncio.Event().wait() | ||
|
||
|
||
@click.command() | ||
@click.option( | ||
"--cam-username", | ||
required=True, | ||
envvar="KASA_CAMERA_USERNAME", | ||
help="Camera account username address to authenticate to device.", | ||
) | ||
@click.option( | ||
"--cam-password", | ||
required=True, | ||
envvar="KASA_CAMERA_PASSWORD", | ||
help="Camera account password to use to authenticate to device.", | ||
) | ||
@click.option( | ||
"--listen-port", | ||
default=None, | ||
required=False, | ||
envvar="KASA_LISTEN_PORT", | ||
help="Port to listen on for onvif notifications.", | ||
) | ||
@click.option( | ||
"--listen-ip", | ||
default=None, | ||
required=False, | ||
envvar="KASA_LISTEN_IP", | ||
help="Ip address to listen on for onvif notifications.", | ||
) | ||
@click.option( | ||
"-et", | ||
"--event-types", | ||
default=None, | ||
required=False, | ||
multiple=True, | ||
type=click.Choice([et for et in EventType], case_sensitive=False), | ||
help="Event types to listen to.", | ||
) | ||
@pass_dev_or_child | ||
async def listen( | ||
dev: Device, | ||
cam_username: str, | ||
cam_password: str, | ||
listen_port: int | None, | ||
listen_ip: str | None, | ||
event_types: list[EventType] | None, | ||
) -> None: | ||
"""Listen for events like motion, triggers or alarms.""" | ||
try: | ||
import onvif # type: ignore[import-untyped] # noqa: F401 | ||
except ImportError: | ||
error("python-kasa must be installed with onvif extra for listen.") | ||
|
||
from kasa.smartcam.modules.onviflisten import OnvifListen | ||
|
||
listen: OnvifListen = cast(OnvifListen, dev.modules.get(OnvifListen._module_name())) | ||
if not listen: | ||
error(f"Device {dev.host} does not support listening for events.") | ||
|
||
def on_event(event: EventType) -> None: | ||
echo(f"Device {dev.host} received event {event}") | ||
|
||
creds = Credentials(cam_username, cam_password) | ||
await listen.listen( | ||
on_event, | ||
creds, | ||
listen_ip=listen_ip, | ||
listen_port=listen_port, | ||
event_types=event_types, | ||
) | ||
|
||
msg = f"Listening for events on {listen.listening_address}" | ||
|
||
await wait_on_keyboard_interrupt(msg) | ||
|
||
echo("\nStopping listener") | ||
await listen.stop() | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,6 +71,7 @@ def _legacy_type_to_class(_type: str) -> Any: | |
"device": None, | ||
"feature": None, | ||
"light": None, | ||
"listen": None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about a new sub group "camera", which would contain this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above I think we should keep this generic so we can add other |
||
"wifi": None, | ||
"time": None, | ||
"schedule": None, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
"""Module for listen event types.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want to keep this inside smartcam for now, as the types are very onvif-specific(?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better to have this at the top level so we can add other |
||
|
||
from enum import StrEnum, auto | ||
|
||
|
||
class EventType(StrEnum): | ||
"""Listen event types.""" | ||
|
||
MOTION_DETECTED = auto() | ||
PERSON_DETECTED = auto() | ||
TAMPER_DETECTED = auto() | ||
BABY_CRY_DETECTED = auto() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
"""Implementation of motion detection module.""" | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
import logging | ||
import os | ||
import socket | ||
import uuid | ||
from collections.abc import Callable, Iterable | ||
from datetime import timedelta | ||
|
||
import onvif # type: ignore[import-untyped] | ||
from aiohttp import web | ||
from onvif.managers import NotificationManager # type: ignore[import-untyped] | ||
|
||
from ...credentials import Credentials | ||
from ...eventtype import EventType | ||
from ...exceptions import KasaException | ||
from ..smartcammodule import SmartCamModule | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
logging.getLogger("aiohttp").setLevel(logging.WARNING) | ||
logging.getLogger("httpx").setLevel(logging.WARNING) | ||
|
||
DEFAULT_LISTEN_PORT = 28002 | ||
|
||
|
||
TOPIC_EVENT_TYPE = { | ||
"tns1:RuleEngine/CellMotionDetector/Motion": EventType.MOTION_DETECTED, | ||
"tns1:RuleEngine/CellMotionDetector/People": EventType.PERSON_DETECTED, | ||
"tns1:RuleEngine/TamperDetector/Tamper": EventType.TAMPER_DETECTED, | ||
} | ||
|
||
|
||
class OnvifListen(SmartCamModule): | ||
"""Implementation of lens mask module.""" | ||
|
||
manager: NotificationManager | ||
callback: Callable[[EventType], None] | ||
event_types: Iterable[EventType] | None | ||
listening = False | ||
site: web.TCPSite | ||
runner: web.AppRunner | ||
instance_id: str | ||
path: str | ||
_listening_address: str | None = None | ||
|
||
@property | ||
def listening_address(self) -> str | None: | ||
"""Address the listener is receiving onvif notifications on. | ||
|
||
Or None if not listening. | ||
""" | ||
return self._listening_address | ||
|
||
async def _invoke_callback(self, event: EventType) -> None: | ||
self.callback(event) | ||
|
||
async def _handle_event(self, request: web.Request) -> web.Response: | ||
content = await request.read() | ||
result = self.manager.process(content) | ||
for msg in result.NotificationMessage: | ||
_LOGGER.debug( | ||
"Received notification message for %s: %s", | ||
self._device.host, | ||
msg, | ||
) | ||
if (event := TOPIC_EVENT_TYPE.get(msg.Topic._value_1)) and ( | ||
(not self.event_types or event in self.event_types) | ||
and (simple_items := msg.Message._value_1.Data.SimpleItem) | ||
and simple_items[0].Value == "true" | ||
): | ||
asyncio.create_task(self._invoke_callback(event)) | ||
return web.Response() | ||
|
||
async def listen( | ||
self, | ||
callback: Callable[[EventType], None], | ||
camera_credentials: Credentials, | ||
*, | ||
event_types: Iterable[EventType] | None = None, | ||
listen_ip: str | None = None, | ||
listen_port: int | None = None, | ||
) -> None: | ||
"""Start listening for events.""" | ||
self.callback = callback | ||
self.event_types = event_types | ||
self.instance_id = str(uuid.uuid4()) | ||
self.path = f"/{self._device.host}/{self.instance_id}/" | ||
|
||
if listen_port is None: | ||
listen_port = DEFAULT_LISTEN_PORT | ||
|
||
def subscription_lost() -> None: | ||
_LOGGER.debug("Notification subscription lost for %s", self._device.host) | ||
asyncio.create_task(self.stop()) | ||
|
||
wsdl = f"{os.path.dirname(onvif.__file__)}/wsdl/" | ||
|
||
mycam = onvif.ONVIFCamera( | ||
self._device.host, | ||
2020, | ||
camera_credentials.username, | ||
camera_credentials.password, | ||
wsdl, | ||
) | ||
await mycam.update_xaddrs() | ||
|
||
host_port = await self._start_server(listen_ip, listen_port) | ||
|
||
self.manager = await mycam.create_notification_manager( | ||
address=host_port + self.path, | ||
interval=timedelta(minutes=10), | ||
subscription_lost_callback=subscription_lost, | ||
) | ||
|
||
self._listening_address = host_port | ||
self.listening = True | ||
_LOGGER.debug("Listener started for %s", self._device.host) | ||
|
||
async def stop(self) -> None: | ||
"""Stop the listener.""" | ||
if not self.listening: | ||
_LOGGER.debug("Listener for %s already stopped", self._device.host) | ||
return | ||
|
||
_LOGGER.debug("Stopping listener for %s", self._device.host) | ||
self.listening = False | ||
self._listening_address = None | ||
await self.site.stop() | ||
await self.runner.shutdown() | ||
|
||
async def _get_host_ip(self) -> str: | ||
def get_ip() -> str: | ||
# From https://stackoverflow.com/a/28950776 | ||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
s.settimeout(0) | ||
try: | ||
# doesn't even have to be reachable | ||
s.connect(("10.254.254.254", 1)) | ||
ip = s.getsockname()[0] | ||
finally: | ||
s.close() | ||
return ip | ||
|
||
loop = asyncio.get_running_loop() | ||
return await loop.run_in_executor(None, get_ip) | ||
|
||
async def _start_server(self, listen_ip: str | None, listen_port: int) -> str: | ||
app = web.Application() | ||
app.add_routes([web.post(self.path, self._handle_event)]) | ||
|
||
self.runner = web.AppRunner(app) | ||
await self.runner.setup() | ||
|
||
if not listen_ip: | ||
try: | ||
listen_ip = await self._get_host_ip() | ||
except Exception as ex: | ||
raise KasaException( | ||
"Unable to determine listen ip starting " | ||
f"listener for {self._device.host}", | ||
ex, | ||
) from ex | ||
|
||
self.site = web.TCPSite(self.runner, listen_ip, listen_port) | ||
try: | ||
await self.site.start() | ||
except Exception: | ||
_LOGGER.exception( | ||
"Error trying to start listener for %s: ", self._device.host | ||
) | ||
|
||
_LOGGER.debug( | ||
"Listen handler for %s running on %s:%s", | ||
self._device.host, | ||
listen_ip, | ||
listen_port, | ||
) | ||
|
||
return f"http://{listen_ip}:{listen_port}" | ||
Uh oh!
There was an error while loading. Please reload this page.