@@ -140,36 +140,10 @@ def from_DSN(dsn, **kwargs):
140
140
additional `udp_port` parameter (cf. examples).
141
141
"""
142
142
143
- init_args = {}
144
- conn_params = urlparse (dsn )
145
- scheme_info = conn_params .scheme .split ('+' )
146
- if len (scheme_info ) == 1 :
147
- scheme = scheme_info [0 ]
148
- modifier = None
149
- else :
150
- modifier , scheme = scheme_info
151
-
152
- if scheme != 'influxdb' :
153
- raise ValueError ('Unknown scheme "{}".' .format (scheme ))
154
- if modifier :
155
- if modifier == 'udp' :
156
- init_args ['use_udp' ] = True
157
- elif modifier == 'https' :
158
- init_args ['ssl' ] = True
159
- else :
160
- raise ValueError ('Unknown modifier "{}".' .format (modifier ))
161
-
162
- if conn_params .hostname :
163
- init_args ['host' ] = conn_params .hostname
164
- if conn_params .port :
165
- init_args ['port' ] = conn_params .port
166
- if conn_params .username :
167
- init_args ['username' ] = conn_params .username
168
- if conn_params .password :
169
- init_args ['password' ] = conn_params .password
170
- if conn_params .path and len (conn_params .path ) > 1 :
171
- init_args ['database' ] = conn_params .path [1 :]
172
-
143
+ init_args = parse_dsn (dsn )
144
+ host , port = init_args .pop ('hosts' )[0 ]
145
+ init_args ['host' ] = host
146
+ init_args ['port' ] = port
173
147
init_args .update (kwargs )
174
148
175
149
return InfluxDBClient (** init_args )
@@ -732,8 +706,8 @@ def send_packet(self, packet):
732
706
733
707
class InfluxDBClusterClient (object ):
734
708
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
735
- to a cluster of InfluxDB servers. It's basically a proxy to multiple
736
- InfluxDBClients .
709
+ to a cluster of InfluxDB servers. Each query hits different host from the
710
+ list of hosts .
737
711
738
712
:param hosts: all hosts to be included in the cluster, each of which
739
713
should be in the format (address, port),
@@ -743,7 +717,7 @@ class InfluxDBClusterClient(object):
743
717
:param shuffle: whether the queries should hit servers evenly(randomly),
744
718
defaults to True
745
719
:type shuffle: bool
746
- :param client_base_class: the base class for all clients in the cluster.
720
+ :param client_base_class: the base class for the cluster client .
747
721
This parameter is used to enable the support of different client
748
722
types. Defaults to :class:`~.InfluxDBClient`
749
723
"""
@@ -761,26 +735,27 @@ def __init__(self,
761
735
shuffle = True ,
762
736
client_base_class = InfluxDBClient ,
763
737
):
764
- self .clients = []
765
- self .bad_clients = [] # Corresponding server has failures in history
738
+ self .clients = [self ] # Keep it backwards compatible
739
+ self .hosts = hosts
740
+ self .bad_hosts = [] # Corresponding server has failures in history
766
741
self .shuffle = shuffle
767
- for h in hosts :
768
- self .clients .append (client_base_class (host = h [0 ], port = h [1 ],
769
- username = username ,
770
- password = password ,
771
- database = database ,
772
- ssl = ssl ,
773
- verify_ssl = verify_ssl ,
774
- timeout = timeout ,
775
- use_udp = use_udp ,
776
- udp_port = udp_port ))
742
+ host , port = self .hosts [0 ]
743
+ self ._client = client_base_class (host = host ,
744
+ port = port ,
745
+ username = username ,
746
+ password = password ,
747
+ database = database ,
748
+ ssl = ssl ,
749
+ verify_ssl = verify_ssl ,
750
+ timeout = timeout ,
751
+ use_udp = use_udp ,
752
+ udp_port = udp_port )
777
753
for method in dir (client_base_class ):
778
- if method .startswith ('_' ):
779
- continue
780
- orig_func = getattr (client_base_class , method )
781
- if not callable (orig_func ):
754
+ orig_attr = getattr (client_base_class , method , '' )
755
+ if method .startswith ('_' ) or not callable (orig_attr ):
782
756
continue
783
- setattr (self , method , self ._make_func (orig_func ))
757
+
758
+ setattr (self , method , self ._make_func (orig_attr ))
784
759
785
760
@staticmethod
786
761
def from_DSN (dsn , client_base_class = InfluxDBClient ,
@@ -803,53 +778,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
803
778
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
804
779
>> type(cluster)
805
780
<class 'influxdb.client.InfluxDBClusterClient'>
806
- >> cluster.clients
807
- [<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
781
+ >> cluster.hosts
782
+ [('host1', 8086), ('host2', 8086)]
783
+ >> cluster._client
808
784
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
809
785
"""
810
- conn_params = urlparse (dsn )
811
- netlocs = conn_params .netloc .split (',' )
812
- cluster_client = InfluxDBClusterClient (
813
- hosts = [],
814
- client_base_class = client_base_class ,
815
- shuffle = shuffle ,
816
- ** kwargs )
817
- for netloc in netlocs :
818
- single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
819
- {'scheme' : conn_params .scheme ,
820
- 'netloc' : netloc ,
821
- 'path' : conn_params .path }
822
- )
823
- cluster_client .clients .append (client_base_class .from_DSN (
824
- single_dsn ,
825
- ** kwargs ))
786
+ init_args = parse_dsn (dsn )
787
+ init_args .update (** kwargs )
788
+ init_args ['shuffle' ] = shuffle
789
+ init_args ['client_base_class' ] = client_base_class
790
+ cluster_client = InfluxDBClusterClient (** init_args )
826
791
return cluster_client
827
792
793
+ def _update_client_host (self , host ):
794
+ self ._client ._host , self ._client ._port = host
795
+ self ._client ._baseurl = "{0}://{1}:{2}" .format (self ._client ._scheme ,
796
+ self ._client ._host ,
797
+ self ._client ._port )
798
+
828
799
def _make_func (self , orig_func ):
829
800
830
801
@wraps (orig_func )
831
802
def func (* args , ** kwargs ):
832
803
if self .shuffle :
833
- random .shuffle (self .clients )
834
- clients = self .clients + self .bad_clients
835
- for c in clients :
836
- bad_client = False
804
+ random .shuffle (self .hosts )
805
+
806
+ hosts = self .hosts + self .bad_hosts
807
+ for h in hosts :
808
+ bad_host = False
837
809
try :
838
- return orig_func (c , * args , ** kwargs )
810
+ self ._update_client_host (h )
811
+ return orig_func (self ._client , * args , ** kwargs )
839
812
except InfluxDBClientError as e :
840
813
# Errors caused by user's requests, re-raise
841
814
raise e
842
815
except Exception as e :
843
816
# Errors that might caused by server failure, try another
844
- bad_client = True
845
- if c in self .clients :
846
- self .clients .remove (c )
847
- self .bad_clients .append (c )
817
+ bad_host = True
818
+ if h in self .hosts :
819
+ self .hosts .remove (h )
820
+ self .bad_hosts .append (h )
848
821
finally :
849
- if not bad_client and c in self .bad_clients :
850
- self .bad_clients .remove (c )
851
- self .clients .append (c )
822
+ if not bad_host and h in self .bad_hosts :
823
+ self .bad_hosts .remove (h )
824
+ self .hosts .append (h )
852
825
853
826
raise InfluxDBServerError ("InfluxDB: no viable server!" )
854
827
855
828
return func
829
+
830
+
831
+ def parse_dsn (dsn ):
832
+ conn_params = urlparse (dsn )
833
+ init_args = {}
834
+ scheme_info = conn_params .scheme .split ('+' )
835
+ if len (scheme_info ) == 1 :
836
+ scheme = scheme_info [0 ]
837
+ modifier = None
838
+ else :
839
+ modifier , scheme = scheme_info
840
+
841
+ if scheme != 'influxdb' :
842
+ raise ValueError ('Unknown scheme "{}".' .format (scheme ))
843
+
844
+ if modifier :
845
+ if modifier == 'udp' :
846
+ init_args ['use_udp' ] = True
847
+ elif modifier == 'https' :
848
+ init_args ['ssl' ] = True
849
+ else :
850
+ raise ValueError ('Unknown modifier "{}".' .format (modifier ))
851
+
852
+ netlocs = conn_params .netloc .split (',' )
853
+
854
+ init_args ['hosts' ] = []
855
+ for netloc in netlocs :
856
+ parsed = _parse_netloc (netloc )
857
+ init_args ['hosts' ].append ((parsed ['host' ], int (parsed ['port' ])))
858
+ init_args ['username' ] = parsed ['username' ]
859
+ init_args ['password' ] = parsed ['password' ]
860
+
861
+ if conn_params .path and len (conn_params .path ) > 1 :
862
+ init_args ['database' ] = conn_params .path [1 :]
863
+
864
+ return init_args
865
+
866
+
867
+ def _parse_netloc (netloc ):
868
+ import re
869
+ parsed = re .findall (r'(\w*):(\w*)@(\w*):(\d*)' , netloc )
870
+ if not parsed :
871
+ raise ValueError ('Invalid netloc "{}".' .format (netloc ))
872
+
873
+ info = parsed [0 ]
874
+ return {'username' : info [0 ] or None ,
875
+ 'password' : info [1 ] or None ,
876
+ 'host' : info [2 ] or 'localhost' ,
877
+ 'port' : info [3 ] or 8086 }
0 commit comments