From bec6760bcb01f785ee4a7e2d1ce2d2821c3832af Mon Sep 17 00:00:00 2001 From: Sergei Smolianinov Date: Sat, 11 Feb 2017 16:05:31 +0200 Subject: [PATCH 1/3] Add CQs management methods to the client --- influxdb/client.py | 92 ++++++++++++++++ influxdb/tests/client_test.py | 102 ++++++++++++++++++ .../server_tests/client_test_with_server.py | 28 +++++ 3 files changed, 222 insertions(+) diff --git a/influxdb/client.py b/influxdb/client.py index 0698c871..7c9af5d3 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -797,6 +797,98 @@ def get_list_privileges(self, username): text = "SHOW GRANTS FOR {0}".format(quote_ident(username)) return list(self.query(text).get_points()) + def get_list_continuous_queries(self): + """Get the list of continuous queries in InfluxDB. + + :return: all CQs in InfluxDB + :rtype: list of dictionaries + + :Example: + + :: + + >> cqs = client.get_list_cqs() + >> cqs + [ + { + u'db1': [] + }, + { + u'db2': [ + { + u'name': u'vampire', + u'query': u'CREATE CONTINUOUS QUERY vampire ON ' + 'mydb BEGIN SELECT count(dracula) INTO ' + 'mydb.autogen.all_of_them FROM ' + 'mydb.autogen.one GROUP BY time(5m) END' + } + ] + } + ] + """ + query_string = "SHOW CONTINUOUS QUERIES" + return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()] + + def create_continuous_query(self, name, select, database=None, + resample_opts=None): + r"""Create a continuous query for a database. + + :param name: the name of continuous query to create + :type name: str + :param select: select statement for the continuous query + :type select: str + :param database: the database for which the continuous query is + created. Defaults to current client's database + :type database: str + :param resample_opts: resample options + :type resample_opts: str + + :Example: + + :: + + >> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \ + ... 'FROM "cpu" GROUP BY time(1m)' + >> client.create_continuous_query( + ... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m' + ... ) + >> client.get_list_continuous_queries() + [ + { + 'db_name': [ + { + 'name': 'cpu_mean', + 'query': 'CREATE CONTINUOUS QUERY "cpu_mean" ' + 'ON "db_name" ' + 'RESAMPLE EVERY 10s FOR 2m ' + 'BEGIN SELECT mean("value") ' + 'INTO "cpu_mean" FROM "cpu" ' + 'GROUP BY time(1m) END' + } + ] + } + ] + """ + query_string = ( + "CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END" + ).format(quote_ident(name), quote_ident(database or self._database), + ' RESAMPLE ' + resample_opts if resample_opts else '', select) + self.query(query_string) + + def drop_continuous_query(self, name, database=None): + """Drop an existing continuous query for a database. + + :param name: the name of continuous query to drop + :type name: str + :param database: the database for which the continuous query is + dropped. Defaults to current client's database + :type database: str + """ + query_string = ( + "DROP CONTINUOUS QUERY {0} ON {1}" + ).format(quote_ident(name), quote_ident(database or self._database)) + self.query(query_string) + def send_packet(self, packet, protocol='json'): """Send an UDP packet. diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 0ba04f4a..0dc7c5f1 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -789,6 +789,108 @@ def test_get_list_privileges_fails(self): with _mocked_session(cli, 'get', 401): cli.get_list_privileges('test') + def test_get_list_continuous_queries(self): + data = { + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "testdb01", + "columns": ["name", "query"], + "values": [["testname01", "testquery01"], + ["testname02", "testquery02"]] + }, + { + "name": "testdb02", + "columns": ["name", "query"], + "values": [["testname03", "testquery03"]] + }, + { + "name": "testdb03", + "columns": ["name", "query"] + } + ] + } + ] + } + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_continuous_queries(), + [ + { + 'testdb01': [ + {'name': 'testname01', 'query': 'testquery01'}, + {'name': 'testname02', 'query': 'testquery02'} + ] + }, + { + 'testdb02': [ + {'name': 'testname03', 'query': 'testquery03'} + ] + }, + { + 'testdb03': [] + } + ] + ) + + @raises(Exception) + def test_get_list_continuous_queries_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.get_list_continuous_queries() + + def test_create_continuous_query(self): + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + query = 'SELECT count("value") INTO "6_months"."events" FROM ' \ + '"events" GROUP BY time(10m)' + self.cli.create_continuous_query('cq_name', query, 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" begin select ' + 'count("value") into "6_months"."events" from "events" group ' + 'by time(10m) end' + ) + self.cli.create_continuous_query('cq_name', query, 'db_name', + 'EVERY 10s FOR 2m') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" resample ' + 'every 10s for 2m begin select count("value") into ' + '"6_months"."events" from "events" group by time(10m) end' + ) + + @raises(Exception) + def test_create_continuous_query_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.create_continuous_query('cq_name', 'select', 'db_name') + + def test_drop_continuous_query(self): + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + self.cli.drop_continuous_query('cq_name', 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop continuous query "cq_name" on "db_name"' + ) + + @raises(Exception) + def test_drop_continuous_query_fails(self): + with _mocked_session(self.cli, 'get', 400): + self.cli.drop_continuous_query('cq_name', 'db_name') + def test_invalid_port_fails(self): with self.assertRaises(ValueError): InfluxDBClient('host', '80/redir', 'username', 'password') diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index d81054c9..d426cbad 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -595,6 +595,34 @@ def test_drop_retention_policy(self): rsp ) + def test_create_continuous_query(self): + self.cli.create_retention_policy('some_rp', '1d', 1) + query = 'select count("value") into "some_rp"."events" from ' \ + '"events" group by time(10m)' + self.cli.create_continuous_query('test_cq', query, 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [ + { + 'db': [ + { + 'name': 'test_cq', + 'query': 'CREATE CONTINUOUS QUERY test_cq ON db ' + 'BEGIN SELECT count(value) INTO ' + 'db.some_rp.events FROM db.autogen.events ' + 'GROUP BY time(10m) END' + } + ] + } + ] + self.assertEqual(cqs, expected_cqs) + + def test_drop_continuous_query(self): + self.test_create_continuous_query() + self.cli.drop_continuous_query('test_cq', 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [{'db': []}] + self.assertEqual(cqs, expected_cqs) + def test_issue_143(self): pt = partial(point, 'a_serie_name', timestamp='2015-03-30T16:16:37Z') pts = [ From ba14dfa85cb4fcb30b70609ed6d3b9329a6583de Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Wed, 8 Apr 2020 16:13:55 -0500 Subject: [PATCH 2/3] chore(server_tests): update pep257 and flake8 commentary --- influxdb/tests/server_tests/client_test_with_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 184708f2..05110a23 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -637,6 +637,7 @@ def test_drop_retention_policy(self): ) def test_create_continuous_query(self): + """Test create a continuous query.""" self.cli.create_retention_policy('some_rp', '1d', 1) query = 'select count("value") into "some_rp"."events" from ' \ '"events" group by time(10m)' @@ -658,6 +659,7 @@ def test_create_continuous_query(self): self.assertEqual(cqs, expected_cqs) def test_drop_continuous_query(self): + """Test dropping a continuous query.""" self.test_create_continuous_query() self.cli.drop_continuous_query('test_cq', 'db') cqs = self.cli.get_list_continuous_queries() From 43a2270494924afab76466d9f26fc8f386e85a7d Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Wed, 8 Apr 2020 16:16:07 -0500 Subject: [PATCH 3/3] chore(client_test): update comments based on pep257 and flake8 --- influxdb/tests/client_test.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 67b8411d..73a4f911 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -897,7 +897,7 @@ def test_revoke_privilege_invalid(self): self.cli.revoke_privilege('', 'testdb', 'test') def test_get_list_privileges(self): - """Tst get list of privs for TestInfluxDBClient object.""" + """Test get list of privs for TestInfluxDBClient object.""" data = {'results': [ {'series': [ {'columns': ['database', 'privilege'], @@ -924,6 +924,7 @@ def test_get_list_privileges_fails(self): cli.get_list_privileges('test') def test_get_list_continuous_queries(self): + """Test get list of continuous queries.""" data = { "results": [ { @@ -972,10 +973,12 @@ def test_get_list_continuous_queries(self): @raises(Exception) def test_get_list_continuous_queries_fails(self): + """Test failing get list of continuous queries.""" with _mocked_session(self.cli, 'get', 400): self.cli.get_list_continuous_queries() def test_create_continuous_query(self): + """Test create a continuous query.""" data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1003,10 +1006,12 @@ def test_create_continuous_query(self): @raises(Exception) def test_create_continuous_query_fails(self): + """Test failing creation of continuous query.""" with _mocked_session(self.cli, 'get', 400): self.cli.create_continuous_query('cq_name', 'select', 'db_name') def test_drop_continuous_query(self): + """Test dropping a continuous query.""" data = {"results": [{}]} with requests_mock.Mocker() as m: m.register_uri( @@ -1022,6 +1027,7 @@ def test_drop_continuous_query(self): @raises(Exception) def test_drop_continuous_query_fails(self): + """Test failing drop of continuous query.""" with _mocked_session(self.cli, 'get', 400): self.cli.drop_continuous_query('cq_name', 'db_name')