@@ -132,36 +132,10 @@ def from_DSN(dsn, **kwargs):
132
132
additional `udp_port` parameter (cf. examples).
133
133
"""
134
134
135
- init_args = {}
136
- conn_params = urlparse (dsn )
137
- scheme_info = conn_params .scheme .split ('+' )
138
- if len (scheme_info ) == 1 :
139
- scheme = scheme_info [0 ]
140
- modifier = None
141
- else :
142
- modifier , scheme = scheme_info
143
-
144
- if scheme != 'influxdb' :
145
- raise ValueError ('Unknown scheme "{}".' .format (scheme ))
146
- if modifier :
147
- if modifier == 'udp' :
148
- init_args ['use_udp' ] = True
149
- elif modifier == 'https' :
150
- init_args ['ssl' ] = True
151
- else :
152
- raise ValueError ('Unknown modifier "{}".' .format (modifier ))
153
-
154
- if conn_params .hostname :
155
- init_args ['host' ] = conn_params .hostname
156
- if conn_params .port :
157
- init_args ['port' ] = conn_params .port
158
- if conn_params .username :
159
- init_args ['username' ] = conn_params .username
160
- if conn_params .password :
161
- init_args ['password' ] = conn_params .password
162
- if conn_params .path and len (conn_params .path ) > 1 :
163
- init_args ['database' ] = conn_params .path [1 :]
164
-
135
+ init_args = parse_dsn (dsn )
136
+ host , port = init_args .pop ('hosts' )[0 ]
137
+ init_args ['host' ] = host
138
+ init_args ['port' ] = port
165
139
init_args .update (kwargs )
166
140
167
141
return InfluxDBClient (** init_args )
@@ -720,8 +694,8 @@ def send_packet(self, packet):
720
694
721
695
class InfluxDBClusterClient (object ):
722
696
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
723
- to a cluster of InfluxDB servers. It's basically a proxy to multiple
724
- InfluxDBClients .
697
+ to a cluster of InfluxDB servers. Each query hits different host from the
698
+ list of hosts .
725
699
726
700
:param hosts: all hosts to be included in the cluster, each of which
727
701
should be in the format (address, port),
@@ -731,7 +705,7 @@ class InfluxDBClusterClient(object):
731
705
:param shuffle: whether the queries should hit servers evenly(randomly),
732
706
defaults to True
733
707
:type shuffle: bool
734
- :param client_base_class: the base class for all clients in the cluster.
708
+ :param client_base_class: the base class for the cluster client .
735
709
This parameter is used to enable the support of different client
736
710
types. Defaults to :class:`~.InfluxDBClient`
737
711
"""
@@ -749,26 +723,27 @@ def __init__(self,
749
723
shuffle = True ,
750
724
client_base_class = InfluxDBClient ,
751
725
):
752
- self .clients = []
753
- self .bad_clients = [] # Corresponding server has failures in history
726
+ self .clients = [self ] # Keep it backwards compatible
727
+ self .hosts = hosts
728
+ self .bad_hosts = [] # Corresponding server has failures in history
754
729
self .shuffle = shuffle
755
- for h in hosts :
756
- self .clients .append (client_base_class (host = h [0 ], port = h [1 ],
757
- username = username ,
758
- password = password ,
759
- database = database ,
760
- ssl = ssl ,
761
- verify_ssl = verify_ssl ,
762
- timeout = timeout ,
763
- use_udp = use_udp ,
764
- udp_port = udp_port ))
730
+ host , port = self .hosts [0 ]
731
+ self ._client = client_base_class (host = host ,
732
+ port = port ,
733
+ username = username ,
734
+ password = password ,
735
+ database = database ,
736
+ ssl = ssl ,
737
+ verify_ssl = verify_ssl ,
738
+ timeout = timeout ,
739
+ use_udp = use_udp ,
740
+ udp_port = udp_port )
765
741
for method in dir (client_base_class ):
766
- if method .startswith ('_' ):
767
- continue
768
- orig_func = getattr (client_base_class , method )
769
- if not callable (orig_func ):
742
+ orig_attr = getattr (client_base_class , method , '' )
743
+ if method .startswith ('_' ) or not callable (orig_attr ):
770
744
continue
771
- setattr (self , method , self ._make_func (orig_func ))
745
+
746
+ setattr (self , method , self ._make_func (orig_attr ))
772
747
773
748
@staticmethod
774
749
def from_DSN (dsn , client_base_class = InfluxDBClient ,
@@ -791,53 +766,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
791
766
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
792
767
>> type(cluster)
793
768
<class 'influxdb.client.InfluxDBClusterClient'>
794
- >> cluster.clients
795
- [<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
769
+ >> cluster.hosts
770
+ [('host1', 8086), ('host2', 8086)]
771
+ >> cluster._client
796
772
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
797
773
"""
798
- conn_params = urlparse (dsn )
799
- netlocs = conn_params .netloc .split (',' )
800
- cluster_client = InfluxDBClusterClient (
801
- hosts = [],
802
- client_base_class = client_base_class ,
803
- shuffle = shuffle ,
804
- ** kwargs )
805
- for netloc in netlocs :
806
- single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
807
- {'scheme' : conn_params .scheme ,
808
- 'netloc' : netloc ,
809
- 'path' : conn_params .path }
810
- )
811
- cluster_client .clients .append (client_base_class .from_DSN (
812
- single_dsn ,
813
- ** kwargs ))
774
+ init_args = parse_dsn (dsn )
775
+ init_args .update (** kwargs )
776
+ init_args ['shuffle' ] = shuffle
777
+ init_args ['client_base_class' ] = client_base_class
778
+ cluster_client = InfluxDBClusterClient (** init_args )
814
779
return cluster_client
815
780
781
+ def _update_client_host (self , host ):
782
+ self ._client ._host , self ._client ._port = host
783
+ self ._client ._baseurl = "{0}://{1}:{2}" .format (self ._client ._scheme ,
784
+ self ._client ._host ,
785
+ self ._client ._port )
786
+
816
787
def _make_func (self , orig_func ):
817
788
818
789
@wraps (orig_func )
819
790
def func (* args , ** kwargs ):
820
791
if self .shuffle :
821
- random .shuffle (self .clients )
822
- clients = self .clients + self .bad_clients
823
- for c in clients :
824
- bad_client = False
792
+ random .shuffle (self .hosts )
793
+
794
+ hosts = self .hosts + self .bad_hosts
795
+ for h in hosts :
796
+ bad_host = False
825
797
try :
826
- return orig_func (c , * args , ** kwargs )
798
+ self ._update_client_host (h )
799
+ return orig_func (self ._client , * args , ** kwargs )
827
800
except InfluxDBClientError as e :
828
801
# Errors caused by user's requests, re-raise
829
802
raise e
830
803
except Exception as e :
831
804
# Errors that might caused by server failure, try another
832
- bad_client = True
833
- if c in self .clients :
834
- self .clients .remove (c )
835
- self .bad_clients .append (c )
805
+ bad_host = True
806
+ if h in self .hosts :
807
+ self .hosts .remove (h )
808
+ self .bad_hosts .append (h )
836
809
finally :
837
- if not bad_client and c in self .bad_clients :
838
- self .bad_clients .remove (c )
839
- self .clients .append (c )
810
+ if not bad_host and h in self .bad_hosts :
811
+ self .bad_hosts .remove (h )
812
+ self .hosts .append (h )
840
813
841
814
raise InfluxDBServerError ("InfluxDB: no viable server!" )
842
815
843
816
return func
817
+
818
+
819
+ def parse_dsn (dsn ):
820
+ conn_params = urlparse (dsn )
821
+ init_args = {}
822
+ scheme_info = conn_params .scheme .split ('+' )
823
+ if len (scheme_info ) == 1 :
824
+ scheme = scheme_info [0 ]
825
+ modifier = None
826
+ else :
827
+ modifier , scheme = scheme_info
828
+
829
+ if scheme != 'influxdb' :
830
+ raise ValueError ('Unknown scheme "{}".' .format (scheme ))
831
+
832
+ if modifier :
833
+ if modifier == 'udp' :
834
+ init_args ['use_udp' ] = True
835
+ elif modifier == 'https' :
836
+ init_args ['ssl' ] = True
837
+ else :
838
+ raise ValueError ('Unknown modifier "{}".' .format (modifier ))
839
+
840
+ netlocs = conn_params .netloc .split (',' )
841
+
842
+ init_args ['hosts' ] = []
843
+ for netloc in netlocs :
844
+ parsed = _parse_netloc (netloc )
845
+ init_args ['hosts' ].append ((parsed ['host' ], int (parsed ['port' ])))
846
+ init_args ['username' ] = parsed ['username' ]
847
+ init_args ['password' ] = parsed ['password' ]
848
+
849
+ if conn_params .path and len (conn_params .path ) > 1 :
850
+ init_args ['database' ] = conn_params .path [1 :]
851
+
852
+ return init_args
853
+
854
+
855
+ def _parse_netloc (netloc ):
856
+ import re
857
+ parsed = re .findall (r'(\w*):(\w*)@(\w*):(\d*)' , netloc )
858
+ if not parsed :
859
+ raise ValueError ('Invalid netloc "{}".' .format (netloc ))
860
+
861
+ info = parsed [0 ]
862
+ return {'username' : info [0 ] or None ,
863
+ 'password' : info [1 ] or None ,
864
+ 'host' : info [2 ] or 'localhost' ,
865
+ 'port' : info [3 ] or 8086 }
0 commit comments