Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Add CQs management methods to the client #681

Merged
merged 5 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for
continuous queries (#681 thx @lukaszdudek-silvair)
- query() now accepts a bind_params argument for parameter binding (#678 thx @clslgrnc)

### Changed
Expand Down
92 changes: 92 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,98 @@ def get_list_privileges(self, username):
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
return list(self.query(text).get_points())

def get_list_continuous_queries(self):
"""Get the list of continuous queries in InfluxDB.

:return: all CQs in InfluxDB
:rtype: list of dictionaries

:Example:

::

>> cqs = client.get_list_cqs()
>> cqs
[
{
u'db1': []
},
{
u'db2': [
{
u'name': u'vampire',
u'query': u'CREATE CONTINUOUS QUERY vampire ON '
'mydb BEGIN SELECT count(dracula) INTO '
'mydb.autogen.all_of_them FROM '
'mydb.autogen.one GROUP BY time(5m) END'
}
]
}
]
"""
query_string = "SHOW CONTINUOUS QUERIES"
return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]

def create_continuous_query(self, name, select, database=None,
resample_opts=None):
r"""Create a continuous query for a database.

:param name: the name of continuous query to create
:type name: str
:param select: select statement for the continuous query
:type select: str
:param database: the database for which the continuous query is
created. Defaults to current client's database
:type database: str
:param resample_opts: resample options
:type resample_opts: str

:Example:

::

>> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
... 'FROM "cpu" GROUP BY time(1m)'
>> client.create_continuous_query(
... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
... )
>> client.get_list_continuous_queries()
[
{
'db_name': [
{
'name': 'cpu_mean',
'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
'ON "db_name" '
'RESAMPLE EVERY 10s FOR 2m '
'BEGIN SELECT mean("value") '
'INTO "cpu_mean" FROM "cpu" '
'GROUP BY time(1m) END'
}
]
}
]
"""
query_string = (
"CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
).format(quote_ident(name), quote_ident(database or self._database),
' RESAMPLE ' + resample_opts if resample_opts else '', select)
self.query(query_string)

def drop_continuous_query(self, name, database=None):
"""Drop an existing continuous query for a database.

:param name: the name of continuous query to drop
:type name: str
:param database: the database for which the continuous query is
dropped. Defaults to current client's database
:type database: str
"""
query_string = (
"DROP CONTINUOUS QUERY {0} ON {1}"
).format(quote_ident(name), quote_ident(database or self._database))
self.query(query_string)

def send_packet(self, packet, protocol='json', time_precision=None):
"""Send an UDP packet.

Expand Down
108 changes: 108 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,114 @@ def test_get_list_privileges_fails(self):
with _mocked_session(cli, 'get', 401):
cli.get_list_privileges('test')

def test_get_list_continuous_queries(self):
"""Test getting a list of continuous queries."""
data = {
"results": [
{
"statement_id": 0,
"series": [
{
"name": "testdb01",
"columns": ["name", "query"],
"values": [["testname01", "testquery01"],
["testname02", "testquery02"]]
},
{
"name": "testdb02",
"columns": ["name", "query"],
"values": [["testname03", "testquery03"]]
},
{
"name": "testdb03",
"columns": ["name", "query"]
}
]
}
]
}

with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
self.assertListEqual(
self.cli.get_list_continuous_queries(),
[
{
'testdb01': [
{'name': 'testname01', 'query': 'testquery01'},
{'name': 'testname02', 'query': 'testquery02'}
]
},
{
'testdb02': [
{'name': 'testname03', 'query': 'testquery03'}
]
},
{
'testdb03': []
}
]
)

@raises(Exception)
def test_get_list_continuous_queries_fails(self):
"""Test failing to get a list of continuous queries."""
with _mocked_session(self.cli, 'get', 400):
self.cli.get_list_continuous_queries()

def test_create_continuous_query(self):
"""Test continuous query creation."""
data = {"results": [{}]}
with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=json.dumps(data)
)
query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
'"events" GROUP BY time(10m)'
self.cli.create_continuous_query('cq_name', query, 'db_name')
self.assertEqual(
m.last_request.qs['q'][0],
'create continuous query "cq_name" on "db_name" begin select '
'count("value") into "6_months"."events" from "events" group '
'by time(10m) end'
)
self.cli.create_continuous_query('cq_name', query, 'db_name',
'EVERY 10s FOR 2m')
self.assertEqual(
m.last_request.qs['q'][0],
'create continuous query "cq_name" on "db_name" resample '
'every 10s for 2m begin select count("value") into '
'"6_months"."events" from "events" group by time(10m) end'
)

@raises(Exception)
def test_create_continuous_query_fails(self):
"""Test failing to create a continuous query."""
with _mocked_session(self.cli, 'get', 400):
self.cli.create_continuous_query('cq_name', 'select', 'db_name')

def test_drop_continuous_query(self):
"""Test dropping a continuous query."""
data = {"results": [{}]}
with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=json.dumps(data)
)
self.cli.drop_continuous_query('cq_name', 'db_name')
self.assertEqual(
m.last_request.qs['q'][0],
'drop continuous query "cq_name" on "db_name"'
)

@raises(Exception)
def test_drop_continuous_query_fails(self):
"""Test failing to drop a continuous query."""
with _mocked_session(self.cli, 'get', 400):
self.cli.drop_continuous_query('cq_name', 'db_name')

def test_invalid_port_fails(self):
"""Test invalid port fail for TestInfluxDBClient object."""
with self.assertRaises(ValueError):
Expand Down
30 changes: 30 additions & 0 deletions influxdb/tests/server_tests/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,36 @@ def test_drop_retention_policy(self):
rsp
)

def test_create_continuous_query(self):
"""Test continuous query creation."""
self.cli.create_retention_policy('some_rp', '1d', 1)
query = 'select count("value") into "some_rp"."events" from ' \
'"events" group by time(10m)'
self.cli.create_continuous_query('test_cq', query, 'db')
cqs = self.cli.get_list_continuous_queries()
expected_cqs = [
{
'db': [
{
'name': 'test_cq',
'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
'BEGIN SELECT count(value) INTO '
'db.some_rp.events FROM db.autogen.events '
'GROUP BY time(10m) END'
}
]
}
]
self.assertEqual(cqs, expected_cqs)

def test_drop_continuous_query(self):
"""Test continuous query drop."""
self.test_create_continuous_query()
self.cli.drop_continuous_query('test_cq', 'db')
cqs = self.cli.get_list_continuous_queries()
expected_cqs = [{'db': []}]
self.assertEqual(cqs, expected_cqs)

def test_issue_143(self):
"""Test for PR#143 from repo."""
pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z')
Expand Down