Skip to content

Commit 996d009

Browse files
committed
Merge pull request influxdata#265 from PierreF/cluster-client-threadsafe
Make InfluxDBClusterClient thread-safe (Thanks @PierreF !)
2 parents 42bc641 + 4b1e87d commit 996d009

File tree

1 file changed

+50
-15
lines changed

1 file changed

+50
-15
lines changed

influxdb/client.py

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from functools import wraps
77
import json
88
import socket
9+
import threading
910
import random
1011
import requests
1112
import requests.exceptions
@@ -73,7 +74,7 @@ def __init__(self,
7374
proxies=None,
7475
):
7576
"""Construct a new InfluxDBClient object."""
76-
self._host = host
77+
self.__host = host
7778
self._port = port
7879
self._username = username
7980
self._password = password
@@ -98,7 +99,7 @@ def __init__(self,
9899
else:
99100
self._proxies = proxies
100101

101-
self._baseurl = "{0}://{1}:{2}".format(
102+
self.__baseurl = "{0}://{1}:{2}".format(
102103
self._scheme,
103104
self._host,
104105
self._port)
@@ -108,6 +109,22 @@ def __init__(self,
108109
'Accept': 'text/plain'
109110
}
110111

112+
# _baseurl and _host are properties to allow InfluxDBClusterClient
113+
# to override them with thread-local variables
114+
@property
115+
def _baseurl(self):
116+
return self._get_baseurl()
117+
118+
def _get_baseurl(self):
119+
return self.__baseurl
120+
121+
@property
122+
def _host(self):
123+
return self._get_host()
124+
125+
def _get_host(self):
126+
return self.__host
127+
111128
@staticmethod
112129
def from_DSN(dsn, **kwargs):
113130
"""Return an instance of :class:`~.InfluxDBClient` from the provided
@@ -740,6 +757,8 @@ def __init__(self,
740757
self.bad_hosts = [] # Corresponding server has failures in history
741758
self.shuffle = shuffle
742759
host, port = self.hosts[0]
760+
self._hosts_lock = threading.Lock()
761+
self._thread_local = threading.local()
743762
self._client = client_base_class(host=host,
744763
port=port,
745764
username=username,
@@ -757,6 +776,10 @@ def __init__(self,
757776

758777
setattr(self, method, self._make_func(orig_attr))
759778

779+
self._client._get_host = self._get_host
780+
self._client._get_baseurl = self._get_baseurl
781+
self._update_client_host(self.hosts[0])
782+
760783
@staticmethod
761784
def from_DSN(dsn, client_base_class=InfluxDBClient,
762785
shuffle=True, **kwargs):
@@ -791,19 +814,29 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
791814
return cluster_client
792815

793816
def _update_client_host(self, host):
794-
self._client._host, self._client._port = host
795-
self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme,
796-
self._client._host,
797-
self._client._port)
817+
self._thread_local.host, self._thread_local.port = host
818+
self._thread_local.baseurl = "{0}://{1}:{2}".format(
819+
self._client._scheme,
820+
self._client._host,
821+
self._client._port
822+
)
823+
824+
def _get_baseurl(self):
825+
return self._thread_local.baseurl
826+
827+
def _get_host(self):
828+
return self._thread_local.host
798829

799830
def _make_func(self, orig_func):
800831

801832
@wraps(orig_func)
802833
def func(*args, **kwargs):
803-
if self.shuffle:
804-
random.shuffle(self.hosts)
834+
with self._hosts_lock:
835+
if self.shuffle:
836+
random.shuffle(self.hosts)
837+
838+
hosts = self.hosts + self.bad_hosts
805839

806-
hosts = self.hosts + self.bad_hosts
807840
for h in hosts:
808841
bad_host = False
809842
try:
@@ -815,13 +848,15 @@ def func(*args, **kwargs):
815848
except Exception as e:
816849
# Errors that might caused by server failure, try another
817850
bad_host = True
818-
if h in self.hosts:
819-
self.hosts.remove(h)
820-
self.bad_hosts.append(h)
851+
with self._hosts_lock:
852+
if h in self.hosts:
853+
self.hosts.remove(h)
854+
self.bad_hosts.append(h)
821855
finally:
822-
if not bad_host and h in self.bad_hosts:
823-
self.bad_hosts.remove(h)
824-
self.hosts.append(h)
856+
with self._hosts_lock:
857+
if not bad_host and h in self.bad_hosts:
858+
self.bad_hosts.remove(h)
859+
self.hosts.append(h)
825860

826861
raise InfluxDBServerError("InfluxDB: no viable server!")
827862

0 commit comments

Comments
 (0)