Skip to content

Commit c903d73

Browse files
authored
Fix chunked query to return chunk resultsets (influxdata#753)
When querying large data sets, it's vital to get chunked responses to manage memory usage. Wrapping the query response in a generator and streaming the request provides the desired result. It also fixes `InfluxDBClient.query()` behavior for chunked queries, which currently does not work according to the [specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429). Closes influxdata#585. Closes influxdata#531. Closes influxdata#538.
1 parent d6192a7 commit c903d73

File tree

2 files changed

+25
-29
lines changed

2 files changed

+25
-29
lines changed

influxdb/client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def switch_user(self, username, password):
249249
self._username = username
250250
self._password = password
251251

252-
def request(self, url, method='GET', params=None, data=None,
252+
def request(self, url, method='GET', params=None, data=None, stream=False,
253253
expected_response_code=200, headers=None):
254254
"""Make a HTTP request to the InfluxDB API.
255255
@@ -261,6 +261,8 @@ def request(self, url, method='GET', params=None, data=None,
261261
:type params: dict
262262
:param data: the data of the request, defaults to None
263263
:type data: str
264+
:param stream: True if a query uses chunked responses
265+
:type stream: bool
264266
:param expected_response_code: the expected response code of
265267
the request, defaults to 200
266268
:type expected_response_code: int
@@ -312,6 +314,7 @@ def request(self, url, method='GET', params=None, data=None,
312314
auth=(self._username, self._password),
313315
params=params,
314316
data=data,
317+
stream=stream,
315318
headers=headers,
316319
proxies=self._proxies,
317320
verify=self._verify_ssl,
@@ -398,17 +401,17 @@ def write(self, data, params=None, expected_response_code=204,
398401

399402
@staticmethod
400403
def _read_chunked_response(response, raise_errors=True):
401-
result_set = {}
402404
for line in response.iter_lines():
403405
if isinstance(line, bytes):
404406
line = line.decode('utf-8')
405407
data = json.loads(line)
408+
result_set = {}
406409
for result in data.get('results', []):
407410
for _key in result:
408411
if isinstance(result[_key], list):
409412
result_set.setdefault(
410413
_key, []).extend(result[_key])
411-
return ResultSet(result_set, raise_errors=raise_errors)
414+
yield ResultSet(result_set, raise_errors=raise_errors)
412415

413416
def query(self,
414417
query,
@@ -499,6 +502,7 @@ def query(self,
499502
method=method,
500503
params=params,
501504
data=None,
505+
stream=chunked,
502506
expected_response_code=expected_response_code
503507
)
504508

influxdb/tests/client_test.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,40 +1400,32 @@ def test_invalid_port_fails(self):
14001400
def test_chunked_response(self):
14011401
"""Test chunked response for TestInfluxDBClient object."""
14021402
example_response = \
1403-
u'{"results":[{"statement_id":0,"series":' \
1404-
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
1405-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1406-
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
1407-
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
1408-
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
1409-
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
1410-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1411-
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
1412-
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
1403+
u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
1404+
'"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
1405+
'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
1406+
'"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
1407+
'["df"],["mount"]]}]}]}\n'
14131408

14141409
with requests_mock.Mocker() as m:
14151410
m.register_uri(
14161411
requests_mock.GET,
14171412
"http://localhost:8086/query",
14181413
text=example_response
14191414
)
1420-
response = self.cli.query('show series limit 4 offset 0',
1415+
response = self.cli.query('show series',
14211416
chunked=True, chunk_size=4)
1422-
self.assertTrue(len(response) == 4)
1423-
self.assertEqual(response.__repr__(), ResultSet(
1424-
{'series': [{'values': [['value', 'integer']],
1425-
'name': 'cpu',
1426-
'columns': ['fieldKey', 'fieldType']},
1427-
{'values': [['value', 'integer']],
1428-
'name': 'iops',
1429-
'columns': ['fieldKey', 'fieldType']},
1430-
{'values': [['value', 'integer']],
1431-
'name': 'load',
1432-
'columns': ['fieldKey', 'fieldType']},
1433-
{'values': [['value', 'integer']],
1434-
'name': 'memory',
1435-
'columns': ['fieldKey', 'fieldType']}]}
1436-
).__repr__())
1417+
res = list(response)
1418+
self.assertTrue(len(res) == 2)
1419+
self.assertEqual(res[0].__repr__(), ResultSet(
1420+
{'series': [{
1421+
'columns': ['key'],
1422+
'values': [['cpu'], ['memory'], ['iops'], ['network']]
1423+
}]}).__repr__())
1424+
self.assertEqual(res[1].__repr__(), ResultSet(
1425+
{'series': [{
1426+
'columns': ['key'],
1427+
'values': [['qps'], ['uptime'], ['df'], ['mount']]
1428+
}]}).__repr__())
14371429

14381430

14391431
class FakeClient(InfluxDBClient):

0 commit comments

Comments
 (0)