Skip to content

Commit 1df262c

Browse files
author
Panos
committed
Added chunked query responses implementation and test. Added chunked parameter to client query function.
1 parent c9fcede commit 1df262c

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

influxdb/client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,13 +293,23 @@ def write(self, data, params=None, expected_response_code=204,
293293
)
294294
return True
295295

296+
def _read_chunked_response(self, response, raise_errors=True):
297+
for line in response.iter_lines():
298+
# import ipdb; ipdb.set_trace()
299+
if isinstance(line, bytes):
300+
line = line.decode('utf-8')
301+
data = json.loads(line)
302+
for result in data.get('results', []):
303+
yield ResultSet(result, raise_errors=raise_errors)
304+
296305
def query(self,
297306
query,
298307
params=None,
299308
epoch=None,
300309
expected_response_code=200,
301310
database=None,
302-
raise_errors=True):
311+
raise_errors=True,
312+
chunked=False):
303313
"""Send a query to InfluxDB.
304314
305315
:param query: the actual query string
@@ -339,6 +349,10 @@ def query(self,
339349
expected_response_code=expected_response_code
340350
)
341351

352+
if chunked or 'chunked' in params:
353+
params['chunked'] = 'true'
354+
return self._read_chunked_response(response)
355+
342356
data = response.json()
343357

344358
results = [

influxdb/tests/client_test.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import unittest
3333

3434
from influxdb import InfluxDBClient
35-
35+
from influxdb.resultset import ResultSet
3636

3737
def _build_response_object(status_code=200, content=""):
3838
resp = requests.Response()
@@ -792,6 +792,37 @@ def test_invalid_port_fails(self):
792792
with self.assertRaises(ValueError):
793793
InfluxDBClient('host', '80/redir', 'username', 'password')
794794

795+
def test_chunked_response(self):
796+
example_response = u'{"results":[{"statement_id":0,"series": ' \
797+
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
798+
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
799+
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
800+
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
801+
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
802+
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
803+
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
804+
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
805+
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
806+
807+
with requests_mock.Mocker() as m:
808+
m.register_uri(
809+
requests_mock.GET,
810+
"http://localhost:8086/query",
811+
text=example_response
812+
)
813+
response = list(self.cli.query('show series limit 4 offset 0', chunked=True))
814+
self.assertTrue(len(response) == 4)
815+
self.assertEqual(response[0].raw, ResultSet(
816+
{"statement_id":0,
817+
"series": [{"name":"cpu","columns":["fieldKey","fieldType"],
818+
"values": [["value","integer"]]}],"partial":True}
819+
).raw)
820+
self.assertEqual(response[3].raw, ResultSet(
821+
{"statement_id":0,
822+
"series":[{"name":"memory","columns":
823+
["fieldKey","fieldType"],
824+
"values":[["value","integer"]]}]}
825+
).raw)
795826

796827
class FakeClient(InfluxDBClient):
797828

0 commit comments

Comments
 (0)