From dec2f0587ca0592cb94072d58b5f545ebfe68ba2 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 28 Apr 2021 15:55:50 +0200 Subject: [PATCH 1/2] feat: Add custom socket_options --- influxdb/client.py | 24 +++++++++++++++++++++--- influxdb/tests/client_test.py | 25 +++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index c413181b..2d0b0ddb 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -20,6 +20,7 @@ import msgpack import requests import requests.exceptions +from requests.adapters import HTTPAdapter from six.moves.urllib.parse import urlparse from influxdb.line_protocol import make_lines, quote_ident, quote_literal @@ -87,6 +88,10 @@ class InfluxDBClient(object): :param headers: headers to add to Requests, will add 'Content-Type' and 'Accept' unless these are already present, defaults to {} :type headers: dict + :param socket_options: use custom tcp socket options, If not specified, then defaults are loaded from + ``HTTPConnection.default_socket_options`` + :type socket_options: list + :raises ValueError: if cert is provided but ssl is disabled (set to False) """ @@ -109,6 +114,7 @@ def __init__(self, gzip=False, session=None, headers=None, + socket_options=None, ): """Construct a new InfluxDBClient object.""" self.__host = host @@ -128,9 +134,10 @@ def __init__(self, session = requests.Session() self._session = session - adapter = requests.adapters.HTTPAdapter( + adapter = SocketOptionsAdapter( pool_connections=int(pool_size), - pool_maxsize=int(pool_size) + pool_maxsize=int(pool_size), + socket_options=socket_options ) if use_udp: @@ -626,7 +633,7 @@ def _batches(iterable, size): # http://code.activestate.com/recipes/303279-getting-items-in-batches/ iterator = iter(iterable) while True: - try: # Try get the first element in the iterator... + try: # Try get the first element in the iterator... head = (next(iterator),) except StopIteration: return # ...so that we can stop if there isn't one @@ -1249,3 +1256,14 @@ def _msgpack_parse_hook(code, data): timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000)) return timestamp.isoformat() + 'Z' return msgpack.ExtType(code, data) + + +class SocketOptionsAdapter(HTTPAdapter): + def __init__(self, *args, **kwargs): + self.socket_options = kwargs.pop("socket_options", None) + super(SocketOptionsAdapter, self).__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + if self.socket_options is not None: + kwargs["socket_options"] = self.socket_options + super(SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 1f9d704a..6b65249c 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -33,6 +33,7 @@ import requests_mock from nose.tools import raises +from urllib3.connection import HTTPConnection from influxdb import InfluxDBClient from influxdb.resultset import ResultSet @@ -1498,6 +1499,30 @@ def test_auth_token(self): self.assertEqual(m.last_request.headers["Authorization"], "my-token") + def test_custom_socket_options(self): + test_socket_options = HTTPConnection.default_socket_options + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60), + (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)] + + cli = InfluxDBClient(username=None, password=None, socket_options=test_socket_options) + + self.assertEquals(cli._session.adapters.get("http://").socket_options, test_socket_options) + self.assertEquals(cli._session.adapters.get("http://").poolmanager.connection_pool_kw.get("socket_options"), + test_socket_options) + + connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url( + url="http://localhost:8086") + new_connection = connection_pool._new_conn() + self.assertEquals(new_connection.socket_options, test_socket_options) + + def test_none_socket_options(self): + cli = InfluxDBClient(username=None, password=None) + self.assertEquals(cli._session.adapters.get("http://").socket_options, None) + connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url( + url="http://localhost:8086") + new_connection = connection_pool._new_conn() + self.assertEquals(new_connection.socket_options, HTTPConnection.default_socket_options) + class FakeClient(InfluxDBClient): """Set up a fake client instance of InfluxDBClient.""" From 6ba88c064e7fd99df7251409c1b8aad2c88335e2 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 29 Apr 2021 13:30:47 +0200 Subject: [PATCH 2/2] feat: Add custom socket_options --- influxdb/client.py | 13 ++++++++----- influxdb/tests/client_test.py | 30 ++++++++++++++++++++---------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 2d0b0ddb..548c5772 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -88,7 +88,8 @@ class InfluxDBClient(object): :param headers: headers to add to Requests, will add 'Content-Type' and 'Accept' unless these are already present, defaults to {} :type headers: dict - :param socket_options: use custom tcp socket options, If not specified, then defaults are loaded from + :param socket_options: use custom tcp socket options, + If not specified, then defaults are loaded from ``HTTPConnection.default_socket_options`` :type socket_options: list @@ -134,7 +135,7 @@ def __init__(self, session = requests.Session() self._session = session - adapter = SocketOptionsAdapter( + adapter = _SocketOptionsAdapter( pool_connections=int(pool_size), pool_maxsize=int(pool_size), socket_options=socket_options @@ -1258,12 +1259,14 @@ def _msgpack_parse_hook(code, data): return msgpack.ExtType(code, data) -class SocketOptionsAdapter(HTTPAdapter): +class _SocketOptionsAdapter(HTTPAdapter): + """_SocketOptionsAdapter injects socket_options into HTTP Adapter.""" + def __init__(self, *args, **kwargs): self.socket_options = kwargs.pop("socket_options", None) - super(SocketOptionsAdapter, self).__init__(*args, **kwargs) + super(_SocketOptionsAdapter, self).__init__(*args, **kwargs) def init_poolmanager(self, *args, **kwargs): if self.socket_options is not None: kwargs["socket_options"] = self.socket_options - super(SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs) + super(_SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 6b65249c..115fbc48 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -1500,28 +1500,38 @@ def test_auth_token(self): "my-token") def test_custom_socket_options(self): - test_socket_options = HTTPConnection.default_socket_options + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), - (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60), - (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)] + """Test custom socket options.""" + test_socket_options = HTTPConnection.default_socket_options + \ + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60), + (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)] - cli = InfluxDBClient(username=None, password=None, socket_options=test_socket_options) + cli = InfluxDBClient(username=None, password=None, + socket_options=test_socket_options) - self.assertEquals(cli._session.adapters.get("http://").socket_options, test_socket_options) - self.assertEquals(cli._session.adapters.get("http://").poolmanager.connection_pool_kw.get("socket_options"), + self.assertEquals(cli._session.adapters.get("http://").socket_options, + test_socket_options) + self.assertEquals(cli._session.adapters.get("http://").poolmanager. + connection_pool_kw.get("socket_options"), test_socket_options) - connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url( + connection_pool = cli._session.adapters.get("http://").poolmanager \ + .connection_from_url( url="http://localhost:8086") new_connection = connection_pool._new_conn() self.assertEquals(new_connection.socket_options, test_socket_options) def test_none_socket_options(self): + """Test default socket options.""" cli = InfluxDBClient(username=None, password=None) - self.assertEquals(cli._session.adapters.get("http://").socket_options, None) - connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url( + self.assertEquals(cli._session.adapters.get("http://").socket_options, + None) + connection_pool = cli._session.adapters.get("http://").poolmanager \ + .connection_from_url( url="http://localhost:8086") new_connection = connection_pool._new_conn() - self.assertEquals(new_connection.socket_options, HTTPConnection.default_socket_options) + self.assertEquals(new_connection.socket_options, + HTTPConnection.default_socket_options) class FakeClient(InfluxDBClient):