6
6
from functools import wraps
7
7
import json
8
8
import socket
9
+ import threading
9
10
import random
10
11
import requests
11
12
import requests .exceptions
@@ -73,7 +74,7 @@ def __init__(self,
73
74
proxies = None ,
74
75
):
75
76
"""Construct a new InfluxDBClient object."""
76
- self ._host = host
77
+ self .__host = host
77
78
self ._port = port
78
79
self ._username = username
79
80
self ._password = password
@@ -98,7 +99,7 @@ def __init__(self,
98
99
else :
99
100
self ._proxies = proxies
100
101
101
- self ._baseurl = "{0}://{1}:{2}" .format (
102
+ self .__baseurl = "{0}://{1}:{2}" .format (
102
103
self ._scheme ,
103
104
self ._host ,
104
105
self ._port )
@@ -108,6 +109,22 @@ def __init__(self,
108
109
'Accept' : 'text/plain'
109
110
}
110
111
112
+ # _baseurl and _host are properties to allow InfluxDBClusterClient
113
+ # to override them with thread-local variables
114
+ @property
115
+ def _baseurl (self ):
116
+ return self ._get_baseurl ()
117
+
118
+ def _get_baseurl (self ):
119
+ return self .__baseurl
120
+
121
+ @property
122
+ def _host (self ):
123
+ return self ._get_host ()
124
+
125
+ def _get_host (self ):
126
+ return self .__host
127
+
111
128
@staticmethod
112
129
def from_DSN (dsn , ** kwargs ):
113
130
"""Return an instance of :class:`~.InfluxDBClient` from the provided
@@ -740,6 +757,8 @@ def __init__(self,
740
757
self .bad_hosts = [] # Corresponding server has failures in history
741
758
self .shuffle = shuffle
742
759
host , port = self .hosts [0 ]
760
+ self ._hosts_lock = threading .Lock ()
761
+ self ._thread_local = threading .local ()
743
762
self ._client = client_base_class (host = host ,
744
763
port = port ,
745
764
username = username ,
@@ -757,6 +776,10 @@ def __init__(self,
757
776
758
777
setattr (self , method , self ._make_func (orig_attr ))
759
778
779
+ self ._client ._get_host = self ._get_host
780
+ self ._client ._get_baseurl = self ._get_baseurl
781
+ self ._update_client_host (self .hosts [0 ])
782
+
760
783
@staticmethod
761
784
def from_DSN (dsn , client_base_class = InfluxDBClient ,
762
785
shuffle = True , ** kwargs ):
@@ -791,19 +814,29 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
791
814
return cluster_client
792
815
793
816
def _update_client_host (self , host ):
794
- self ._client ._host , self ._client ._port = host
795
- self ._client ._baseurl = "{0}://{1}:{2}" .format (self ._client ._scheme ,
796
- self ._client ._host ,
797
- self ._client ._port )
817
+ self ._thread_local .host , self ._thread_local .port = host
818
+ self ._thread_local .baseurl = "{0}://{1}:{2}" .format (
819
+ self ._client ._scheme ,
820
+ self ._client ._host ,
821
+ self ._client ._port
822
+ )
823
+
824
+ def _get_baseurl (self ):
825
+ return self ._thread_local .baseurl
826
+
827
+ def _get_host (self ):
828
+ return self ._thread_local .host
798
829
799
830
def _make_func (self , orig_func ):
800
831
801
832
@wraps (orig_func )
802
833
def func (* args , ** kwargs ):
803
- if self .shuffle :
804
- random .shuffle (self .hosts )
834
+ with self ._hosts_lock :
835
+ if self .shuffle :
836
+ random .shuffle (self .hosts )
837
+
838
+ hosts = self .hosts + self .bad_hosts
805
839
806
- hosts = self .hosts + self .bad_hosts
807
840
for h in hosts :
808
841
bad_host = False
809
842
try :
@@ -815,13 +848,15 @@ def func(*args, **kwargs):
815
848
except Exception as e :
816
849
# Errors that might caused by server failure, try another
817
850
bad_host = True
818
- if h in self .hosts :
819
- self .hosts .remove (h )
820
- self .bad_hosts .append (h )
851
+ with self ._hosts_lock :
852
+ if h in self .hosts :
853
+ self .hosts .remove (h )
854
+ self .bad_hosts .append (h )
821
855
finally :
822
- if not bad_host and h in self .bad_hosts :
823
- self .bad_hosts .remove (h )
824
- self .hosts .append (h )
856
+ with self ._hosts_lock :
857
+ if not bad_host and h in self .bad_hosts :
858
+ self .bad_hosts .remove (h )
859
+ self .hosts .append (h )
825
860
826
861
raise InfluxDBServerError ("InfluxDB: no viable server!" )
827
862
0 commit comments