Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit a6bc05d

Browse files
committed
Fix chunked query to return chunk resultsets
When querying large data sets, it's vital to get a chunked responses to manage memory usage. Wrapping the query response in a generator and streaming the request provides the desired result. It also fixes `InfluxDBClient.query()` behavior for chunked queries that is currently not working according to [specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L410) close #585 close #531
1 parent 6baf7ee commit a6bc05d

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

influxdb/client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def switch_user(self, username, password):
231231
self._username = username
232232
self._password = password
233233

234-
def request(self, url, method='GET', params=None, data=None,
234+
def request(self, url, method='GET', params=None, data=None, stream=False,
235235
expected_response_code=200, headers=None):
236236
"""Make a HTTP request to the InfluxDB API.
237237
@@ -243,6 +243,8 @@ def request(self, url, method='GET', params=None, data=None,
243243
:type params: dict
244244
:param data: the data of the request, defaults to None
245245
:type data: str
246+
:param stream: True if a query uses chunked responses
247+
:type stream: bool
246248
:param expected_response_code: the expected response code of
247249
the request, defaults to 200
248250
:type expected_response_code: int
@@ -277,6 +279,7 @@ def request(self, url, method='GET', params=None, data=None,
277279
auth=(self._username, self._password),
278280
params=params,
279281
data=data,
282+
stream=stream,
280283
headers=headers,
281284
proxies=self._proxies,
282285
verify=self._verify_ssl,
@@ -346,17 +349,17 @@ def write(self, data, params=None, expected_response_code=204,
346349

347350
@staticmethod
348351
def _read_chunked_response(response, raise_errors=True):
349-
result_set = {}
350352
for line in response.iter_lines():
351353
if isinstance(line, bytes):
352354
line = line.decode('utf-8')
353355
data = json.loads(line)
356+
result_set = {}
354357
for result in data.get('results', []):
355358
for _key in result:
356359
if isinstance(result[_key], list):
357360
result_set.setdefault(
358361
_key, []).extend(result[_key])
359-
return ResultSet(result_set, raise_errors=raise_errors)
362+
yield ResultSet(result_set, raise_errors=raise_errors)
360363

361364
def query(self,
362365
query,
@@ -447,6 +450,7 @@ def query(self,
447450
method=method,
448451
params=params,
449452
data=None,
453+
stream=chunked,
450454
expected_response_code=expected_response_code
451455
)
452456

influxdb/tests/client_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,8 +1239,8 @@ def test_chunked_response(self):
12391239
)
12401240
response = self.cli.query('show series limit 4 offset 0',
12411241
chunked=True, chunk_size=4)
1242-
self.assertTrue(len(response) == 4)
1243-
self.assertEqual(response.__repr__(), ResultSet(
1242+
self.assertTrue(len(list(response)) == 4)
1243+
self.assertEqual(list(response.__repr__()), ResultSet(
12441244
{'series': [{'values': [['value', 'integer']],
12451245
'name': 'cpu',
12461246
'columns': ['fieldKey', 'fieldType']},

0 commit comments

Comments
 (0)