Skip to content

Commit 73503b5

Browse files
lukaszdudek-silvairxginn8
authored andcommitted
Add CQs management methods to the client (influxdata#681)
* Add CQs management methods to the client
1 parent afcfd25 commit 73503b5

File tree

4 files changed

+232
-0
lines changed

4 files changed

+232
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
77
## [Unreleased]
88

99
### Added
10+
- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for
11+
continuous queries (#681 thx @lukaszdudek-silvair)
1012
- query() now accepts a bind_params argument for parameter binding (#678 thx @clslgrnc)
1113

1214
### Changed

influxdb/client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,98 @@ def get_list_privileges(self, username):
926926
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
927927
return list(self.query(text).get_points())
928928

929+
def get_list_continuous_queries(self):
930+
"""Get the list of continuous queries in InfluxDB.
931+
932+
:return: all CQs in InfluxDB
933+
:rtype: list of dictionaries
934+
935+
:Example:
936+
937+
::
938+
939+
>> cqs = client.get_list_cqs()
940+
>> cqs
941+
[
942+
{
943+
u'db1': []
944+
},
945+
{
946+
u'db2': [
947+
{
948+
u'name': u'vampire',
949+
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
950+
'mydb BEGIN SELECT count(dracula) INTO '
951+
'mydb.autogen.all_of_them FROM '
952+
'mydb.autogen.one GROUP BY time(5m) END'
953+
}
954+
]
955+
}
956+
]
957+
"""
958+
query_string = "SHOW CONTINUOUS QUERIES"
959+
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
960+
961+
def create_continuous_query(self, name, select, database=None,
962+
resample_opts=None):
963+
r"""Create a continuous query for a database.
964+
965+
:param name: the name of continuous query to create
966+
:type name: str
967+
:param select: select statement for the continuous query
968+
:type select: str
969+
:param database: the database for which the continuous query is
970+
created. Defaults to current client's database
971+
:type database: str
972+
:param resample_opts: resample options
973+
:type resample_opts: str
974+
975+
:Example:
976+
977+
::
978+
979+
>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
980+
... 'FROM "cpu" GROUP BY time(1m)'
981+
>> client.create_continuous_query(
982+
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
983+
... )
984+
>> client.get_list_continuous_queries()
985+
[
986+
{
987+
'db_name': [
988+
{
989+
'name': 'cpu_mean',
990+
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
991+
'ON "db_name" '
992+
'RESAMPLE EVERY 10s FOR 2m '
993+
'BEGIN SELECT mean("value") '
994+
'INTO "cpu_mean" FROM "cpu" '
995+
'GROUP BY time(1m) END'
996+
}
997+
]
998+
}
999+
]
1000+
"""
1001+
query_string = (
1002+
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
1003+
).format(quote_ident(name), quote_ident(database or self._database),
1004+
' RESAMPLE ' + resample_opts if resample_opts else '', select)
1005+
self.query(query_string)
1006+
1007+
def drop_continuous_query(self, name, database=None):
1008+
"""Drop an existing continuous query for a database.
1009+
1010+
:param name: the name of continuous query to drop
1011+
:type name: str
1012+
:param database: the database for which the continuous query is
1013+
dropped. Defaults to current client's database
1014+
:type database: str
1015+
"""
1016+
query_string = (
1017+
"DROP CONTINUOUS QUERY {0} ON {1}"
1018+
).format(quote_ident(name), quote_ident(database or self._database))
1019+
self.query(query_string)
1020+
9291021
def send_packet(self, packet, protocol='json', time_precision=None):
9301022
"""Send an UDP packet.
9311023

influxdb/tests/client_test.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,114 @@ def test_get_list_privileges_fails(self):
10271027
with _mocked_session(cli, 'get', 401):
10281028
cli.get_list_privileges('test')
10291029

1030+
def test_get_list_continuous_queries(self):
1031+
"""Test getting a list of continuous queries."""
1032+
data = {
1033+
"results": [
1034+
{
1035+
"statement_id": 0,
1036+
"series": [
1037+
{
1038+
"name": "testdb01",
1039+
"columns": ["name", "query"],
1040+
"values": [["testname01", "testquery01"],
1041+
["testname02", "testquery02"]]
1042+
},
1043+
{
1044+
"name": "testdb02",
1045+
"columns": ["name", "query"],
1046+
"values": [["testname03", "testquery03"]]
1047+
},
1048+
{
1049+
"name": "testdb03",
1050+
"columns": ["name", "query"]
1051+
}
1052+
]
1053+
}
1054+
]
1055+
}
1056+
1057+
with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
1058+
self.assertListEqual(
1059+
self.cli.get_list_continuous_queries(),
1060+
[
1061+
{
1062+
'testdb01': [
1063+
{'name': 'testname01', 'query': 'testquery01'},
1064+
{'name': 'testname02', 'query': 'testquery02'}
1065+
]
1066+
},
1067+
{
1068+
'testdb02': [
1069+
{'name': 'testname03', 'query': 'testquery03'}
1070+
]
1071+
},
1072+
{
1073+
'testdb03': []
1074+
}
1075+
]
1076+
)
1077+
1078+
@raises(Exception)
1079+
def test_get_list_continuous_queries_fails(self):
1080+
"""Test failing to get a list of continuous queries."""
1081+
with _mocked_session(self.cli, 'get', 400):
1082+
self.cli.get_list_continuous_queries()
1083+
1084+
def test_create_continuous_query(self):
1085+
"""Test continuous query creation."""
1086+
data = {"results": [{}]}
1087+
with requests_mock.Mocker() as m:
1088+
m.register_uri(
1089+
requests_mock.GET,
1090+
"http://localhost:8086/query",
1091+
text=json.dumps(data)
1092+
)
1093+
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
1094+
'"events" GROUP BY time(10m)'
1095+
self.cli.create_continuous_query('cq_name', query, 'db_name')
1096+
self.assertEqual(
1097+
m.last_request.qs['q'][0],
1098+
'create continuous query "cq_name" on "db_name" begin select '
1099+
'count("value") into "6_months"."events" from "events" group '
1100+
'by time(10m) end'
1101+
)
1102+
self.cli.create_continuous_query('cq_name', query, 'db_name',
1103+
'EVERY 10s FOR 2m')
1104+
self.assertEqual(
1105+
m.last_request.qs['q'][0],
1106+
'create continuous query "cq_name" on "db_name" resample '
1107+
'every 10s for 2m begin select count("value") into '
1108+
'"6_months"."events" from "events" group by time(10m) end'
1109+
)
1110+
1111+
@raises(Exception)
1112+
def test_create_continuous_query_fails(self):
1113+
"""Test failing to create a continuous query."""
1114+
with _mocked_session(self.cli, 'get', 400):
1115+
self.cli.create_continuous_query('cq_name', 'select', 'db_name')
1116+
1117+
def test_drop_continuous_query(self):
1118+
"""Test dropping a continuous query."""
1119+
data = {"results": [{}]}
1120+
with requests_mock.Mocker() as m:
1121+
m.register_uri(
1122+
requests_mock.GET,
1123+
"http://localhost:8086/query",
1124+
text=json.dumps(data)
1125+
)
1126+
self.cli.drop_continuous_query('cq_name', 'db_name')
1127+
self.assertEqual(
1128+
m.last_request.qs['q'][0],
1129+
'drop continuous query "cq_name" on "db_name"'
1130+
)
1131+
1132+
@raises(Exception)
1133+
def test_drop_continuous_query_fails(self):
1134+
"""Test failing to drop a continuous query."""
1135+
with _mocked_session(self.cli, 'get', 400):
1136+
self.cli.drop_continuous_query('cq_name', 'db_name')
1137+
10301138
def test_invalid_port_fails(self):
10311139
"""Test invalid port fail for TestInfluxDBClient object."""
10321140
with self.assertRaises(ValueError):

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,36 @@ def test_drop_retention_policy(self):
722722
rsp
723723
)
724724

725+
def test_create_continuous_query(self):
726+
"""Test continuous query creation."""
727+
self.cli.create_retention_policy('some_rp', '1d', 1)
728+
query = 'select count("value") into "some_rp"."events" from ' \
729+
'"events" group by time(10m)'
730+
self.cli.create_continuous_query('test_cq', query, 'db')
731+
cqs = self.cli.get_list_continuous_queries()
732+
expected_cqs = [
733+
{
734+
'db': [
735+
{
736+
'name': 'test_cq',
737+
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
738+
'BEGIN SELECT count(value) INTO '
739+
'db.some_rp.events FROM db.autogen.events '
740+
'GROUP BY time(10m) END'
741+
}
742+
]
743+
}
744+
]
745+
self.assertEqual(cqs, expected_cqs)
746+
747+
def test_drop_continuous_query(self):
748+
"""Test continuous query drop."""
749+
self.test_create_continuous_query()
750+
self.cli.drop_continuous_query('test_cq', 'db')
751+
cqs = self.cli.get_list_continuous_queries()
752+
expected_cqs = [{'db': []}]
753+
self.assertEqual(cqs, expected_cqs)
754+
725755
def test_issue_143(self):
726756
"""Test for PR#143 from repo."""
727757
pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z')

0 commit comments

Comments
 (0)