Skip to content

Commit e25576f

Browse files
committed
Refactor in InfluxDBClusterClient.
Use a single client and maintain a list of hosts instead of list of clients. Also, to avoid code duplication, parsing DSNs is now moved to a separate function that is called from both InfluxDBClient.fromDSN and InfluxDBClusterClient.fromDSN classmethods
1 parent 579748d commit e25576f

File tree

2 files changed

+138
-146
lines changed

2 files changed

+138
-146
lines changed

influxdb/client.py

Lines changed: 102 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -132,36 +132,10 @@ def from_DSN(dsn, **kwargs):
132132
additional `udp_port` parameter (cf. examples).
133133
"""
134134

135-
init_args = {}
136-
conn_params = urlparse(dsn)
137-
scheme_info = conn_params.scheme.split('+')
138-
if len(scheme_info) == 1:
139-
scheme = scheme_info[0]
140-
modifier = None
141-
else:
142-
modifier, scheme = scheme_info
143-
144-
if scheme != 'influxdb':
145-
raise ValueError('Unknown scheme "{}".'.format(scheme))
146-
if modifier:
147-
if modifier == 'udp':
148-
init_args['use_udp'] = True
149-
elif modifier == 'https':
150-
init_args['ssl'] = True
151-
else:
152-
raise ValueError('Unknown modifier "{}".'.format(modifier))
153-
154-
if conn_params.hostname:
155-
init_args['host'] = conn_params.hostname
156-
if conn_params.port:
157-
init_args['port'] = conn_params.port
158-
if conn_params.username:
159-
init_args['username'] = conn_params.username
160-
if conn_params.password:
161-
init_args['password'] = conn_params.password
162-
if conn_params.path and len(conn_params.path) > 1:
163-
init_args['database'] = conn_params.path[1:]
164-
135+
init_args = parse_dsn(dsn)
136+
host, port = init_args.pop('hosts')[0]
137+
init_args['host'] = host
138+
init_args['port'] = port
165139
init_args.update(kwargs)
166140

167141
return InfluxDBClient(**init_args)
@@ -720,8 +694,8 @@ def send_packet(self, packet):
720694

721695
class InfluxDBClusterClient(object):
722696
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
723-
to a cluster of InfluxDB servers. It's basically a proxy to multiple
724-
InfluxDBClients.
697+
to a cluster of InfluxDB servers. Each query hits different host from the
698+
list of hosts.
725699
726700
:param hosts: all hosts to be included in the cluster, each of which
727701
should be in the format (address, port),
@@ -731,7 +705,7 @@ class InfluxDBClusterClient(object):
731705
:param shuffle: whether the queries should hit servers evenly(randomly),
732706
defaults to True
733707
:type shuffle: bool
734-
:param client_base_class: the base class for all clients in the cluster.
708+
:param client_base_class: the base class for the cluster client.
735709
This parameter is used to enable the support of different client
736710
types. Defaults to :class:`~.InfluxDBClient`
737711
"""
@@ -749,26 +723,27 @@ def __init__(self,
749723
shuffle=True,
750724
client_base_class=InfluxDBClient,
751725
):
752-
self.clients = []
753-
self.bad_clients = [] # Corresponding server has failures in history
726+
self.clients = [self] # Keep it backwards compatible
727+
self.hosts = hosts
728+
self.bad_hosts = [] # Corresponding server has failures in history
754729
self.shuffle = shuffle
755-
for h in hosts:
756-
self.clients.append(client_base_class(host=h[0], port=h[1],
757-
username=username,
758-
password=password,
759-
database=database,
760-
ssl=ssl,
761-
verify_ssl=verify_ssl,
762-
timeout=timeout,
763-
use_udp=use_udp,
764-
udp_port=udp_port))
730+
host, port = self.hosts[0]
731+
self._client = client_base_class(host=host,
732+
port=port,
733+
username=username,
734+
password=password,
735+
database=database,
736+
ssl=ssl,
737+
verify_ssl=verify_ssl,
738+
timeout=timeout,
739+
use_udp=use_udp,
740+
udp_port=udp_port)
765741
for method in dir(client_base_class):
766-
if method.startswith('_'):
767-
continue
768-
orig_func = getattr(client_base_class, method)
769-
if not callable(orig_func):
742+
orig_attr = getattr(client_base_class, method, '')
743+
if method.startswith('_') or not callable(orig_attr):
770744
continue
771-
setattr(self, method, self._make_func(orig_func))
745+
746+
setattr(self, method, self._make_func(orig_attr))
772747

