Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

gzip compression for data (post and responses) #732

Merged
merged 13 commits into from
Apr 10, 2020
Merged
25 changes: 24 additions & 1 deletion influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from __future__ import unicode_literals

import datetime
import gzip
import itertools
import io
import json
import random
import socket
Expand Down Expand Up @@ -70,10 +72,11 @@ class InfluxDBClient(object):
as a single file containing the private key and the certificate, or as
a tuple of both files’ paths, defaults to None
:type cert: str
:param gzip: use gzip content encoding to compress requests
:type gzip: bool
:param session: allow for the new client request to use an existing
requests Session, defaults to None
:type session: requests.Session

:raises ValueError: if cert is provided but ssl is disabled (set to False)
"""

Expand All @@ -93,6 +96,7 @@ def __init__(self,
pool_size=10,
path='',
cert=None,
gzip=False,
session=None,
):
"""Construct a new InfluxDBClient object."""
Expand Down Expand Up @@ -159,6 +163,8 @@ def __init__(self,
'Accept': 'application/x-msgpack'
}

self._gzip = gzip

@property
def _baseurl(self):
return self.__baseurl
Expand Down Expand Up @@ -278,6 +284,23 @@ def request(self, url, method='GET', params=None, data=None,
if isinstance(data, (dict, list)):
data = json.dumps(data)

if self._gzip:
# Receive and send compressed data
headers.update({
'Accept-Encoding': 'gzip',
'Content-Encoding': 'gzip',
})
if data is not None:
# For Py 2.7 compatability use Gzipfile
compressed = io.BytesIO()
with gzip.GzipFile(
compresslevel=9,
fileobj=compressed,
mode='w'
) as f:
f.write(data)
data = compressed.getvalue()

# Try to send the request more than once by default (see #103)
retry = True
_try = 0
Expand Down
67 changes: 67 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import unittest
import warnings

import io
import gzip
import json
import mock
import requests
Expand Down Expand Up @@ -214,6 +216,71 @@ def test_write_points(self):
m.last_request.body.decode('utf-8'),
)

def test_write_gzip(self):
"""Test write in TestInfluxDBClient object."""
with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.POST,
"http://localhost:8086/write",
status_code=204
)

cli = InfluxDBClient(database='db', gzip=True)
cli.write(
{"database": "mydb",
"retentionPolicy": "mypolicy",
"points": [{"measurement": "cpu_load_short",
"tags": {"host": "server01",
"region": "us-west"},
"time": "2009-11-10T23:00:00Z",
"fields": {"value": 0.64}}]}
)

compressed = io.BytesIO()
with gzip.GzipFile(
compresslevel=9,
fileobj=compressed,
mode='w'
) as f:
f.write(
b"cpu_load_short,host=server01,region=us-west "
b"value=0.64 1257894000000000000\n"
)

self.assertEqual(
m.last_request.body,
compressed.getvalue(),
)

def test_write_points_gzip(self):
"""Test write points for TestInfluxDBClient object."""
with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.POST,
"http://localhost:8086/write",
status_code=204
)

cli = InfluxDBClient(database='db', gzip=True)
cli.write_points(
self.dummy_points,
)

compressed = io.BytesIO()
with gzip.GzipFile(
compresslevel=9,
fileobj=compressed,
mode='w'
) as f:
f.write(
b'cpu_load_short,host=server01,region=us-west '
b'value=0.64 1257894000123456000\n'
)
self.assertEqual(
m.last_request.body,
compressed.getvalue(),
)

def test_write_points_toplevel_attributes(self):
"""Test write points attrs for TestInfluxDBClient object."""
with requests_mock.Mocker() as m:
Expand Down
47 changes: 47 additions & 0 deletions influxdb/tests/server_tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ def _setup_influxdb_server(inst):
database='db')


def _setup_gzip_client(inst):
inst.cli = InfluxDBClient('localhost',
inst.influxd_inst.http_port,
'root',
'',
database='db',
gzip=True)


def _teardown_influxdb_server(inst):
remove_tree = sys.exc_info() == (None, None, None)
inst.influxd_inst.close(remove_tree=remove_tree)
Expand Down Expand Up @@ -89,3 +98,41 @@ def tearDownClass(cls):
def tearDown(self):
"""Deconstruct an instance of ManyTestCasesWithServerMixin."""
self.cli.drop_database('db')


class SingleTestCaseWithServerGzipMixin(object):
"""Define the single testcase with server with gzip client mixin.

Same as the SingleTestCaseWithServerGzipMixin but the InfluxDBClient has
gzip=True
"""

@classmethod
def setUp(cls):
"""Set up an instance of the SingleTestCaseWithServerGzipMixin."""
_setup_influxdb_server(cls)
_setup_gzip_client(cls)

@classmethod
def tearDown(cls):
"""Tear down an instance of the SingleTestCaseWithServerMixin."""
_teardown_influxdb_server(cls)


class ManyTestCasesWithServerGzipMixin(object):
"""Define the many testcase with server with gzip client mixin.

Same as the ManyTestCasesWithServerMixin but the InfluxDBClient has
gzip=True.
"""

@classmethod
def setUpClass(cls):
"""Set up an instance of the ManyTestCasesWithServerGzipMixin."""
_setup_influxdb_server(cls)
_setup_gzip_client(cls)

@classmethod
def tearDown(cls):
"""Tear down an instance of the SingleTestCaseWithServerMixin."""
_teardown_influxdb_server(cls)
24 changes: 24 additions & 0 deletions influxdb/tests/server_tests/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from influxdb.tests import skip_if_pypy, using_pypy, skip_server_tests
from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin
from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin
from influxdb.tests.server_tests.base import ManyTestCasesWithServerGzipMixin
from influxdb.tests.server_tests.base import SingleTestCaseWithServerGzipMixin

# By default, raise exceptions on warnings
warnings.simplefilter('error', FutureWarning)
Expand Down Expand Up @@ -913,3 +915,25 @@ def test_write_points_udp(self):
],
list(rsp['cpu_load_short'])
)


# Run the tests again, but with gzip enabled this time
@skip_server_tests
class GzipSimpleTests(SimpleTests, SingleTestCaseWithServerGzipMixin):
"""Repeat the simple tests with InfluxDBClient where gzip=True."""

pass


@skip_server_tests
class GzipCommonTests(CommonTests, ManyTestCasesWithServerGzipMixin):
"""Repeat the common tests with InfluxDBClient where gzip=True."""

pass


@skip_server_tests
class GzipUdpTests(UdpTests, ManyTestCasesWithServerGzipMixin):
"""Repeat the UDP tests with InfluxDBClient where gzip=True."""

pass