diff --git a/influxdb/client.py b/influxdb/client.py index ad4c6b66..4c873cbc 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -231,7 +231,7 @@ def switch_user(self, username, password): self._username = username self._password = password - def request(self, url, method='GET', params=None, data=None, + def request(self, url, method='GET', params=None, data=None, stream=False, expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. @@ -243,6 +243,8 @@ def request(self, url, method='GET', params=None, data=None, :type params: dict :param data: the data of the request, defaults to None :type data: str + :param stream: True if a query uses chunked responses + :type stream: bool :param expected_response_code: the expected response code of the request, defaults to 200 :type expected_response_code: int @@ -277,6 +279,7 @@ def request(self, url, method='GET', params=None, data=None, auth=(self._username, self._password), params=params, data=data, + stream=stream, headers=headers, proxies=self._proxies, verify=self._verify_ssl, @@ -346,17 +349,17 @@ def write(self, data, params=None, expected_response_code=204, @staticmethod def _read_chunked_response(response, raise_errors=True): - result_set = {} for line in response.iter_lines(): if isinstance(line, bytes): line = line.decode('utf-8') data = json.loads(line) + result_set = {} for result in data.get('results', []): for _key in result: if isinstance(result[_key], list): result_set.setdefault( _key, []).extend(result[_key]) - return ResultSet(result_set, raise_errors=raise_errors) + yield ResultSet(result_set, raise_errors=raise_errors) def query(self, query, @@ -447,6 +450,7 @@ def query(self, method=method, params=params, data=None, + stream=chunked, expected_response_code=expected_response_code ) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 571b7ebc..0011293a 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1220,16 +1220,11 @@ def test_invalid_port_fails(self): def test_chunked_response(self): """Test chunked reponse for TestInfluxDBClient object.""" example_response = \ - u'{"results":[{"statement_id":0,"series":' \ - '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"iops","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ - '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ - '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"memory","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \ + '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \ + 'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \ + '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \ + '["df"],["mount"]]}]}]}\n' with requests_mock.Mocker() as m: m.register_uri( @@ -1237,23 +1232,20 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = self.cli.query('show series limit 4 offset 0', + response = self.cli.query('show series', chunked=True, chunk_size=4) - self.assertTrue(len(response) == 4) - self.assertEqual(response.__repr__(), ResultSet( - {'series': [{'values': [['value', 'integer']], - 'name': 'cpu', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'iops', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'load', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'memory', - 'columns': ['fieldKey', 'fieldType']}]} - ).__repr__()) + res = list(response) + self.assertTrue(len(res) == 2) + self.assertEqual(res[0].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['cpu'], ['memory'], ['iops'], ['network']] + }]}).__repr__()) + self.assertEqual(res[1].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['qps'], ['uptime'], ['df'], ['mount']] + }]}).__repr__()) class FakeClient(InfluxDBClient):