Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit bec6760

Browse files
author
Sergei Smolianinov
committed
Add CQs management methods to the client
1 parent d1aa81a commit bec6760

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed

influxdb/client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,98 @@ def get_list_privileges(self, username):
797797
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
798798
return list(self.query(text).get_points())
799799

800+
def get_list_continuous_queries(self):
801+
"""Get the list of continuous queries in InfluxDB.
802+
803+
:return: all CQs in InfluxDB
804+
:rtype: list of dictionaries
805+
806+
:Example:
807+
808+
::
809+
810+
>> cqs = client.get_list_cqs()
811+
>> cqs
812+
[
813+
{
814+
u'db1': []
815+
},
816+
{
817+
u'db2': [
818+
{
819+
u'name': u'vampire',
820+
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
821+
'mydb BEGIN SELECT count(dracula) INTO '
822+
'mydb.autogen.all_of_them FROM '
823+
'mydb.autogen.one GROUP BY time(5m) END'
824+
}
825+
]
826+
}
827+
]
828+
"""
829+
query_string = "SHOW CONTINUOUS QUERIES"
830+
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
831+
832+
def create_continuous_query(self, name, select, database=None,
833+
resample_opts=None):
834+
r"""Create a continuous query for a database.
835+
836+
:param name: the name of continuous query to create
837+
:type name: str
838+
:param select: select statement for the continuous query
839+
:type select: str
840+
:param database: the database for which the continuous query is
841+
created. Defaults to current client's database
842+
:type database: str
843+
:param resample_opts: resample options
844+
:type resample_opts: str
845+
846+
:Example:
847+
848+
::
849+
850+
>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
851+
... 'FROM "cpu" GROUP BY time(1m)'
852+
>> client.create_continuous_query(
853+
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
854+
... )
855+
>> client.get_list_continuous_queries()
856+
[
857+
{
858+
'db_name': [
859+
{
860+
'name': 'cpu_mean',
861+
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
862+
'ON "db_name" '
863+
'RESAMPLE EVERY 10s FOR 2m '
864+
'BEGIN SELECT mean("value") '
865+
'INTO "cpu_mean" FROM "cpu" '
866+
'GROUP BY time(1m) END'
867+
}
868+
]
869+
}
870+
]
871+
"""
872+
query_string = (
873+
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
874+
).format(quote_ident(name), quote_ident(database or self._database),
875+
' RESAMPLE ' + resample_opts if resample_opts else '', select)
876+
self.query(query_string)
877+
878+
def drop_continuous_query(self, name, database=None):
879+
"""Drop an existing continuous query for a database.
880+
881+
:param name: the name of continuous query to drop
882+
:type name: str
883+
:param database: the database for which the continuous query is
884+
dropped. Defaults to current client's database
885+
:type database: str
886+
"""
887+
query_string = (
888+
"DROP CONTINUOUS QUERY {0} ON {1}"
889+
).format(quote_ident(name), quote_ident(database or self._database))
890+
self.query(query_string)
891+
800892
def send_packet(self, packet, protocol='json'):
801893
"""Send an UDP packet.
802894

influxdb/tests/client_test.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,108 @@ def test_get_list_privileges_fails(self):
789789
with _mocked_session(cli, 'get', 401):
790790
cli.get_list_privileges('test')
791791

792+
def test_get_list_continuous_queries(self):
793+
data = {
794+
"results": [
795+
{
796+
"statement_id": 0,
797+
"series": [
798+
{
799+
"name": "testdb01",
800+
"columns": ["name", "query"],
801+
"values": [["testname01", "testquery01"],
802+
["testname02", "testquery02"]]
803+
},
804+
{
805+
"name": "testdb02",
806+
"columns": ["name", "query"],
807+
"values": [["testname03", "testquery03"]]
808+
},
809+
{
810+
"name": "testdb03",
811+
"columns": ["name", "query"]
812+
}
813+
]
814+
}
815+
]
816+
}
817+
818+
with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
819+
self.assertListEqual(
820+
self.cli.get_list_continuous_queries(),
821+
[
822+
{
823+
'testdb01': [
824+
{'name': 'testname01', 'query': 'testquery01'},
825+
{'name': 'testname02', 'query': 'testquery02'}
826+
]
827+
},
828+
{
829+
'testdb02': [
830+
{'name': 'testname03', 'query': 'testquery03'}
831+
]
832+
},
833+
{
834+
'testdb03': []
835+
}
836+
]
837+
)
838+
839+
@raises(Exception)
840+
def test_get_list_continuous_queries_fails(self):
841+
with _mocked_session(self.cli, 'get', 400):
842+
self.cli.get_list_continuous_queries()
843+
844+
def test_create_continuous_query(self):
845+
data = {"results": [{}]}
846+
with requests_mock.Mocker() as m:
847+
m.register_uri(
848+
requests_mock.GET,
849+
"http://localhost:8086/query",
850+
text=json.dumps(data)
851+
)
852+
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
853+
'"events" GROUP BY time(10m)'
854+
self.cli.create_continuous_query('cq_name', query, 'db_name')
855+
self.assertEqual(
856+
m.last_request.qs['q'][0],
857+
'create continuous query "cq_name" on "db_name" begin select '
858+
'count("value") into "6_months"."events" from "events" group '
859+
'by time(10m) end'
860+
)
861+
self.cli.create_continuous_query('cq_name', query, 'db_name',
862+
'EVERY 10s FOR 2m')
863+
self.assertEqual(
864+
m.last_request.qs['q'][0],
865+
'create continuous query "cq_name" on "db_name" resample '
866+
'every 10s for 2m begin select count("value") into '
867+
'"6_months"."events" from "events" group by time(10m) end'
868+
)
869+
870+
@raises(Exception)
871+
def test_create_continuous_query_fails(self):
872+
with _mocked_session(self.cli, 'get', 400):
873+
self.cli.create_continuous_query('cq_name', 'select', 'db_name')
874+
875+
def test_drop_continuous_query(self):
876+
data = {"results": [{}]}
877+
with requests_mock.Mocker() as m:
878+
m.register_uri(
879+
requests_mock.GET,
880+
"http://localhost:8086/query",
881+
text=json.dumps(data)
882+
)
883+
self.cli.drop_continuous_query('cq_name', 'db_name')
884+
self.assertEqual(
885+
m.last_request.qs['q'][0],
886+
'drop continuous query "cq_name" on "db_name"'
887+
)
888+
889+
@raises(Exception)
890+
def test_drop_continuous_query_fails(self):
891+
with _mocked_session(self.cli, 'get', 400):
892+
self.cli.drop_continuous_query('cq_name', 'db_name')
893+
792894
def test_invalid_port_fails(self):
793895
with self.assertRaises(ValueError):
794896
InfluxDBClient('host', '80/redir', 'username', 'password')

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,34 @@ def test_drop_retention_policy(self):
595595
rsp
596596
)
597597

598+
def test_create_continuous_query(self):
599+
self.cli.create_retention_policy('some_rp', '1d', 1)
600+
query = 'select count("value") into "some_rp"."events" from ' \
601+
'"events" group by time(10m)'
602+
self.cli.create_continuous_query('test_cq', query, 'db')
603+
cqs = self.cli.get_list_continuous_queries()
604+
expected_cqs = [
605+
{
606+
'db': [
607+
{
608+
'name': 'test_cq',
609+
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
610+
'BEGIN SELECT count(value) INTO '
611+
'db.some_rp.events FROM db.autogen.events '
612+
'GROUP BY time(10m) END'
613+
}
614+
]
615+
}
616+
]
617+
self.assertEqual(cqs, expected_cqs)
618+
619+
def test_drop_continuous_query(self):
620+
self.test_create_continuous_query()
621+
self.cli.drop_continuous_query('test_cq', 'db')
622+
cqs = self.cli.get_list_continuous_queries()
623+
expected_cqs = [{'db': []}]
624+
self.assertEqual(cqs, expected_cqs)
625+
598626
def test_issue_143(self):
599627
pt = partial(point, 'a_serie_name', timestamp='2015-03-30T16:16:37Z')
600628
pts = [

0 commit comments

Comments
 (0)