From a9cd3d676f122c0ec144b8aedddf9a87874c8a20 Mon Sep 17 00:00:00 2001 From: Amy Bowersox Date: Wed, 10 Aug 2022 10:23:36 -0600 Subject: [PATCH] some initial code for EDR Live Query --- src/cbapi/response/livequery.py | 70 +++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/cbapi/response/livequery.py diff --git a/src/cbapi/response/livequery.py b/src/cbapi/response/livequery.py new file mode 100644 index 00000000..530948e1 --- /dev/null +++ b/src/cbapi/response/livequery.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +"""CBAPI Live Query implementation""" + +from cbapi.errors import ApiError + + +class LiveQuery: + def __init__(self, cb): + self._cb = cb + self._request_body = {"query": ""} + self._cached_results = None + + @classmethod + def enable(cls, cb, flag=True): + cb.put_object("/api/v1/config_mgmt/LiveQueryEnabled", {"LiveQueryEnabled": {"value": flag}}) + + @classmethod + def disable(cls, cb): + LiveQuery.enable(cb, False) + + @classmethod + def _query_implementation(cls, cb): + return LiveQuery(cb) + + def where(self, **kwargs): + param = kwargs.pop("sql", None) + if param: + self.set_query(param) + param = kwargs.pop("group_ids", None) + if param: + self.set_group_ids(param) + param = kwargs.pop("sensor_ids", None) + if param: + self.set_sensor_ids(param) + return self + + def set_query(self, query): + self._request_body["query"] = query + self._cached_results = None + return self + + def set_group_ids(self, ids): + if isinstance(ids, list): + if not all([isinstance(value, int) for value in ids]): + raise ApiError("group IDs must all be integer") + self._request_body["group_ids"] = ids + elif isinstance(ids, int): + self._request_body["group_ids"] = [ids] + else: + raise ApiError("group IDs must be either int or list of ints") + self._cached_results = None + return self + + def set_sensor_ids(self, ids): + if isinstance(ids, list): + if not all([isinstance(value, int) for value in ids]): + raise ApiError("sensor IDs must all be integer") + self._request_body["sensor_ids"] = ids + elif isinstance(ids, int): + self._request_body["sensor_ids"] = [ids] + else: + raise ApiError("sensor IDs must be either int or list of ints") + self._cached_results = None + return self + + def _execute(self): + if not self._cached_results: + self._cached_results = self._cb.post_object("/api/v1/livequery/query", body=self._request_body) + return self._cached_results