From 6dcb0796b5c9f6f13b9b3d0301a6215df7f41c22 Mon Sep 17 00:00:00 2001 From: Sergei Smolianinov Date: Sat, 11 Feb 2017 16:05:31 +0200 Subject: [PATCH 1/4] Add CQs management methods to the client --- influxdb/client.py | 92 ++++++++++++++++ influxdb/tests/client_test.py | 102 ++++++++++++++++++ .../server_tests/client_test_with_server.py | 28 +++++ 3 files changed, 222 insertions(+) diff --git a/influxdb/client.py b/influxdb/client.py index 8f8b14ae..f1683508 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -908,6 +908,98 @@ def get_list_privileges(self, username): text = "SHOW GRANTS FOR {0}".format(quote_ident(username)) return list(self.query(text).get_points()) + def get_list_continuous_queries(self): + """Get the list of continuous queries in InfluxDB. + + :return: all CQs in InfluxDB + :rtype: list of dictionaries + + :Example: + + :: + + >> cqs = client.get_list_cqs() + >> cqs + [ + { + u'db1': [] + }, + { + u'db2': [ + { + u'name': u'vampire', + u'query': u'CREATE CONTINUOUS QUERY vampire ON ' + 'mydb BEGIN SELECT count(dracula) INTO ' + 'mydb.autogen.all_of_them FROM ' + 'mydb.autogen.one GROUP BY time(5m) END' + } + ] + } + ] + """ + query_string = "SHOW CONTINUOUS QUERIES" + return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()] + + def create_continuous_query(self, name, select, database=None, + resample_opts=None): + r"""Create a continuous query for a database. + + :param name: the name of continuous query to create + :type name: str + :param select: select statement for the continuous query + :type select: str + :param database: the database for which the continuous query is + created. Defaults to current client's database + :type database: str + :param resample_opts: resample options + :type resample_opts: str + + :Example: + + :: + + >> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \ + ... 'FROM "cpu" GROUP BY time(1m)' + >> client.create_continuous_query( + ... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m' + ... ) + >> client.get_list_continuous_queries() + [ + { + 'db_name': [ + { + 'name': 'cpu_mean', + 'query': 'CREATE CONTINUOUS QUERY "cpu_mean" ' + 'ON "db_name" ' + 'RESAMPLE EVERY 10s FOR 2m ' + 'BEGIN SELECT mean("value") ' + 'INTO "cpu_mean" FROM "cpu" ' + 'GROUP BY time(1m) END' + } + ] + } + ] + """ + query_string = ( + "CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END" + ).format(quote_ident(name), quote_ident(database or self._database), + ' RESAMPLE ' + resample_opts if resample_opts else '', select) + self.query(query_string) + + def drop_continuous_query(self, name, database=None): + """Drop an existing continuous query for a database. + + :param name: the name of continuous query to drop + :type name: str + :param database: the database for which the continuous query is + dropped. Defaults to current client's database + :type database: str + """ + query_string = ( + "DROP CONTINUOUS QUERY {0} ON {1}" + ).format(quote_ident(name), quote_ident(database or self._database)) + self.query(query_string) + def send_packet(self, packet, protocol='json', time_precision=None): """Send an UDP packet. diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index e27eef17..4a6c9725 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1027,6 +1027,108 @@ def test_get_list_privileges_fails(self): with _mocked_session(cli, 'get', 401): cli.get_list_privileges('test') + def test_get_list_continuous_queries(self): + data = { + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "testdb01", + "columns": ["name", "query"], + "values": [["testname01", "testquery01"], + ["testname02", "testquery02"]] + }, + { + "name": "testdb02", + "columns": ["name", "query"], + "values": [["testname03", "testquery03"]] + }, + { + "name": "testdb03", + "columns": ["name", "query"] + } + ] + } + ] + } + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_continuous_queries(), + [ + { + 'testdb01': [ + {'name': 'testname01', 'query': 'testquery01'}, + {'name': 'testname02', 'query': 'testquery02'} + ] + }, + { + 'testdb02': [ + {'name': 'testname03', 'query': 'testquery03'} + ] + }, + { + 'testdb03': [] + } + ] + ) + + @raises(Exception) + def test_get_list_continuous_queries_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.get_list_continuous_queries() + + def test_create_continuous_query(self): + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + query = 'SELECT count("value") INTO "6_months"."events" FROM ' \ + '"events" GROUP BY time(10m)' + self.cli.create_continuous_query('cq_name', query, 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" begin select ' + 'count("value") into "6_months"."events" from "events" group ' + 'by time(10m) end' + ) + self.cli.create_continuous_query('cq_name', query, 'db_name', + 'EVERY 10s FOR 2m') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" resample ' + 'every 10s for 2m begin select count("value") into ' + '"6_months"."events" from "events" group by time(10m) end' + ) + + @raises(Exception) + def test_create_continuous_query_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.create_continuous_query('cq_name', 'select', 'db_name') + + def test_drop_continuous_query(self): + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + self.cli.drop_continuous_query('cq_name', 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop continuous query "cq_name" on "db_name"' + ) + + @raises(Exception) + def test_drop_continuous_query_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.drop_continuous_query('cq_name', 'db_name') + def test_invalid_port_fails(self): """Test invalid port fail for TestInfluxDBClient object.""" with self.assertRaises(ValueError): diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 4dbc1b75..26c09eec 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -720,6 +720,34 @@ def test_drop_retention_policy(self): rsp ) + def test_create_continuous_query(self): + self.cli.create_retention_policy('some_rp', '1d', 1) + query = 'select count("value") into "some_rp"."events" from ' \ + '"events" group by time(10m)' + self.cli.create_continuous_query('test_cq', query, 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [ + { + 'db': [ + { + 'name': 'test_cq', + 'query': 'CREATE CONTINUOUS QUERY test_cq ON db ' + 'BEGIN SELECT count(value) INTO ' + 'db.some_rp.events FROM db.autogen.events ' + 'GROUP BY time(10m) END' + } + ] + } + ] + self.assertEqual(cqs, expected_cqs) + + def test_drop_continuous_query(self): + self.test_create_continuous_query() + self.cli.drop_continuous_query('test_cq', 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [{'db': []}] + self.assertEqual(cqs, expected_cqs) + def test_issue_143(self): """Test for PR#143 from repo.""" pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z') From f98d9b7456e91cf6fa86af3d6ca7b4358fa1d0e9 Mon Sep 17 00:00:00 2001 From: Lukasz Dudek Date: Fri, 1 Mar 2019 16:08:37 +0100 Subject: [PATCH 2/4] Add docstrings of test cases. --- influxdb/tests/client_test.py | 19 +++++++++++++++++++ .../server_tests/client_test_with_server.py | 2 ++ 2 files changed, 21 insertions(+) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 4a6c9725..5dfb4263 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1028,6 +1028,9 @@ def test_get_list_privileges_fails(self): cli.get_list_privileges('test') def test_get_list_continuous_queries(self): + """ + Test getting a list of continuous queries by TestInfluxDBClient object. + """ data = { "results": [ { @@ -1076,10 +1079,17 @@ def test_get_list_continuous_queries(self): @raises(Exception) def test_get_list_continuous_queries_fails(self): + """ + Test failing to get a list of continuous queries by TestInfluxDBClient + object. + """ with _mocked_session(self.cli, 'get', 400): self.cli.get_list_continuous_queries() def test_create_continuous_query(self): + """ + Test continuous query creation with TestInfluxDBClient object. + """ data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1107,10 +1117,16 @@ def test_create_continuous_query(self): @raises(Exception) def test_create_continuous_query_fails(self): + """ + Test failing to create a continuous query by TestInfluxDBClient object. + """ with _mocked_session(self.cli, 'get', 400): self.cli.create_continuous_query('cq_name', 'select', 'db_name') def test_drop_continuous_query(self): + """ + Test dropping a continuous query by TestInfluxDBClient object. + """ data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1126,6 +1142,9 @@ def test_drop_continuous_query(self): @raises(Exception) def test_drop_continuous_query_fails(self): + """ + Test failing to drop a continuous query by TestInfluxDBClient object. + """ with _mocked_session(self.cli, 'get', 400): self.cli.drop_continuous_query('cq_name', 'db_name') diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 26c09eec..2d71a5f1 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -721,6 +721,7 @@ def test_drop_retention_policy(self): ) def test_create_continuous_query(self): + """Test continuous query creation.""" self.cli.create_retention_policy('some_rp', '1d', 1) query = 'select count("value") into "some_rp"."events" from ' \ '"events" group by time(10m)' @@ -742,6 +743,7 @@ def test_create_continuous_query(self): self.assertEqual(cqs, expected_cqs) def test_drop_continuous_query(self): + """Test continuous query drop.""" self.test_create_continuous_query() self.cli.drop_continuous_query('test_cq', 'db') cqs = self.cli.get_list_continuous_queries() From 711508fc222f6ac0ea1c8748ebc46573c49e5645 Mon Sep 17 00:00:00 2001 From: Lukasz Dudek Date: Fri, 1 Mar 2019 22:31:51 +0100 Subject: [PATCH 3/4] Switch to one-line docstrings for test cases. --- influxdb/tests/client_test.py | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 5dfb4263..e1a30b81 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1028,9 +1028,7 @@ def test_get_list_privileges_fails(self): cli.get_list_privileges('test') def test_get_list_continuous_queries(self): - """ - Test getting a list of continuous queries by TestInfluxDBClient object. - """ + """Test getting a list of continuous queries.""" data = { "results": [ { @@ -1079,17 +1077,12 @@ def test_get_list_continuous_queries(self): @raises(Exception) def test_get_list_continuous_queries_fails(self): - """ - Test failing to get a list of continuous queries by TestInfluxDBClient - object. - """ + """Test failing to get a list of continuous queries.""" with _mocked_session(self.cli, 'get', 400): self.cli.get_list_continuous_queries() def test_create_continuous_query(self): - """ - Test continuous query creation with TestInfluxDBClient object. - """ + """Test continuous query creation.""" data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1117,16 +1110,12 @@ def test_create_continuous_query(self): @raises(Exception) def test_create_continuous_query_fails(self): - """ - Test failing to create a continuous query by TestInfluxDBClient object. - """ + """Test failing to create a continuous query.""" with _mocked_session(self.cli, 'get', 400): self.cli.create_continuous_query('cq_name', 'select', 'db_name') def test_drop_continuous_query(self): - """ - Test dropping a continuous query by TestInfluxDBClient object. - """ + """Test dropping a continuous query.""" data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1142,9 +1131,7 @@ def test_drop_continuous_query(self): @raises(Exception) def test_drop_continuous_query_fails(self): - """ - Test failing to drop a continuous query by TestInfluxDBClient object. - """ + """Test failing to drop a continuous query.""" with _mocked_session(self.cli, 'get', 400): self.cli.drop_continuous_query('cq_name', 'db_name') From 734b7ef2fb781e507afec834a74b1fbf4abfa130 Mon Sep 17 00:00:00 2001 From: Matthew McGinn Date: Mon, 1 Apr 2019 11:55:23 -0500 Subject: [PATCH 4/4] Add CHANGELOG entry for PR #681 Signed-off-by: Matthew McGinn --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d18d5bc4..9ca24a17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for + continuous queries (#681 thx @lukaszdudek-silvair) ### Changed - Update test suite to add support for Python 3.7 and InfluxDB v1.6.4 and 1.7.4 (#692 thx @clslgrnc)