Skip to content

Commit dec2f05

Browse files
committed
feat: Add custom socket_options
1 parent 6d90ef7 commit dec2f05

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

influxdb/client.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import msgpack
2121
import requests
2222
import requests.exceptions
23+
from requests.adapters import HTTPAdapter
2324
from six.moves.urllib.parse import urlparse
2425

2526
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
@@ -87,6 +88,10 @@ class InfluxDBClient(object):
8788
:param headers: headers to add to Requests, will add 'Content-Type'
8889
and 'Accept' unless these are already present, defaults to {}
8990
:type headers: dict
91+
:param socket_options: use custom tcp socket options, If not specified, then defaults are loaded from
92+
``HTTPConnection.default_socket_options``
93+
:type socket_options: list
94+
9095
:raises ValueError: if cert is provided but ssl is disabled (set to False)
9196
"""
9297

@@ -109,6 +114,7 @@ def __init__(self,
109114
gzip=False,
110115
session=None,
111116
headers=None,
117+
socket_options=None,
112118
):
113119
"""Construct a new InfluxDBClient object."""
114120
self.__host = host
@@ -128,9 +134,10 @@ def __init__(self,
128134
session = requests.Session()
129135

130136
self._session = session
131-
adapter = requests.adapters.HTTPAdapter(
137+
adapter = SocketOptionsAdapter(
132138
pool_connections=int(pool_size),
133-
pool_maxsize=int(pool_size)
139+
pool_maxsize=int(pool_size),
140+
socket_options=socket_options
134141
)
135142

136143
if use_udp:
@@ -626,7 +633,7 @@ def _batches(iterable, size):
626633
# http://code.activestate.com/recipes/303279-getting-items-in-batches/
627634
iterator = iter(iterable)
628635
while True:
629-
try: # Try get the first element in the iterator...
636+
try: # Try get the first element in the iterator...
630637
head = (next(iterator),)
631638
except StopIteration:
632639
return # ...so that we can stop if there isn't one
@@ -1249,3 +1256,14 @@ def _msgpack_parse_hook(code, data):
12491256
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
12501257
return timestamp.isoformat() + 'Z'
12511258
return msgpack.ExtType(code, data)
1259+
1260+
1261+
class SocketOptionsAdapter(HTTPAdapter):
1262+
def __init__(self, *args, **kwargs):
1263+
self.socket_options = kwargs.pop("socket_options", None)
1264+
super(SocketOptionsAdapter, self).__init__(*args, **kwargs)
1265+
1266+
def init_poolmanager(self, *args, **kwargs):
1267+
if self.socket_options is not None:
1268+
kwargs["socket_options"] = self.socket_options
1269+
super(SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs)

influxdb/tests/client_test.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import requests_mock
3434

3535
from nose.tools import raises
36+
from urllib3.connection import HTTPConnection
3637

3738
from influxdb import InfluxDBClient
3839
from influxdb.resultset import ResultSet
@@ -1498,6 +1499,30 @@ def test_auth_token(self):
14981499
self.assertEqual(m.last_request.headers["Authorization"],
14991500
"my-token")
15001501

1502+
def test_custom_socket_options(self):
1503+
test_socket_options = HTTPConnection.default_socket_options + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
1504+
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),
1505+
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)]
1506+
1507+
cli = InfluxDBClient(username=None, password=None, socket_options=test_socket_options)
1508+
1509+
self.assertEquals(cli._session.adapters.get("http://").socket_options, test_socket_options)
1510+
self.assertEquals(cli._session.adapters.get("http://").poolmanager.connection_pool_kw.get("socket_options"),
1511+
test_socket_options)
1512+
1513+
connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url(
1514+
url="http://localhost:8086")
1515+
new_connection = connection_pool._new_conn()
1516+
self.assertEquals(new_connection.socket_options, test_socket_options)
1517+
1518+
def test_none_socket_options(self):
1519+
cli = InfluxDBClient(username=None, password=None)
1520+
self.assertEquals(cli._session.adapters.get("http://").socket_options, None)
1521+
connection_pool = cli._session.adapters.get("http://").poolmanager.connection_from_url(
1522+
url="http://localhost:8086")
1523+
new_connection = connection_pool._new_conn()
1524+
self.assertEquals(new_connection.socket_options, HTTPConnection.default_socket_options)
1525+
15011526

15021527
class FakeClient(InfluxDBClient):
15031528
"""Set up a fake client instance of InfluxDBClient."""

0 commit comments

Comments
 (0)