diff --git a/influxdb/client.py b/influxdb/client.py index 62d5a025..9b5be97b 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -216,7 +216,7 @@ def switch_user(self, username, password): self._password = password def request(self, url, method='GET', params=None, data=None, - expected_response_code=200, headers=None): + expected_response_code=200, headers=None, stream=True): """Make a HTTP request to the InfluxDB API. :param url: the path of the HTTP request, e.g. write, query, etc. @@ -264,7 +264,8 @@ def request(self, url, method='GET', params=None, data=None, headers=headers, proxies=self._proxies, verify=self._verify_ssl, - timeout=self._timeout + timeout=self._timeout, + stream=stream ) break except (requests.exceptions.ConnectionError, @@ -342,6 +343,21 @@ def _read_chunked_response(response, raise_errors=True): _key, []).extend(result[_key]) return ResultSet(result_set, raise_errors=raise_errors) + @staticmethod + def _read_chunked_response_generator(response, raise_errors=True): + for line in response.iter_lines(): + result_set = {} + if isinstance(line, bytes): + line = line.decode('utf-8') + data = json.loads(line) + for result in data.get('results', []): + for _key in result: + if isinstance(result[_key], list): + result_set.setdefault( + _key, []).extend(result[_key]) + yield(ResultSet(result_set, raise_errors=raise_errors)) + result_set = {} + def query(self, query, params=None, @@ -350,7 +366,8 @@ def query(self, database=None, raise_errors=True, chunked=False, - chunk_size=0): + chunk_size=0, + stream=False): """Send a query to InfluxDB. :param query: the actual query string @@ -377,13 +394,19 @@ def query(self, :type raise_errors: bool :param chunked: Enable to use chunked responses from InfluxDB. - With ``chunked`` enabled, one ResultSet is returned per chunk - containing all results within that chunk + Normally all chunks are automatically combined into one huge + ResultSet, unless you use ``stream``. 
:type chunked: bool :param chunk_size: Size of each chunk to tell InfluxDB to use. :type chunk_size: int + :param stream: Will stream the data and return a generator that + generates one ResultSet per chunk. + This allows for huge datasets with virtually no limit. + + :type stream: bool + :returns: the queried data :rtype: :class:`~.ResultSet` """ @@ -406,11 +429,17 @@ def query(self, method='GET', params=params, data=None, - expected_response_code=expected_response_code + expected_response_code=expected_response_code, + stream=stream ) if chunked: - return self._read_chunked_response(response) + if stream: + return self._read_chunked_response_generator( + response, raise_errors + ) + else: + return self._read_chunked_response(response, raise_errors) data = response.json()