Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit f93a9a0

Browse files
author
Sergei Smolianinov
committed
Add CQs management methods to the client
1 parent c9fcede commit f93a9a0

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed

influxdb/client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,98 @@ def get_list_privileges(self, username):
767767
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
768768
return list(self.query(text).get_points())
769769

770+
def get_list_continuous_queries(self):
771+
"""Get the list of continuous queries in InfluxDB.
772+
773+
:return: all CQs in InfluxDB
774+
:rtype: list of dictionaries
775+
776+
:Example:
777+
778+
::
779+
780+
>> cqs = client.get_list_cqs()
781+
>> cqs
782+
[
783+
{
784+
u'db1': []
785+
},
786+
{
787+
u'db2': [
788+
{
789+
u'name': u'vampire',
790+
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
791+
'mydb BEGIN SELECT count(dracula) INTO '
792+
'mydb.autogen.all_of_them FROM '
793+
'mydb.autogen.one GROUP BY time(5m) END'
794+
}
795+
]
796+
}
797+
]
798+
"""
799+
query_string = "SHOW CONTINUOUS QUERIES"
800+
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
801+
802+
def create_continuous_query(self, name, select, database=None,
803+
resample_opts=None):
804+
"""Create a continuous query for a database.
805+
806+
:param name: the name of continuous query to create
807+
:type name: str
808+
:param select: select statement for the continuous query
809+
:type select: str
810+
:param database: the database for which the continuous query is
811+
created. Defaults to current client's database
812+
:type database: str
813+
:param resample_opts: resample options
814+
:type resample_opts: str
815+
816+
:Example:
817+
818+
::
819+
820+
>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
821+
... 'FROM "cpu" GROUP BY time(1m)'
822+
>> client.create_continuous_query(
823+
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
824+
... )
825+
>> client.get_list_continuous_queries()
826+
[
827+
{
828+
'db_name': [
829+
{
830+
'name': 'cpu_mean',
831+
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
832+
'ON "db_name" '
833+
'RESAMPLE EVERY 10s FOR 2m '
834+
'BEGIN SELECT mean("value") '
835+
'INTO "cpu_mean" FROM "cpu" '
836+
'GROUP BY time(1m) END'
837+
}
838+
]
839+
}
840+
]
841+
"""
842+
query_string = (
843+
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
844+
).format(quote_ident(name), quote_ident(database or self._database),
845+
' RESAMPLE ' + resample_opts if resample_opts else '', select)
846+
self.query(query_string)
847+
848+
def drop_continuous_query(self, name, database=None):
849+
"""Drop an existing continuous query for a database.
850+
851+
:param name: the name of continuous query to drop
852+
:type name: str
853+
:param database: the database for which the continuous query is
854+
dropped. Defaults to current client's database
855+
:type database: str
856+
"""
857+
query_string = (
858+
"DROP CONTINUOUS QUERY {0} ON {1}"
859+
).format(quote_ident(name), quote_ident(database or self._database))
860+
self.query(query_string)
861+
770862
def send_packet(self, packet, protocol='json'):
771863
"""Send an UDP packet.
772864

influxdb/tests/client_test.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,108 @@ def test_get_list_privileges_fails(self):
788788
with _mocked_session(cli, 'get', 401):
789789
cli.get_list_privileges('test')
790790

791+
def test_get_list_continuous_queries(self):
792+
data = {
793+
"results": [
794+
{
795+
"statement_id": 0,
796+
"series": [
797+
{
798+
"name": "testdb01",
799+
"columns": ["name", "query"],
800+
"values": [["testname01", "testquery01"],
801+
["testname02", "testquery02"]]
802+
},
803+
{
804+
"name": "testdb02",
805+
"columns": ["name", "query"],
806+
"values": [["testname03", "testquery03"]]
807+
},
808+
{
809+
"name": "testdb03",
810+
"columns": ["name", "query"]
811+
}
812+
]
813+
}
814+
]
815+
}
816+
817+
with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
818+
self.assertListEqual(
819+
self.cli.get_list_continuous_queries(),
820+
[
821+
{
822+
'testdb01': [
823+
{'name': 'testname01', 'query': 'testquery01'},
824+
{'name': 'testname02', 'query': 'testquery02'}
825+
]
826+
},
827+
{
828+
'testdb02': [
829+
{'name': 'testname03', 'query': 'testquery03'}
830+
]
831+
},
832+
{
833+
'testdb03': []
834+
}
835+
]
836+
)
837+
838+
@raises(Exception)
839+
def test_get_list_continuous_queries_fails(self):
840+
with _mocked_session(self.cli, 'get', 400):
841+
self.cli.get_list_continuous_queries()
842+
843+
def test_create_continuous_query(self):
844+
data = {"results": [{}]}
845+
with requests_mock.Mocker() as m:
846+
m.register_uri(
847+
requests_mock.GET,
848+
"http://localhost:8086/query",
849+
text=json.dumps(data)
850+
)
851+
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
852+
'"events" GROUP BY time(10m)'
853+
self.cli.create_continuous_query('cq_name', query, 'db_name')
854+
self.assertEqual(
855+
m.last_request.qs['q'][0],
856+
'create continuous query "cq_name" on "db_name" begin select '
857+
'count("value") into "6_months"."events" from "events" group '
858+
'by time(10m) end'
859+
)
860+
self.cli.create_continuous_query('cq_name', query, 'db_name',
861+
'EVERY 10s FOR 2m')
862+
self.assertEqual(
863+
m.last_request.qs['q'][0],
864+
'create continuous query "cq_name" on "db_name" resample '
865+
'every 10s for 2m begin select count("value") into '
866+
'"6_months"."events" from "events" group by time(10m) end'
867+
)
868+
869+
@raises(Exception)
870+
def test_create_continuous_query_fails(self):
871+
with _mocked_session(self.cli, 'get', 400):
872+
self.cli.create_continuous_query('cq_name', 'select', 'db_name')
873+
874+
def test_drop_continuous_query(self):
875+
data = {"results": [{}]}
876+
with requests_mock.Mocker() as m:
877+
m.register_uri(
878+
requests_mock.GET,
879+
"http://localhost:8086/query",
880+
text=json.dumps(data)
881+
)
882+
self.cli.drop_continuous_query('cq_name', 'db_name')
883+
self.assertEqual(
884+
m.last_request.qs['q'][0],
885+
'drop continuous query "cq_name" on "db_name"'
886+
)
887+
888+
@raises(Exception)
889+
def test_drop_continuous_query_fails(self):
890+
with _mocked_session(self.cli, 'get', 400):
891+
self.cli.drop_continuous_query('cq_name', 'db_name')
892+
791893
def test_invalid_port_fails(self):
792894
with self.assertRaises(ValueError):
793895
InfluxDBClient('host', '80/redir', 'username', 'password')

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,34 @@ def test_drop_retention_policy(self):
595595
rsp
596596
)
597597

598+
def test_create_continuous_query(self):
599+
self.cli.create_retention_policy('some_rp', '1d', 1)
600+
query = 'select count("value") into "some_rp"."events" from ' \
601+
'"events" group by time(10m)'
602+
self.cli.create_continuous_query('test_cq', query, 'db')
603+
cqs = self.cli.get_list_continuous_queries()
604+
expected_cqs = [
605+
{
606+
'db': [
607+
{
608+
'name': 'test_cq',
609+
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
610+
'BEGIN SELECT count(value) INTO '
611+
'db.some_rp.events FROM db.autogen.events '
612+
'GROUP BY time(10m) END'
613+
}
614+
]
615+
}
616+
]
617+
self.assertEqual(cqs, expected_cqs)
618+
619+
def test_drop_continuous_query(self):
620+
self.test_create_continuous_query()
621+
self.cli.drop_continuous_query('test_cq', 'db')
622+
cqs = self.cli.get_list_continuous_queries()
623+
expected_cqs = [{'db': []}]
624+
self.assertEqual(cqs, expected_cqs)
625+
598626
def test_issue_143(self):
599627
pt = partial(point, 'a_serie_name', timestamp='2015-03-30T16:16:37Z')
600628
pts = [

0 commit comments

Comments
 (0)