
Commit 3832bd3

Fix chunked query to return chunk resultsets
When querying large data sets, it's vital to get chunked responses in order to manage memory usage. Wrapping the query response in a generator and streaming the request provides the desired result. This also fixes `InfluxDBClient.query()` behavior for chunked queries, which currently does not work as described in the [specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429).

Closes #585, closes #531
1 parent 6baf7ee commit 3832bd3
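With this change, a chunked query yields one `ResultSet` per response chunk instead of a single merged result. A minimal usage sketch (the host, database, measurement name, and chunk size below are illustrative, not part of the commit):

from influxdb import InfluxDBClient

# Hypothetical connection details, for illustration only.
client = InfluxDBClient(host='localhost', port=8086, database='example')

# With chunked=True, query() now returns a generator that yields one
# ResultSet per chunk, so large results can be processed without
# holding them all in memory at once.
chunks = client.query('SELECT * FROM "cpu"', chunked=True, chunk_size=10000)
for result_set in chunks:
    for point in result_set.get_points():
        pass  # process each point as it streams in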

2 files changed: 25 additions, 29 deletions

influxdb/client.py (7 additions, 3 deletions)

@@ -231,7 +231,7 @@ def switch_user(self, username, password):
         self._username = username
         self._password = password
 
-    def request(self, url, method='GET', params=None, data=None,
+    def request(self, url, method='GET', params=None, data=None, stream=False,
                 expected_response_code=200, headers=None):
         """Make a HTTP request to the InfluxDB API.
 
@@ -243,6 +243,8 @@ def request(self, url, method='GET', params=None, data=None,
         :type params: dict
         :param data: the data of the request, defaults to None
         :type data: str
+        :param stream: True if a query uses chunked responses
+        :type stream: bool
         :param expected_response_code: the expected response code of
             the request, defaults to 200
         :type expected_response_code: int
@@ -277,6 +279,7 @@ def request(self, url, method='GET', params=None, data=None,
                     auth=(self._username, self._password),
                     params=params,
                     data=data,
+                    stream=stream,
                     headers=headers,
                     proxies=self._proxies,
                     verify=self._verify_ssl,
@@ -346,17 +349,17 @@ def write(self, data, params=None, expected_response_code=204,
 
     @staticmethod
     def _read_chunked_response(response, raise_errors=True):
-        result_set = {}
         for line in response.iter_lines():
             if isinstance(line, bytes):
                 line = line.decode('utf-8')
             data = json.loads(line)
+            result_set = {}
             for result in data.get('results', []):
                 for _key in result:
                     if isinstance(result[_key], list):
                         result_set.setdefault(
                             _key, []).extend(result[_key])
-        return ResultSet(result_set, raise_errors=raise_errors)
+            yield ResultSet(result_set, raise_errors=raise_errors)
 
     def query(self,
               query,
@@ -447,6 +450,7 @@ def query(self,
             method=method,
             params=params,
             data=None,
+            stream=chunked,
             expected_response_code=expected_response_code
         )
 
influxdb/tests/client_test.py (18 additions, 26 deletions)

@@ -1220,40 +1220,32 @@ def test_invalid_port_fails(self):
     def test_chunked_response(self):
         """Test chunked reponse for TestInfluxDBClient object."""
         example_response = \
-            u'{"results":[{"statement_id":0,"series":' \
-            '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
-            '[["value","integer"]]}],"partial":true}]}\n{"results":' \
-            '[{"statement_id":0,"series":[{"name":"iops","columns":' \
-            '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
-            '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
-            '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
-            '[["value","integer"]]}],"partial":true}]}\n{"results":' \
-            '[{"statement_id":0,"series":[{"name":"memory","columns":' \
-            '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
+            u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
+            '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
+            'true}],"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
+            '[{"columns":["key"],"values":[["qps"],["uptime"],["df"],' \
+            '["mount"]]}]}]}\n'
 
         with requests_mock.Mocker() as m:
             m.register_uri(
                 requests_mock.GET,
                 "http://localhost:8086/query",
                 text=example_response
             )
-            response = self.cli.query('show series limit 4 offset 0',
+            response = self.cli.query('show series',
                                       chunked=True, chunk_size=4)
-            self.assertTrue(len(response) == 4)
-            self.assertEqual(response.__repr__(), ResultSet(
-                {'series': [{'values': [['value', 'integer']],
-                             'name': 'cpu',
-                             'columns': ['fieldKey', 'fieldType']},
-                            {'values': [['value', 'integer']],
-                             'name': 'iops',
-                             'columns': ['fieldKey', 'fieldType']},
-                            {'values': [['value', 'integer']],
-                             'name': 'load',
-                             'columns': ['fieldKey', 'fieldType']},
-                            {'values': [['value', 'integer']],
-                             'name': 'memory',
-                             'columns': ['fieldKey', 'fieldType']}]}
-            ).__repr__())
+            res = list(response)
+            self.assertTrue(len(res) == 2)
+            self.assertEqual(res[0].__repr__(), ResultSet(
+                {'series': [{
+                    'columns': ['key'],
+                    'values': [['cpu'], ['memory'], ['iops'], ['network']]
+                }]}).__repr__())
+            self.assertEqual(res[1].__repr__(), ResultSet(
+                {'series': [{
+                    'columns': ['key'],
+                    'values': [['qps'], ['uptime'], ['df'], ['mount']]
+                }]}).__repr__())
 
 
 class FakeClient(InfluxDBClient):
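Callers that depended on the previous behaviour, where a chunked query came back as a single combined `ResultSet`, can still flatten the generator themselves. A rough migration sketch, with the same illustrative connection details as above:

from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='example')

# Collect every point from every yielded chunk into one list.
points = []
for result_set in client.query('SELECT * FROM "cpu"',
                               chunked=True, chunk_size=10000):
    points.extend(result_set.get_points())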
