Skip to content

Commit 57daf8c

Browse files
committed
Add support for messagepack
1 parent d5d1249 commit 57daf8c

File tree

3 files changed

+60
-7
lines changed

3 files changed

+60
-7
lines changed

influxdb/client.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
import random
1111

1212
import json
13+
import struct
14+
import datetime
1315
import socket
16+
import msgpack
1417
import requests
1518
import requests.exceptions
1619
from six.moves import xrange
@@ -128,7 +131,7 @@ def __init__(self,
128131

129132
self._headers = {
130133
'Content-Type': 'application/json',
131-
'Accept': 'text/plain'
134+
'Accept': 'application/x-msgpack'
132135
}
133136

134137
@property
@@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None,
277280
time.sleep((2 ** _try) * random.random() / 100.0)
278281
if not retry:
279282
raise
283+
284+
def reformat_error(response):
285+
err = self._parse_msgpack(response)
286+
if err:
287+
return json.dumps(err, separators=(',', ':'))
288+
else:
289+
return response.content
290+
280291
# if there's not an error, there must have been a successful response
281292
if 500 <= response.status_code < 600:
282-
raise InfluxDBServerError(response.content)
293+
raise InfluxDBServerError(reformat_error(response))
283294
elif response.status_code == expected_response_code:
284295
return response
285296
else:
286-
raise InfluxDBClientError(response.content, response.status_code)
297+
err_msg = reformat_error(response)
298+
raise InfluxDBClientError(err_msg, response.status_code)
287299

288300
def write(self, data, params=None, expected_response_code=204,
289301
protocol='json'):
@@ -342,6 +354,21 @@ def _read_chunked_response(response, raise_errors=True):
342354
_key, []).extend(result[_key])
343355
return ResultSet(result_set, raise_errors=raise_errors)
344356

357+
@staticmethod
358+
def _parse_msgpack(response):
359+
"""Return the decoded response if it is encoded as msgpack."""
360+
def hook(code, data):
361+
if code == 5:
362+
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
363+
time = datetime.datetime.utcfromtimestamp(epoch_s)
364+
time += datetime.timedelta(microseconds=(epoch_ns / 1000))
365+
return time.isoformat() + 'Z'
366+
return msgpack.ExtType(code, data)
367+
368+
headers = response.headers
369+
if headers and headers["Content-Type"] == "application/x-msgpack":
370+
return msgpack.unpackb(response.content, ext_hook=hook, raw=False)
371+
345372
def query(self,
346373
query,
347374
params=None,
@@ -434,10 +461,11 @@ def query(self,
434461
expected_response_code=expected_response_code
435462
)
436463

437-
if chunked:
438-
return self._read_chunked_response(response)
439-
440-
data = response.json()
464+
data = self._parse_msgpack(response)
465+
if not data:
466+
if chunked:
467+
return self._read_chunked_response(response)
468+
data = response.json()
441469

442470
results = [
443471
ResultSet(result, raise_errors=raise_errors)

influxdb/tests/client_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,29 @@ def test_query(self):
465465
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
466466
)
467467

468+
def test_query_msgpack(self):
469+
"""Test query method with a messagepack response."""
470+
example_response = bytes(bytearray.fromhex(
471+
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
472+
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
473+
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
474+
))
475+
476+
with requests_mock.Mocker() as m:
477+
m.register_uri(
478+
requests_mock.GET,
479+
"http://localhost:8086/query",
480+
request_headers={"Accept": "application/x-msgpack"},
481+
headers={"Content-Type": "application/x-msgpack"},
482+
content=example_response
483+
)
484+
rs = self.cli.query('select * from a')
485+
486+
self.assertListEqual(
487+
list(rs.get_points()),
488+
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
489+
)
490+
468491
def test_select_into_post(self):
469492
"""Test SELECT.*INTO is POSTed."""
470493
example_response = (

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@ python-dateutil>=2.6.0
22
pytz
33
requests>=2.17.0
44
six>=1.10.0
5+
msgpack==0.6.1
6+

0 commit comments

Comments
 (0)