diff --git a/influxdb/client.py b/influxdb/client.py index d16c4937..b8bda92d 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,6 +6,7 @@ from functools import wraps import json import socket +import threading import random import requests import requests.exceptions @@ -73,7 +74,7 @@ def __init__(self, proxies=None, ): """Construct a new InfluxDBClient object.""" - self._host = host + self.__host = host self._port = port self._username = username self._password = password @@ -98,7 +99,7 @@ def __init__(self, else: self._proxies = proxies - self._baseurl = "{0}://{1}:{2}".format( + self.__baseurl = "{0}://{1}:{2}".format( self._scheme, self._host, self._port) @@ -108,6 +109,22 @@ def __init__(self, 'Accept': 'text/plain' } + # _baseurl and _host are properties to allow InfluxDBClusterClient + # to override them with thread-local variables + @property + def _baseurl(self): + return self._get_baseurl() + + def _get_baseurl(self): + return self.__baseurl + + @property + def _host(self): + return self._get_host() + + def _get_host(self): + return self.__host + @staticmethod def from_DSN(dsn, **kwargs): """Return an instance of :class:`~.InfluxDBClient` from the provided @@ -740,6 +757,8 @@ def __init__(self, self.bad_hosts = [] # Corresponding server has failures in history self.shuffle = shuffle host, port = self.hosts[0] + self._hosts_lock = threading.Lock() + self._thread_local = threading.local() self._client = client_base_class(host=host, port=port, username=username, @@ -757,6 +776,10 @@ def __init__(self, setattr(self, method, self._make_func(orig_attr)) + self._client._get_host = self._get_host + self._client._get_baseurl = self._get_baseurl + self._update_client_host(self.hosts[0]) + @staticmethod def from_DSN(dsn, client_base_class=InfluxDBClient, shuffle=True, **kwargs): @@ -791,19 +814,29 @@ def from_DSN(dsn, client_base_class=InfluxDBClient, return cluster_client def _update_client_host(self, host): - self._client._host, self._client._port = host - self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme, - self._client._host, - self._client._port) + self._thread_local.host, self._thread_local.port = host + self._thread_local.baseurl = "{0}://{1}:{2}".format( + self._client._scheme, + self._client._host, + self._client._port + ) + + def _get_baseurl(self): + return self._thread_local.baseurl + + def _get_host(self): + return self._thread_local.host def _make_func(self, orig_func): @wraps(orig_func) def func(*args, **kwargs): - if self.shuffle: - random.shuffle(self.hosts) + with self._hosts_lock: + if self.shuffle: + random.shuffle(self.hosts) + + hosts = self.hosts + self.bad_hosts - hosts = self.hosts + self.bad_hosts for h in hosts: bad_host = False try: @@ -815,13 +848,15 @@ def func(*args, **kwargs): except Exception as e: # Errors that might caused by server failure, try another bad_host = True - if h in self.hosts: - self.hosts.remove(h) - self.bad_hosts.append(h) + with self._hosts_lock: + if h in self.hosts: + self.hosts.remove(h) + self.bad_hosts.append(h) finally: - if not bad_host and h in self.bad_hosts: - self.bad_hosts.remove(h) - self.hosts.append(h) + with self._hosts_lock: + if not bad_host and h in self.bad_hosts: + self.bad_hosts.remove(h) + self.hosts.append(h) raise InfluxDBServerError("InfluxDB: no viable server!")