From de75e7351ce3fc084c145d16df38f96d18603bf5 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Thu, 28 May 2020 10:41:50 +0200 Subject: [PATCH 1/9] Add support for custom indexes for query in the DataFrameClient (#785) --- CHANGELOG.md | 1 + influxdb/_dataframe_client.py | 22 +++++++++++------ influxdb/tests/dataframe_client_test.py | 33 +++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92bbe42e..2b374faa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom) +- Add support for custom indexes for query in the DataFrameClient (#785) ### Changed - Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index ec58cebb..58063500 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -8,6 +8,7 @@ import math from collections import defaultdict +from typing import List import pandas as pd import numpy as np @@ -152,7 +153,8 @@ def query(self, chunked=False, chunk_size=0, method="GET", - dropna=True): + dropna=True, + data_frame_index: List[str] = None): """ Query data into a DataFrame. @@ -181,6 +183,7 @@ def query(self, containing all results within that chunk :param chunk_size: Size of each chunk to tell InfluxDB to use. :param dropna: drop columns where all values are missing + :param data_frame_index: the list of columns that are used as DataFrame index :returns: the queried data :rtype: :class:`~.ResultSet` """ @@ -196,13 +199,13 @@ def query(self, results = super(DataFrameClient, self).query(query, **query_args) if query.strip().upper().startswith("SELECT"): if len(results) > 0: - return self._to_dataframe(results, dropna) + return self._to_dataframe(results, dropna, data_frame_index=data_frame_index) else: return {} else: return results - def _to_dataframe(self, rs, dropna=True): + def _to_dataframe(self, rs, dropna=True, data_frame_index: List[str] = None): result = defaultdict(list) if isinstance(rs, list): return map(self._to_dataframe, rs, @@ -216,10 +219,15 @@ def _to_dataframe(self, rs, dropna=True): key = (name, tuple(sorted(tags.items()))) df = pd.DataFrame(data) df.time = pd.to_datetime(df.time) - df.set_index('time', inplace=True) - if df.index.tzinfo is None: - df.index = df.index.tz_localize('UTC') - df.index.name = None + + if data_frame_index: + df.set_index(data_frame_index, inplace=True) + else: + df.set_index('time', inplace=True) + if df.index.tzinfo is None: + df.index = df.index.tz_localize('UTC') + df.index.name = 'time' + result[key].append(df) for key, data in result.items(): df = pd.concat(data).sort_index() diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index a80498f3..cf82b49c 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -1240,3 +1240,36 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self): cli.write_points(dataframe, 'foo', tags=None, protocol='json', tag_columns=['tag_one', 'tag_two']) self.assertEqual(m.last_request.body, expected) + + def test_query_custom_index(self): + data = { + "results": [ + { + "series": [ + { + "name": "cpu_load_short", + "columns": ["time", "value", "host"], + "values": [ + [1, 0.55, "local"], + [2, 23422, "local"], + [3, 0.64, "local"] + ] + } + ] + } + ] + } + + cli = DataFrameClient('host', 8086, 'username', 'password', 'db') + iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \ + "SELECT count(value) FROM cpu_load_short WHERE region=$region" + bind_params = {'region': 'us-west'} + with _mocked_session(cli, 'GET', 200, data): + result = cli.query(iql, bind_params=bind_params, data_frame_index=["time", "host"]) + + _data_frame = result['cpu_load_short'] + print(_data_frame) + + self.assertListEqual(["time", "host"], list(_data_frame.index.names)) + + From d3fd851c8e99350524fad710c753a0d978a5f978 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 08:26:39 +0200 Subject: [PATCH 2/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/_dataframe_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 58063500..0b9f282d 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -226,7 +226,7 @@ def _to_dataframe(self, rs, dropna=True, data_frame_index: List[str] = None): df.set_index('time', inplace=True) if df.index.tzinfo is None: df.index = df.index.tz_localize('UTC') - df.index.name = 'time' + df.index.name = None result[key].append(df) for key, data in result.items(): From 9a110a1abda2677340bc7fbd00890d27b8d3d5ab Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 09:57:12 +0200 Subject: [PATCH 3/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/_dataframe_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 0b9f282d..afd75b39 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -8,7 +8,6 @@ import math from collections import defaultdict -from typing import List import pandas as pd import numpy as np @@ -154,7 +153,7 @@ def query(self, chunk_size=0, method="GET", dropna=True, - data_frame_index: List[str] = None): + data_frame_index=None): """ Query data into a DataFrame. @@ -205,7 +204,7 @@ def query(self, else: return results - def _to_dataframe(self, rs, dropna=True, data_frame_index: List[str] = None): + def _to_dataframe(self, rs, dropna=True, data_frame_index=None): result = defaultdict(list) if isinstance(rs, list): return map(self._to_dataframe, rs, From 055d71fb83aeb83603bd9b5423d6b05d1f01c59e Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 10:37:56 +0200 Subject: [PATCH 4/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/tests/dataframe_client_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index cf82b49c..2dd98398 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -1242,6 +1242,7 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self): self.assertEqual(m.last_request.body, expected) def test_query_custom_index(self): + """Test query with custom indexes.""" data = { "results": [ { From ddd82612f57da90476cd0d10539d7edf1ca25d49 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 10:40:39 +0200 Subject: [PATCH 5/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/tests/dataframe_client_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 2dd98398..f6db3c22 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -1266,11 +1266,11 @@ def test_query_custom_index(self): "SELECT count(value) FROM cpu_load_short WHERE region=$region" bind_params = {'region': 'us-west'} with _mocked_session(cli, 'GET', 200, data): - result = cli.query(iql, bind_params=bind_params, data_frame_index=["time", "host"]) + result = cli.query(iql, bind_params=bind_params, + data_frame_index=["time", "host"]) _data_frame = result['cpu_load_short'] print(_data_frame) - self.assertListEqual(["time", "host"], list(_data_frame.index.names)) - - + self.assertListEqual(["time", "host"], + list(_data_frame.index.names)) From 64aeddd82358f68b1e5b662d80bfaee6485d0de7 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 11:53:15 +0200 Subject: [PATCH 6/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/_dataframe_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index afd75b39..e7ae9c17 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -182,7 +182,8 @@ def query(self, containing all results within that chunk :param chunk_size: Size of each chunk to tell InfluxDB to use. :param dropna: drop columns where all values are missing - :param data_frame_index: the list of columns that are used as DataFrame index + :param data_frame_index: the list of columns that + are used as DataFrame index :returns: the queried data :rtype: :class:`~.ResultSet` """ @@ -198,7 +199,8 @@ def query(self, results = super(DataFrameClient, self).query(query, **query_args) if query.strip().upper().startswith("SELECT"): if len(results) > 0: - return self._to_dataframe(results, dropna, data_frame_index=data_frame_index) + return self._to_dataframe(results, dropna, + data_frame_index=data_frame_index) else: return {} else: From 6c45f30d6a7142c17cca0ca7736eb1b693f212e8 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 1 Jun 2020 13:53:30 +0200 Subject: [PATCH 7/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/influxdb08/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/influxdb/influxdb08/client.py b/influxdb/influxdb08/client.py index 965a91db..40c58145 100644 --- a/influxdb/influxdb08/client.py +++ b/influxdb/influxdb08/client.py @@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs): :type batch_size: int """ - def list_chunks(l, n): + def list_chunks(data_list, n): """Yield successive n-sized chunks from l.""" - for i in xrange(0, len(l), n): - yield l[i:i + n] + for i in xrange(0, len(data_list), n): + yield data_list[i:i + n] batch_size = kwargs.get('batch_size') if batch_size and batch_size > 0: From 49165bedaab5fad158f742fdf16020b44c92decf Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Tue, 2 Jun 2020 14:42:29 +0200 Subject: [PATCH 8/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/client.py | 4 ++-- influxdb/helper.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 404e14be..df9ef966 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -860,7 +860,7 @@ def alter_retention_policy(self, name, database=None, query_string = ( "ALTER RETENTION POLICY {0} ON {1}" ).format(quote_ident(name), - quote_ident(database or self._database), shard_duration) + quote_ident(database or self._database)) if duration: query_string += " DURATION {0}".format(duration) if shard_duration: @@ -958,7 +958,7 @@ def drop_user(self, username): :param username: the username to drop :type username: str """ - text = "DROP USER {0}".format(quote_ident(username), method="POST") + text = "DROP USER {0}".format(quote_ident(username)) self.query(text, method="POST") def set_user_password(self, username, password): diff --git a/influxdb/helper.py b/influxdb/helper.py index 74209354..fbf6b65d 100644 --- a/influxdb/helper.py +++ b/influxdb/helper.py @@ -82,7 +82,7 @@ def __new__(cls, *args, **kwargs): allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None] if cls._time_precision not in allowed_time_precisions: raise AttributeError( - 'In {0}, time_precision is set, but invalid use any of {}.' + 'In {0}, time_precision is set, but invalid use any of {1}.' .format(cls.__name__, ','.join(allowed_time_precisions))) cls._retention_policy = getattr(_meta, 'retention_policy', None) From cb3156c1fa9181f142f09e93afccf10355b4e5a1 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Tue, 2 Jun 2020 15:11:33 +0200 Subject: [PATCH 9/9] Add support for custom indexes for query in the DataFrameClient (#785) --- influxdb/helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/helper.py b/influxdb/helper.py index fbf6b65d..138cf6e8 100644 --- a/influxdb/helper.py +++ b/influxdb/helper.py @@ -82,7 +82,7 @@ def __new__(cls, *args, **kwargs): allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None] if cls._time_precision not in allowed_time_precisions: raise AttributeError( - 'In {0}, time_precision is set, but invalid use any of {1}.' + 'In {}, time_precision is set, but invalid use any of {}.' .format(cls.__name__, ','.join(allowed_time_precisions))) cls._retention_policy = getattr(_meta, 'retention_policy', None)