Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Fix chunked query to return chunk resultsets #753

Merged
merged 1 commit into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def switch_user(self, username, password):
self._username = username
self._password = password

def request(self, url, method='GET', params=None, data=None,
def request(self, url, method='GET', params=None, data=None, stream=False,
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.

Expand All @@ -243,6 +243,8 @@ def request(self, url, method='GET', params=None, data=None,
:type params: dict
:param data: the data of the request, defaults to None
:type data: str
:param stream: True if a query uses chunked responses
:type stream: bool
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
Expand Down Expand Up @@ -277,6 +279,7 @@ def request(self, url, method='GET', params=None, data=None,
auth=(self._username, self._password),
params=params,
data=data,
stream=stream,
headers=headers,
proxies=self._proxies,
verify=self._verify_ssl,
Expand Down Expand Up @@ -346,17 +349,17 @@ def write(self, data, params=None, expected_response_code=204,

@staticmethod
def _read_chunked_response(response, raise_errors=True):
result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
result_set = {}
for result in data.get('results', []):
for _key in result:
if isinstance(result[_key], list):
result_set.setdefault(
_key, []).extend(result[_key])
return ResultSet(result_set, raise_errors=raise_errors)
yield ResultSet(result_set, raise_errors=raise_errors)

def query(self,
query,
Expand Down Expand Up @@ -447,6 +450,7 @@ def query(self,
method=method,
params=params,
data=None,
stream=chunked,
expected_response_code=expected_response_code
)

Expand Down
44 changes: 18 additions & 26 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,40 +1220,32 @@ def test_invalid_port_fails(self):
def test_chunked_response(self):
"""Test chunked reponse for TestInfluxDBClient object."""
example_response = \
u'{"results":[{"statement_id":0,"series":' \
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
'"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
'"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
'["df"],["mount"]]}]}]}\n'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
response = self.cli.query('show series limit 4 offset 0',
response = self.cli.query('show series',
chunked=True, chunk_size=4)
self.assertTrue(len(response) == 4)
self.assertEqual(response.__repr__(), ResultSet(
{'series': [{'values': [['value', 'integer']],
'name': 'cpu',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'iops',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'load',
'columns': ['fieldKey', 'fieldType']},
{'values': [['value', 'integer']],
'name': 'memory',
'columns': ['fieldKey', 'fieldType']}]}
).__repr__())
res = list(response)
self.assertTrue(len(res) == 2)
self.assertEqual(res[0].__repr__(), ResultSet(
{'series': [{
'columns': ['key'],
'values': [['cpu'], ['memory'], ['iops'], ['network']]
}]}).__repr__())
self.assertEqual(res[1].__repr__(), ResultSet(
{'series': [{
'columns': ['key'],
'values': [['qps'], ['uptime'], ['df'], ['mount']]
}]}).__repr__())


class FakeClient(InfluxDBClient):
Expand Down