Skip to content

Commit c858a46

Browse files
KEClaytorKevin Claytorsebito91
authored
gzip compression for data (post and responses) (influxdata#732)
* gzip compression working in my influx stack. Needs proper tests. * Also gzip data from server, slightly more straightforward data handling. * Adding in test cases. * Switching back to zlib with gzip headers. * flake8 compatibility * Move parameter into correct position. per review * Switching back to gzip for the headers. * Fixing python 2.7 compatability with gzip. * flake8 compatibility. * flake8 testing Co-authored-by: Kevin Claytor <kevin.e.claytor.civ@mail.mil> Co-authored-by: Sebastian Borza <sebito91@gmail.com>
1 parent 72c18b8 commit c858a46

File tree

4 files changed

+162
-1
lines changed

4 files changed

+162
-1
lines changed

influxdb/client.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
from __future__ import unicode_literals
88

99
import datetime
10+
import gzip
1011
import itertools
12+
import io
1113
import json
1214
import random
1315
import socket
@@ -70,10 +72,11 @@ class InfluxDBClient(object):
7072
as a single file containing the private key and the certificate, or as
7173
a tuple of both files’ paths, defaults to None
7274
:type cert: str
75+
:param gzip: use gzip content encoding to compress requests
76+
:type gzip: bool
7377
:param session: allow for the new client request to use an existing
7478
requests Session, defaults to None
7579
:type session: requests.Session
76-
7780
:raises ValueError: if cert is provided but ssl is disabled (set to False)
7881
"""
7982

@@ -93,6 +96,7 @@ def __init__(self,
9396
pool_size=10,
9497
path='',
9598
cert=None,
99+
gzip=False,
96100
session=None,
97101
):
98102
"""Construct a new InfluxDBClient object."""
@@ -159,6 +163,8 @@ def __init__(self,
159163
'Accept': 'application/x-msgpack'
160164
}
161165

166+
self._gzip = gzip
167+
162168
@property
163169
def _baseurl(self):
164170
return self.__baseurl
@@ -278,6 +284,23 @@ def request(self, url, method='GET', params=None, data=None,
278284
if isinstance(data, (dict, list)):
279285
data = json.dumps(data)
280286

287+
if self._gzip:
288+
# Receive and send compressed data
289+
headers.update({
290+
'Accept-Encoding': 'gzip',
291+
'Content-Encoding': 'gzip',
292+
})
293+
if data is not None:
294+
# For Py 2.7 compatability use Gzipfile
295+
compressed = io.BytesIO()
296+
with gzip.GzipFile(
297+
compresslevel=9,
298+
fileobj=compressed,
299+
mode='w'
300+
) as f:
301+
f.write(data)
302+
data = compressed.getvalue()
303+
281304
# Try to send the request more than once by default (see #103)
282305
retry = True
283306
_try = 0

influxdb/tests/client_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import unittest
2525
import warnings
2626

27+
import io
28+
import gzip
2729
import json
2830
import mock
2931
import requests
@@ -214,6 +216,71 @@ def test_write_points(self):
214216
m.last_request.body.decode('utf-8'),
215217
)
216218

219+
def test_write_gzip(self):
220+
"""Test write in TestInfluxDBClient object."""
221+
with requests_mock.Mocker() as m:
222+
m.register_uri(
223+
requests_mock.POST,
224+
"http://localhost:8086/write",
225+
status_code=204
226+
)
227+
228+
cli = InfluxDBClient(database='db', gzip=True)
229+
cli.write(
230+
{"database": "mydb",
231+
"retentionPolicy": "mypolicy",
232+
"points": [{"measurement": "cpu_load_short",
233+
"tags": {"host": "server01",
234+
"region": "us-west"},
235+
"time": "2009-11-10T23:00:00Z",
236+
"fields": {"value": 0.64}}]}
237+
)
238+
239+
compressed = io.BytesIO()
240+
with gzip.GzipFile(
241+
compresslevel=9,
242+
fileobj=compressed,
243+
mode='w'
244+
) as f:
245+
f.write(
246+
b"cpu_load_short,host=server01,region=us-west "
247+
b"value=0.64 1257894000000000000\n"
248+
)
249+
250+
self.assertEqual(
251+
m.last_request.body,
252+
compressed.getvalue(),
253+
)
254+
255+
def test_write_points_gzip(self):
256+
"""Test write points for TestInfluxDBClient object."""
257+
with requests_mock.Mocker() as m:
258+
m.register_uri(
259+
requests_mock.POST,
260+
"http://localhost:8086/write",
261+
status_code=204
262+
)
263+
264+
cli = InfluxDBClient(database='db', gzip=True)
265+
cli.write_points(
266+
self.dummy_points,
267+
)
268+
269+
compressed = io.BytesIO()
270+
with gzip.GzipFile(
271+
compresslevel=9,
272+
fileobj=compressed,
273+
mode='w'
274+
) as f:
275+
f.write(
276+
b'cpu_load_short,host=server01,region=us-west '
277+
b'value=0.64 1257894000123456000\n'
278+
)
279+
self.assertEqual(
280+
m.last_request.body,
281+
compressed.getvalue(),
282+
)
283+
217284
def test_write_points_toplevel_attributes(self):
218285
"""Test write points attrs for TestInfluxDBClient object."""
219286
with requests_mock.Mocker() as m:

influxdb/tests/server_tests/base.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ def _setup_influxdb_server(inst):
3636
database='db')
3737

3838

39+
def _setup_gzip_client(inst):
40+
inst.cli = InfluxDBClient('localhost',
41+
inst.influxd_inst.http_port,
42+
'root',
43+
'',
44+
database='db',
45+
gzip=True)
46+
47+
3948
def _teardown_influxdb_server(inst):
4049
remove_tree = sys.exc_info() == (None, None, None)
4150
inst.influxd_inst.close(remove_tree=remove_tree)
@@ -89,3 +98,41 @@ def tearDownClass(cls):
8998
def tearDown(self):
9099
"""Deconstruct an instance of ManyTestCasesWithServerMixin."""
91100
self.cli.drop_database('db')
101+
102+
103+
class SingleTestCaseWithServerGzipMixin(object):
104+
"""Define the single testcase with server with gzip client mixin.
105+
106+
Same as the SingleTestCaseWithServerGzipMixin but the InfluxDBClient has
107+
gzip=True
108+
"""
109+
110+
@classmethod
111+
def setUp(cls):
112+
"""Set up an instance of the SingleTestCaseWithServerGzipMixin."""
113+
_setup_influxdb_server(cls)
114+
_setup_gzip_client(cls)
115+
116+
@classmethod
117+
def tearDown(cls):
118+
"""Tear down an instance of the SingleTestCaseWithServerMixin."""
119+
_teardown_influxdb_server(cls)
120+
121+
122+
class ManyTestCasesWithServerGzipMixin(object):
123+
"""Define the many testcase with server with gzip client mixin.
124+
125+
Same as the ManyTestCasesWithServerMixin but the InfluxDBClient has
126+
gzip=True.
127+
"""
128+
129+
@classmethod
130+
def setUpClass(cls):
131+
"""Set up an instance of the ManyTestCasesWithServerGzipMixin."""
132+
_setup_influxdb_server(cls)
133+
_setup_gzip_client(cls)
134+
135+
@classmethod
136+
def tearDown(cls):
137+
"""Tear down an instance of the SingleTestCaseWithServerMixin."""
138+
_teardown_influxdb_server(cls)

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from influxdb.tests import skip_if_pypy, using_pypy, skip_server_tests
2727
from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin
2828
from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin
29+
from influxdb.tests.server_tests.base import ManyTestCasesWithServerGzipMixin
30+
from influxdb.tests.server_tests.base import SingleTestCaseWithServerGzipMixin
2931

3032
# By default, raise exceptions on warnings
3133
warnings.simplefilter('error', FutureWarning)
@@ -913,3 +915,25 @@ def test_write_points_udp(self):
913915
],
914916
list(rsp['cpu_load_short'])
915917
)
918+
919+
920+
# Run the tests again, but with gzip enabled this time
921+
@skip_server_tests
922+
class GzipSimpleTests(SimpleTests, SingleTestCaseWithServerGzipMixin):
923+
"""Repeat the simple tests with InfluxDBClient where gzip=True."""
924+
925+
pass
926+
927+
928+
@skip_server_tests
929+
class GzipCommonTests(CommonTests, ManyTestCasesWithServerGzipMixin):
930+
"""Repeat the common tests with InfluxDBClient where gzip=True."""
931+
932+
pass
933+
934+
935+
@skip_server_tests
936+
class GzipUdpTests(UdpTests, ManyTestCasesWithServerGzipMixin):
937+
"""Repeat the UDP tests with InfluxDBClient where gzip=True."""
938+
939+
pass

0 commit comments

Comments
 (0)