From 67301be062e26777255fed051522da131d64f751 Mon Sep 17 00:00:00 2001
From: Stefan 'hr' Berder
Date: Sun, 8 Sep 2019 17:38:13 +0800
Subject: [PATCH] Fix chunked query to return chunk resultsets
When querying large data sets, it's vital to get a chunked responses to
manage memory usage. Wrapping the query response in a generator and
streaming the request provides the desired result.
It also fixes `InfluxDBClient.query()` behavior for chunked queries that
is currently not working according to
[specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429)
close #585
close #531
---
influxdb/client.py | 10 +++++---
influxdb/tests/client_test.py | 44 ++++++++++++++---------------------
2 files changed, 25 insertions(+), 29 deletions(-)
diff --git a/influxdb/client.py b/influxdb/client.py
index ad4c6b66..4c873cbc 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -231,7 +231,7 @@ def switch_user(self, username, password):
self._username = username
self._password = password
- def request(self, url, method='GET', params=None, data=None,
+ def request(self, url, method='GET', params=None, data=None, stream=False,
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.
@@ -243,6 +243,8 @@ def request(self, url, method='GET', params=None, data=None,
:type params: dict
:param data: the data of the request, defaults to None
:type data: str
+ :param stream: True if a query uses chunked responses
+ :type stream: bool
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
@@ -277,6 +279,7 @@ def request(self, url, method='GET', params=None, data=None,
auth=(self._username, self._password),
params=params,
data=data,
+ stream=stream,
headers=headers,
proxies=self._proxies,
verify=self._verify_ssl,
@@ -346,17 +349,17 @@ def write(self, data, params=None, expected_response_code=204,
@staticmethod
def _read_chunked_response(response, raise_errors=True):
- result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
+ result_set = {}
for result in data.get('results', []):
for _key in result:
if isinstance(result[_key], list):
result_set.setdefault(
_key, []).extend(result[_key])
- return ResultSet(result_set, raise_errors=raise_errors)
+ yield ResultSet(result_set, raise_errors=raise_errors)
def query(self,
query,
@@ -447,6 +450,7 @@ def query(self,
method=method,
params=params,
data=None,
+ stream=chunked,
expected_response_code=expected_response_code
)
diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py
index 571b7ebc..0011293a 100644
--- a/influxdb/tests/client_test.py
+++ b/influxdb/tests/client_test.py
@@ -1220,16 +1220,11 @@ def test_invalid_port_fails(self):
def test_chunked_response(self):
"""Test chunked reponse for TestInfluxDBClient object."""
example_response = \
- u'{"results":[{"statement_id":0,"series":' \
- '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
- '[["value","integer"]]}],"partial":true}]}\n{"results":' \
- '[{"statement_id":0,"series":[{"name":"iops","columns":' \
- '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
- '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
- '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
- '[["value","integer"]]}],"partial":true}]}\n{"results":' \
- '[{"statement_id":0,"series":[{"name":"memory","columns":' \
- '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
+ u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
+ '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
+ 'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
+ '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
+ '["df"],["mount"]]}]}]}\n'
with requests_mock.Mocker() as m:
m.register_uri(
@@ -1237,23 +1232,20 @@ def test_chunked_response(self):
"http://localhost:8086/query",
text=example_response
)
- response = self.cli.query('show series limit 4 offset 0',
+ response = self.cli.query('show series',
chunked=True, chunk_size=4)
- self.assertTrue(len(response) == 4)
- self.assertEqual(response.__repr__(), ResultSet(
- {'series': [{'values': [['value', 'integer']],
- 'name': 'cpu',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'iops',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'load',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'memory',
- 'columns': ['fieldKey', 'fieldType']}]}
- ).__repr__())
+ res = list(response)
+ self.assertTrue(len(res) == 2)
+ self.assertEqual(res[0].__repr__(), ResultSet(
+ {'series': [{
+ 'columns': ['key'],
+ 'values': [['cpu'], ['memory'], ['iops'], ['network']]
+ }]}).__repr__())
+ self.assertEqual(res[1].__repr__(), ResultSet(
+ {'series': [{
+ 'columns': ['key'],
+ 'values': [['qps'], ['uptime'], ['df'], ['mount']]
+ }]}).__repr__())
class FakeClient(InfluxDBClient):