Skip to content

Commit 2c3d49c

Browse files
authored
Merge pull request influxdata#890 from rhajek/feat/tcp-keepalive
feat: Add custom socket_options
2 parents 6d90ef7 + 6ba88c0 commit 2c3d49c

File tree

2 files changed

+59
-3
lines changed

2 files changed

+59
-3
lines changed

influxdb/client.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import msgpack
2121
import requests
2222
import requests.exceptions
23+
from requests.adapters import HTTPAdapter
2324
from six.moves.urllib.parse import urlparse
2425

2526
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
@@ -87,6 +88,11 @@ class InfluxDBClient(object):
8788
:param headers: headers to add to Requests, will add 'Content-Type'
8889
and 'Accept' unless these are already present, defaults to {}
8990
:type headers: dict
91+
:param socket_options: use custom tcp socket options,
92+
If not specified, then defaults are loaded from
93+
``HTTPConnection.default_socket_options``
94+
:type socket_options: list
95+
9096
:raises ValueError: if cert is provided but ssl is disabled (set to False)
9197
"""
9298

@@ -109,6 +115,7 @@ def __init__(self,
109115
gzip=False,
110116
session=None,
111117
headers=None,
118+
socket_options=None,
112119
):
113120
"""Construct a new InfluxDBClient object."""
114121
self.__host = host
@@ -128,9 +135,10 @@ def __init__(self,
128135
session = requests.Session()
129136

130137
self._session = session
131-
adapter = requests.adapters.HTTPAdapter(
138+
adapter = _SocketOptionsAdapter(
132139
pool_connections=int(pool_size),
133-
pool_maxsize=int(pool_size)
140+
pool_maxsize=int(pool_size),
141+
socket_options=socket_options
134142
)
135143

136144
if use_udp:
@@ -626,7 +634,7 @@ def _batches(iterable, size):
626634
# http://code.activestate.com/recipes/303279-getting-items-in-batches/
627635
iterator = iter(iterable)
628636
while True:
629-
try: # Try get the first element in the iterator...
637+
try: # Try get the first element in the iterator...
630638
head = (next(iterator),)
631639
except StopIteration:
632640
return # ...so that we can stop if there isn't one
@@ -1249,3 +1257,16 @@ def _msgpack_parse_hook(code, data):
12491257
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
12501258
return timestamp.isoformat() + 'Z'
12511259
return msgpack.ExtType(code, data)
1260+
1261+
1262+
class _SocketOptionsAdapter(HTTPAdapter):
1263+
"""_SocketOptionsAdapter injects socket_options into HTTP Adapter."""
1264+
1265+
def __init__(self, *args, **kwargs):
1266+
self.socket_options = kwargs.pop("socket_options", None)
1267+
super(_SocketOptionsAdapter, self).__init__(*args, **kwargs)
1268+
1269+
def init_poolmanager(self, *args, **kwargs):
1270+
if self.socket_options is not None:
1271+
kwargs["socket_options"] = self.socket_options
1272+
super(_SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs)

influxdb/tests/client_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import requests_mock
3434

3535
from nose.tools import raises
36+
from urllib3.connection import HTTPConnection
3637

3738
from influxdb import InfluxDBClient
3839
from influxdb.resultset import ResultSet
@@ -1498,6 +1499,40 @@ def test_auth_token(self):
14981499
self.assertEqual(m.last_request.headers["Authorization"],
14991500
"my-token")
15001501

1502+
def test_custom_socket_options(self):
1503+
"""Test custom socket options."""
1504+
test_socket_options = HTTPConnection.default_socket_options + \
1505+
[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
1506+
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),
1507+
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)]
1508+
1509+
cli = InfluxDBClient(username=None, password=None,
1510+
socket_options=test_socket_options)
1511+
1512+
self.assertEquals(cli._session.adapters.get("http://").socket_options,
1513+
test_socket_options)
1514+
self.assertEquals(cli._session.adapters.get("http://").poolmanager.
1515+
connection_pool_kw.get("socket_options"),
1516+
test_socket_options)
1517+
1518+
connection_pool = cli._session.adapters.get("http://").poolmanager \
1519+
.connection_from_url(
1520+
url="http://localhost:8086")
1521+
new_connection = connection_pool._new_conn()
1522+
self.assertEquals(new_connection.socket_options, test_socket_options)
1523+
1524+
def test_none_socket_options(self):
1525+
"""Test default socket options."""
1526+
cli = InfluxDBClient(username=None, password=None)
1527+
self.assertEquals(cli._session.adapters.get("http://").socket_options,
1528+
None)
1529+
connection_pool = cli._session.adapters.get("http://").poolmanager \
1530+
.connection_from_url(
1531+
url="http://localhost:8086")
1532+
new_connection = connection_pool._new_conn()
1533+
self.assertEquals(new_connection.socket_options,
1534+
HTTPConnection.default_socket_options)
1535+
15011536

15021537
class FakeClient(InfluxDBClient):
15031538
"""Set up a fake client instance of InfluxDBClient."""

0 commit comments

Comments
 (0)