This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Support streamed chunking (fixing #531 and others) #538

Closed
wants to merge 6 commits
Changes from all commits
43 changes: 36 additions & 7 deletions influxdb/client.py
@@ -216,7 +216,7 @@ def switch_user(self, username, password):
         self._password = password

     def request(self, url, method='GET', params=None, data=None,
-                expected_response_code=200, headers=None):
+                expected_response_code=200, headers=None, stream=True):
         """Make a HTTP request to the InfluxDB API.

         :param url: the path of the HTTP request, e.g. write, query, etc.
@@ -264,7 +264,8 @@ def request(self, url, method='GET', params=None, data=None,
                     headers=headers,
                     proxies=self._proxies,
                     verify=self._verify_ssl,
-                    timeout=self._timeout
+                    timeout=self._timeout,
+                    stream=stream
                 )
                 break
             except (requests.exceptions.ConnectionError,
@@ -342,6 +343,21 @@ def _read_chunked_response(response, raise_errors=True):
                             _key, []).extend(result[_key])
         return ResultSet(result_set, raise_errors=raise_errors)

+    @staticmethod
+    def _read_chunked_response_generator(response, raise_errors=True):
+        for line in response.iter_lines():
+            result_set = {}
+            if isinstance(line, bytes):
+                line = line.decode('utf-8')
+            data = json.loads(line)
+            for result in data.get('results', []):
+                for _key in result:
+                    if isinstance(result[_key], list):
+                        result_set.setdefault(
+                            _key, []).extend(result[_key])
+            yield ResultSet(result_set, raise_errors=raise_errors)
+            result_set = {}
+
     def query(self,
               query,
               params=None,
@@ -350,7 +366,8 @@ def query(self,
               database=None,
               raise_errors=True,
               chunked=False,
-              chunk_size=0):
+              chunk_size=0,
+              stream=False):
         """Send a query to InfluxDB.

         :param query: the actual query string
@@ -377,13 +394,19 @@ def query(self,
         :type raise_errors: bool

         :param chunked: Enable to use chunked responses from InfluxDB.
             With ``chunked`` enabled, one ResultSet is returned per chunk
             containing all results within that chunk
+            Normally all chunks are automatically combined into one huge
+            ResultSet, unless you use ``stream``.
         :type chunked: bool

         :param chunk_size: Size of each chunk to tell InfluxDB to use.
         :type chunk_size: int

+        :param stream: Will stream the data and return a generator that
+            generates one ResultSet per chunk.
+            This allows for huge datasets with virtually no limit.
+
+        :type stream: bool

         :returns: the queried data
         :rtype: :class:`~.ResultSet`
         """
@@ -406,11 +429,17 @@
             method='GET',
             params=params,
             data=None,
-            expected_response_code=expected_response_code
+            expected_response_code=expected_response_code,
+            stream=stream
         )

         if chunked:
-            return self._read_chunked_response(response)
+            if stream:
+                return self._read_chunked_response_generator(
+                    response, raise_errors
+                )
+            else:
+                return self._read_chunked_response(response, raise_errors)

         data = response.json()

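For reference, a minimal usage sketch of the streaming behaviour this PR adds, assuming the PR branch is installed and ``query()`` exposes the ``stream`` argument exactly as in the diff above; the host, database, measurement, and chunk size below are placeholder values:

    from influxdb import InfluxDBClient

    # Placeholder connection details; point these at your own server and database.
    client = InfluxDBClient(host='localhost', port=8086, database='example_db')

    # With chunked=True and stream=True, query() returns the generator from
    # _read_chunked_response_generator: one ResultSet per chunk, decoded lazily,
    # so only a single chunk is held in memory at a time.
    chunks = client.query(
        'SELECT * FROM "cpu_load"',
        chunked=True,
        chunk_size=10000,
        stream=True,
    )

    total = 0
    for result_set in chunks:
        points = list(result_set.get_points())
        total += len(points)
        print('received a chunk with {} points'.format(len(points)))

    print('streamed {} points in total'.format(total))

Each chunk arrives as one JSON document on its own line of the HTTP response, which is why the new generator pairs ``response.iter_lines()`` with ``stream=stream`` on the underlying ``requests`` call; memory usage then scales with ``chunk_size`` rather than with the size of the full result.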