|
10 | 10 | import random
|
11 | 11 |
|
12 | 12 | import json
|
| 13 | +import struct |
| 14 | +import datetime |
13 | 15 | import socket
|
| 16 | +import msgpack |
14 | 17 | import requests
|
15 | 18 | import requests.exceptions
|
16 | 19 | from six.moves import xrange
|
@@ -128,7 +131,7 @@ def __init__(self,
|
128 | 131 |
|
129 | 132 | self._headers = {
|
130 | 133 | 'Content-Type': 'application/json',
|
131 |
| - 'Accept': 'text/plain' |
| 134 | + 'Accept': 'application/x-msgpack' |
132 | 135 | }
|
133 | 136 |
|
134 | 137 | @property
|
@@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None,
|
277 | 280 | time.sleep((2 ** _try) * random.random() / 100.0)
|
278 | 281 | if not retry:
|
279 | 282 | raise
|
| 283 | + |
| 284 | + def reformat_error(response): |
| 285 | + err = self._parse_msgpack(response) |
| 286 | + if err: |
| 287 | + return json.dumps(err, separators=(',', ':')) |
| 288 | + else: |
| 289 | + return response.content |
| 290 | + |
280 | 291 | # if there's not an error, there must have been a successful response
|
281 | 292 | if 500 <= response.status_code < 600:
|
282 |
| - raise InfluxDBServerError(response.content) |
| 293 | + raise InfluxDBServerError(reformat_error(response)) |
283 | 294 | elif response.status_code == expected_response_code:
|
284 | 295 | return response
|
285 | 296 | else:
|
286 |
| - raise InfluxDBClientError(response.content, response.status_code) |
| 297 | + err_msg = reformat_error(response) |
| 298 | + raise InfluxDBClientError(err_msg, response.status_code) |
287 | 299 |
|
288 | 300 | def write(self, data, params=None, expected_response_code=204,
|
289 | 301 | protocol='json'):
|
@@ -342,6 +354,21 @@ def _read_chunked_response(response, raise_errors=True):
|
342 | 354 | _key, []).extend(result[_key])
|
343 | 355 | return ResultSet(result_set, raise_errors=raise_errors)
|
344 | 356 |
|
| 357 | + @staticmethod |
| 358 | + def _parse_msgpack(response): |
| 359 | + """Return the decoded response if it is encoded as msgpack.""" |
| 360 | + def hook(code, data): |
| 361 | + if code == 5: |
| 362 | + (epoch_s, epoch_ns) = struct.unpack(">QI", data) |
| 363 | + time = datetime.datetime.utcfromtimestamp(epoch_s) |
| 364 | + time += datetime.timedelta(microseconds=(epoch_ns / 1000)) |
| 365 | + return time.isoformat() + 'Z' |
| 366 | + return msgpack.ExtType(code, data) |
| 367 | + |
| 368 | + headers = response.headers |
| 369 | + if headers and headers["Content-Type"] == "application/x-msgpack": |
| 370 | + return msgpack.unpackb(response.content, ext_hook=hook, raw=False) |
| 371 | + |
345 | 372 | def query(self,
|
346 | 373 | query,
|
347 | 374 | params=None,
|
@@ -434,10 +461,11 @@ def query(self,
|
434 | 461 | expected_response_code=expected_response_code
|
435 | 462 | )
|
436 | 463 |
|
437 |
| - if chunked: |
438 |
| - return self._read_chunked_response(response) |
439 |
| - |
440 |
| - data = response.json() |
| 464 | + data = self._parse_msgpack(response) |
| 465 | + if not data: |
| 466 | + if chunked: |
| 467 | + return self._read_chunked_response(response) |
| 468 | + data = response.json() |
441 | 469 |
|
442 | 470 | results = [
|
443 | 471 | ResultSet(result, raise_errors=raise_errors)
|
|
0 commit comments