Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

feat: Add custom socket_options #890

Merged
merged 2 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import msgpack
import requests
import requests.exceptions
from requests.adapters import HTTPAdapter
from six.moves.urllib.parse import urlparse

from influxdb.line_protocol import make_lines, quote_ident, quote_literal
Expand Down Expand Up @@ -87,6 +88,11 @@ class InfluxDBClient(object):
:param headers: headers to add to Requests, will add 'Content-Type'
and 'Accept' unless these are already present, defaults to {}
:type headers: dict
:param socket_options: use custom tcp socket options,
If not specified, then defaults are loaded from
``HTTPConnection.default_socket_options``
:type socket_options: list

:raises ValueError: if cert is provided but ssl is disabled (set to False)
"""

Expand All @@ -109,6 +115,7 @@ def __init__(self,
gzip=False,
session=None,
headers=None,
socket_options=None,
):
"""Construct a new InfluxDBClient object."""
self.__host = host
Expand All @@ -128,9 +135,10 @@ def __init__(self,
session = requests.Session()

self._session = session
adapter = requests.adapters.HTTPAdapter(
adapter = _SocketOptionsAdapter(
pool_connections=int(pool_size),
pool_maxsize=int(pool_size)
pool_maxsize=int(pool_size),
socket_options=socket_options
)

if use_udp:
Expand Down Expand Up @@ -626,7 +634,7 @@ def _batches(iterable, size):
# http://code.activestate.com/recipes/303279-getting-items-in-batches/
iterator = iter(iterable)
while True:
try: # Try get the first element in the iterator...
try: # Try get the first element in the iterator...
head = (next(iterator),)
except StopIteration:
return # ...so that we can stop if there isn't one
Expand Down Expand Up @@ -1249,3 +1257,16 @@ def _msgpack_parse_hook(code, data):
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
return timestamp.isoformat() + 'Z'
return msgpack.ExtType(code, data)


class _SocketOptionsAdapter(HTTPAdapter):
"""_SocketOptionsAdapter injects socket_options into HTTP Adapter."""

def __init__(self, *args, **kwargs):
self.socket_options = kwargs.pop("socket_options", None)
super(_SocketOptionsAdapter, self).__init__(*args, **kwargs)

def init_poolmanager(self, *args, **kwargs):
if self.socket_options is not None:
kwargs["socket_options"] = self.socket_options
super(_SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs)
35 changes: 35 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import requests_mock

from nose.tools import raises
from urllib3.connection import HTTPConnection

from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet
Expand Down Expand Up @@ -1498,6 +1499,40 @@ def test_auth_token(self):
self.assertEqual(m.last_request.headers["Authorization"],
"my-token")

def test_custom_socket_options(self):
"""Test custom socket options."""
test_socket_options = HTTPConnection.default_socket_options + \
[(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)]

cli = InfluxDBClient(username=None, password=None,
socket_options=test_socket_options)

self.assertEquals(cli._session.adapters.get("http://").socket_options,
test_socket_options)
self.assertEquals(cli._session.adapters.get("http://").poolmanager.
connection_pool_kw.get("socket_options"),
test_socket_options)

connection_pool = cli._session.adapters.get("http://").poolmanager \
.connection_from_url(
url="http://localhost:8086")
new_connection = connection_pool._new_conn()
self.assertEquals(new_connection.socket_options, test_socket_options)

def test_none_socket_options(self):
"""Test default socket options."""
cli = InfluxDBClient(username=None, password=None)
self.assertEquals(cli._session.adapters.get("http://").socket_options,
None)
connection_pool = cli._session.adapters.get("http://").poolmanager \
.connection_from_url(
url="http://localhost:8086")
new_connection = connection_pool._new_conn()
self.assertEquals(new_connection.socket_options,
HTTPConnection.default_socket_options)


class FakeClient(InfluxDBClient):
"""Set up a fake client instance of InfluxDBClient."""
Expand Down