From f50cdbc510e84cc19fac858411f039dbef47090c Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 26 Nov 2017 01:56:07 +0100
Subject: [PATCH 1/4] trying to fix #531 by returning an actual generator when
 using query() in chunked mode

---
 influxdb/client.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/influxdb/client.py b/influxdb/client.py
index 02128462..3ee14dbe 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -201,7 +201,7 @@ def switch_user(self, username, password):
         self._password = password
 
     def request(self, url, method='GET', params=None, data=None,
-                expected_response_code=200, headers=None):
+                expected_response_code=200, headers=None, chunked=False):
         """Make a HTTP request to the InfluxDB API.
 
         :param url: the path of the HTTP request, e.g. write, query, etc.
@@ -249,7 +249,8 @@ def request(self, url, method='GET', params=None, data=None,
                     headers=headers,
                     proxies=self._proxies,
                     verify=self._verify_ssl,
-                    timeout=self._timeout
+                    timeout=self._timeout,
+                    stream=chunked
                 )
                 break
             except (requests.exceptions.ConnectionError,
@@ -315,17 +316,17 @@ def write(self, data, params=None, expected_response_code=204,
 
     @staticmethod
     def _read_chunked_response(response, raise_errors=True):
-        result_set = {}
         for line in response.iter_lines():
             if isinstance(line, bytes):
                 line = line.decode('utf-8')
             data = json.loads(line)
+            result_set = {}
             for result in data.get('results', []):
                 for _key in result:
                     if isinstance(result[_key], list):
                         result_set.setdefault(
                             _key, []).extend(result[_key])
-        return ResultSet(result_set, raise_errors=raise_errors)
+            yield(ResultSet(result_set, raise_errors=raise_errors))
 
     def query(self,
               query,
@@ -391,7 +392,8 @@ def query(self,
             method='GET',
             params=params,
             data=None,
-            expected_response_code=expected_response_code
+            expected_response_code=expected_response_code,
+            chunked=chunked
         )
 
         if chunked:

From 15473eb61e40761d6b4f038c45efd63df5388e70 Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 26 Nov 2017 17:53:21 +0100
Subject: [PATCH 2/4] now made streaming optional for chunked queries

---
 influxdb/client.py | 31 ++++++++++++++++++++++++++-----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/influxdb/client.py b/influxdb/client.py
index 3ee14dbe..7c733a53 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -201,7 +201,7 @@ def switch_user(self, username, password):
         self._password = password
 
     def request(self, url, method='GET', params=None, data=None,
-                expected_response_code=200, headers=None, chunked=False):
+                expected_response_code=200, headers=None, stream=True):
         """Make a HTTP request to the InfluxDB API.
 
         :param url: the path of the HTTP request, e.g. write, query, etc.
@@ -250,7 +250,7 @@ def request(self, url, method='GET', params=None, data=None,
                     proxies=self._proxies,
                     verify=self._verify_ssl,
                     timeout=self._timeout,
-                    stream=chunked
+                    stream=stream
                 )
                 break
             except (requests.exceptions.ConnectionError,
@@ -316,17 +316,33 @@ def write(self, data, params=None, expected_response_code=204,
 
     @staticmethod
     def _read_chunked_response(response, raise_errors=True):
+        result_set = {}
         for line in response.iter_lines():
             if isinstance(line, bytes):
                 line = line.decode('utf-8')
             data = json.loads(line)
+            for result in data.get('results', []):
+                for _key in result:
+                    if isinstance(result[_key], list):
+                        result_set.setdefault(
+                            _key, []).extend(result[_key])
+
+        return(ResultSet(result_set, raise_errors=raise_errors))
+
+    @staticmethod
+    def _read_chunked_response_generator(response, raise_errors=True):
+        for line in response.iter_lines():
             result_set = {}
+            if isinstance(line, bytes):
+                line = line.decode('utf-8')
+            data = json.loads(line)
             for result in data.get('results', []):
                 for _key in result:
                     if isinstance(result[_key], list):
                         result_set.setdefault(
                             _key, []).extend(result[_key])
             yield(ResultSet(result_set, raise_errors=raise_errors))
+            result_set = {}
 
     def query(self,
               query,
@@ -336,7 +352,8 @@ def query(self,
               database=None,
               raise_errors=True,
               chunked=False,
-              chunk_size=0):
+              chunk_size=0,
+              stream=False):
         """Send a query to InfluxDB.
 
         :param query: the actual query string
@@ -393,11 +410,15 @@ def query(self,
             params=params,
             data=None,
             expected_response_code=expected_response_code,
-            chunked=chunked
+            stream=stream
         )
 
         if chunked:
-            return self._read_chunked_response(response)
+            if stream:
+                return self._read_chunked_response_generator(response,raise_errors)
+            else:
+                return self._read_chunked_response(response,raise_errors)
+
 
         data = response.json()
 

From d9343a120ba7398050e11714518730c9b397fc65 Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 26 Nov 2017 18:21:25 +0100
Subject: [PATCH 3/4] fixed error in existing documentation for chunked mode.
 (it always combines the results in ONE ResultSet, instead of many) Added
 documentation for stream-mode. Fixed coding style issues

---
 influxdb/client.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/influxdb/client.py b/influxdb/client.py
index 7c733a53..e91644e9 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -326,8 +326,7 @@ def _read_chunked_response(response, raise_errors=True):
                     if isinstance(result[_key], list):
                         result_set.setdefault(
                             _key, []).extend(result[_key])
-
-        return(ResultSet(result_set, raise_errors=raise_errors))
+        return ResultSet(result_set, raise_errors=raise_errors)
 
     @staticmethod
     def _read_chunked_response_generator(response, raise_errors=True):
@@ -380,13 +379,19 @@ def query(self,
         :type raise_errors: bool
 
         :param chunked: Enable to use chunked responses from InfluxDB.
-            With ``chunked`` enabled, one ResultSet is returned per chunk
-            containing all results within that chunk
+            Normally all chunks are automaticly combined into one huge
+            ResultSet, unless you use ``stream``.
         :type chunked: bool
 
         :param chunk_size: Size of each chunk to tell InfluxDB to use.
         :type chunk_size: int
 
+        :param stream: Will stream the data and return a generator that
+            generates one ResultSet per chunk containing.
+            This allows for huge datasets with virtually no limit.
+
+        :type stream: bool
+
         :returns: the queried data
         :rtype: :class:`~.ResultSet`
         """
@@ -415,10 +420,11 @@ def query(self,
 
         if chunked:
             if stream:
-                return self._read_chunked_response_generator(response,raise_errors)
+                return self._read_chunked_response_generator(
+                    response, raise_errors
+                )
             else:
-                return self._read_chunked_response(response,raise_errors)
-
+                return self._read_chunked_response(response, raise_errors)
 
         data = response.json()
 

From fd27d0d2a07625a6857511e73e989fb869ad4e03 Mon Sep 17 00:00:00 2001
From: Edwin Eefting
Date: Sun, 26 Nov 2017 18:31:12 +0100
Subject: [PATCH 4/4] typo

---
 influxdb/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/influxdb/client.py b/influxdb/client.py
index e91644e9..8376e354 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -387,7 +387,7 @@ def query(self,
         :type chunk_size: int
 
         :param stream: Will stream the data and return a generator that
-            generates one ResultSet per chunk containing.
+            generates one ResultSet per chunk.
             This allows for huge datasets with virtually no limit.
 
         :type stream: bool
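
For reference, a minimal usage sketch of the query() interface proposed by this
series, assuming the patches above are applied. The host, database and
measurement names below are invented for illustration; only the chunked,
chunk_size and stream arguments come from the patches themselves.

    from influxdb import InfluxDBClient

    # Hypothetical connection settings and measurement name, for illustration only.
    client = InfluxDBClient(host='localhost', port=8086, database='telemetry')

    # chunked=True without stream: the response is fetched in chunks, but the
    # client combines them into a single ResultSet before returning.
    combined = client.query('SELECT * FROM "cpu_load"',
                            chunked=True, chunk_size=10000)

    # chunked=True with stream=True: query() returns a generator that yields
    # one ResultSet per chunk, so the full result never has to fit in memory.
    for chunk in client.query('SELECT * FROM "cpu_load"',
                              chunked=True, chunk_size=10000, stream=True):
        for point in chunk.get_points():
            print(point)

Because the stream flag is forwarded through request() to the underlying
requests session, the streaming path parses each chunk as it arrives instead of
buffering the whole response body first.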