773748
@staticmethod
774749
def from_DSN(dsn, client_base_class=InfluxDBClient,
@@ -791,53 +766,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
791766
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
792767
>> type(cluster)
793768
<class 'influxdb.client.InfluxDBClusterClient'>
794-
>> cluster.clients
795-
[<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
769+
>> cluster.hosts
770+
[('host1', 8086), ('host2', 8086)]
771+
>> cluster._client
796772
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
797773
"""
798-
conn_params = urlparse(dsn)
799-
netlocs = conn_params.netloc.split(',')
800-
cluster_client = InfluxDBClusterClient(
801-
hosts=[],
802-
client_base_class=client_base_class,
803-
shuffle=shuffle,
804-
**kwargs)
805-
for netloc in netlocs:
806-
single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
807-
{'scheme': conn_params.scheme,
808-
'netloc': netloc,
809-
'path': conn_params.path}
810-
)
811-
cluster_client.clients.append(client_base_class.from_DSN(
812-
single_dsn,
813-
**kwargs))
774+
init_args = parse_dsn(dsn)
775+
init_args.update(**kwargs)
776+
init_args['shuffle'] = shuffle
777+
init_args['client_base_class'] = client_base_class
778+
cluster_client = InfluxDBClusterClient(**init_args)
814779
return cluster_client
815780

781+
def _update_client_host(self, host):
782+
self._client._host, self._client._port = host
783+
self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme,
784+
self._client._host,
785+
self._client._port)
786+
816787
def _make_func(self, orig_func):
817788

818789
@wraps(orig_func)
819790
def func(*args, **kwargs):
820791
if self.shuffle:
821-
random.shuffle(self.clients)
822-
clients = self.clients + self.bad_clients
823-
for c in clients:
824-
bad_client = False
792+
random.shuffle(self.hosts)
793+
794+
hosts = self.hosts + self.bad_hosts
795+
for h in hosts:
796+
bad_host = False
825797
try:
826-
return orig_func(c, *args, **kwargs)
798+
self._update_client_host(h)
799+
return orig_func(self._client, *args, **kwargs)
827800
except InfluxDBClientError as e:
828801
# Errors caused by user's requests, re-raise
829802
raise e
830803
except Exception as e:
831804
# Errors that might caused by server failure, try another
832-
bad_client = True
833-
if c in self.clients:
834-
self.clients.remove(c)
835-
self.bad_clients.append(c)
805+
bad_host = True
806+
if h in self.hosts:
807+
self.hosts.remove(h)
808+
self.bad_hosts.append(h)
836809
finally:
837-
if not bad_client and c in self.bad_clients:
838-
self.bad_clients.remove(c)
839-
self.clients.append(c)
810+
if not bad_host and h in self.bad_hosts:
811+
self.bad_hosts.remove(h)
812+
self.hosts.append(h)
840813

841814
raise InfluxDBServerError("InfluxDB: no viable server!")
842815

843816
return func
817+
818+
819+
def parse_dsn(dsn):
820+
conn_params = urlparse(dsn)
821+
init_args = {}
822+
scheme_info = conn_params.scheme.split('+')
823+
if len(scheme_info) == 1:
824+
scheme = scheme_info[0]
825+
modifier = None
826+
else:
827+
modifier, scheme = scheme_info
828+
829+
if scheme != 'influxdb':
830+
raise ValueError('Unknown scheme "{}".'.format(scheme))
831+
832+
if modifier:
833+
if modifier == 'udp':
834+
init_args['use_udp'] = True
835+
elif modifier == 'https':
836+
init_args['ssl'] = True
837+
else:
838+
raise ValueError('Unknown modifier "{}".'.format(modifier))
839+
840+
netlocs = conn_params.netloc.split(',')
841+
842+
init_args['hosts'] = []
843+
for netloc in netlocs:
844+
parsed = _parse_netloc(netloc)
845+
init_args['hosts'].append((parsed['host'], int(parsed['port'])))
846+
init_args['username'] = parsed['username']
847+
init_args['password'] = parsed['password']
848+
849+
if conn_params.path and len(conn_params.path) > 1:
850+
init_args['database'] = conn_params.path[1:]
851+
852+
return init_args
853+
854+
855+
def _parse_netloc(netloc):
856+
import re
857+
parsed = re.findall(r'(\w*):(\w*)@(\w*):(\d*)', netloc)
858+
if not parsed:
859+
raise ValueError('Invalid netloc "{}".'.format(netloc))
860+
861+
info = parsed[0]
862+
return {'username': info[0] or None,
863+
'password': info[1] or None,
864+
'host': info[2] or 'localhost',
865+
'port': info[3] or 8086}

0 commit comments

Comments
 (0)