Skip to content

Commit 4f25a1d

Browse files
committed
Merge pull request influxdata#253 from georgijd/f-refacor-clusterclient
Refactor InfluxDBClusterClient (Thanks @georgijd !)
2 parents 7e3f7ab + 8d2ec24 commit 4f25a1d

File tree

2 files changed

+144
-151
lines changed

2 files changed

+144
-151
lines changed

influxdb/client.py

Lines changed: 102 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -140,36 +140,10 @@ def from_DSN(dsn, **kwargs):
140140
additional `udp_port` parameter (cf. examples).
141141
"""
142142

143-
init_args = {}
144-
conn_params = urlparse(dsn)
145-
scheme_info = conn_params.scheme.split('+')
146-
if len(scheme_info) == 1:
147-
scheme = scheme_info[0]
148-
modifier = None
149-
else:
150-
modifier, scheme = scheme_info
151-
152-
if scheme != 'influxdb':
153-
raise ValueError('Unknown scheme "{}".'.format(scheme))
154-
if modifier:
155-
if modifier == 'udp':
156-
init_args['use_udp'] = True
157-
elif modifier == 'https':
158-
init_args['ssl'] = True
159-
else:
160-
raise ValueError('Unknown modifier "{}".'.format(modifier))
161-
162-
if conn_params.hostname:
163-
init_args['host'] = conn_params.hostname
164-
if conn_params.port:
165-
init_args['port'] = conn_params.port
166-
if conn_params.username:
167-
init_args['username'] = conn_params.username
168-
if conn_params.password:
169-
init_args['password'] = conn_params.password
170-
if conn_params.path and len(conn_params.path) > 1:
171-
init_args['database'] = conn_params.path[1:]
172-
143+
init_args = parse_dsn(dsn)
144+
host, port = init_args.pop('hosts')[0]
145+
init_args['host'] = host
146+
init_args['port'] = port
173147
init_args.update(kwargs)
174148

175149
return InfluxDBClient(**init_args)
@@ -732,8 +706,8 @@ def send_packet(self, packet):
732706

733707
class InfluxDBClusterClient(object):
734708
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
735-
to a cluster of InfluxDB servers. It's basically a proxy to multiple
736-
InfluxDBClients.
709+
to a cluster of InfluxDB servers. Each query hits different host from the
710+
list of hosts.
737711
738712
:param hosts: all hosts to be included in the cluster, each of which
739713
should be in the format (address, port),
@@ -743,7 +717,7 @@ class InfluxDBClusterClient(object):
743717
:param shuffle: whether the queries should hit servers evenly(randomly),
744718
defaults to True
745719
:type shuffle: bool
746-
:param client_base_class: the base class for all clients in the cluster.
720+
:param client_base_class: the base class for the cluster client.
747721
This parameter is used to enable the support of different client
748722
types. Defaults to :class:`~.InfluxDBClient`
749723
"""
@@ -761,26 +735,27 @@ def __init__(self,
761735
shuffle=True,
762736
client_base_class=InfluxDBClient,
763737
):
764-
self.clients = []
765-
self.bad_clients = [] # Corresponding server has failures in history
738+
self.clients = [self] # Keep it backwards compatible
739+
self.hosts = hosts
740+
self.bad_hosts = [] # Corresponding server has failures in history
766741
self.shuffle = shuffle
767-
for h in hosts:
768-
self.clients.append(client_base_class(host=h[0], port=h[1],
769-
username=username,
770-
password=password,
771-
database=database,
772-
ssl=ssl,
773-
verify_ssl=verify_ssl,
774-
timeout=timeout,
775-
use_udp=use_udp,
776-
udp_port=udp_port))
742+
host, port = self.hosts[0]
743+
self._client = client_base_class(host=host,
744+
port=port,
745+
username=username,
746+
password=password,
747+
database=database,
748+
ssl=ssl,
749+
verify_ssl=verify_ssl,
750+
timeout=timeout,
751+
use_udp=use_udp,
752+
udp_port=udp_port)
777753
for method in dir(client_base_class):
778-
if method.startswith('_'):
779-
continue
780-
orig_func = getattr(client_base_class, method)
781-
if not callable(orig_func):
754+
orig_attr = getattr(client_base_class, method, '')
755+
if method.startswith('_') or not callable(orig_attr):
782756
continue
783-
setattr(self, method, self._make_func(orig_func))
757+
758+
setattr(self, method, self._make_func(orig_attr))
784759

785760
@staticmethod
786761
def from_DSN(dsn, client_base_class=InfluxDBClient,
@@ -803,53 +778,100 @@ def from_DSN(dsn, client_base_class=InfluxDBClient,
803778
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
804779
>> type(cluster)
805780
<class 'influxdb.client.InfluxDBClusterClient'>
806-
>> cluster.clients
807-
[<influxdb.client.InfluxDBClient at 0x7feb480295d0>,
781+
>> cluster.hosts
782+
[('host1', 8086), ('host2', 8086)]
783+
>> cluster._client
808784
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
809785
"""
810-
conn_params = urlparse(dsn)
811-
netlocs = conn_params.netloc.split(',')
812-
cluster_client = InfluxDBClusterClient(
813-
hosts=[],
814-
client_base_class=client_base_class,
815-
shuffle=shuffle,
816-
**kwargs)
817-
for netloc in netlocs:
818-
single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
819-
{'scheme': conn_params.scheme,
820-
'netloc': netloc,
821-
'path': conn_params.path}
822-
)
823-
cluster_client.clients.append(client_base_class.from_DSN(
824-
single_dsn,
825-
**kwargs))
786+
init_args = parse_dsn(dsn)
787+
init_args.update(**kwargs)
788+
init_args['shuffle'] = shuffle
789+
init_args['client_base_class'] = client_base_class
790+
cluster_client = InfluxDBClusterClient(**init_args)
826791
return cluster_client
827792

793+
def _update_client_host(self, host):
794+
self._client._host, self._client._port = host
795+
self._client._baseurl = "{0}://{1}:{2}".format(self._client._scheme,
796+
self._client._host,
797+
self._client._port)
798+
828799
def _make_func(self, orig_func):
829800

830801
@wraps(orig_func)
831802
def func(*args, **kwargs):
832803
if self.shuffle:
833-
random.shuffle(self.clients)
834-
clients = self.clients + self.bad_clients
835-
for c in clients:
836-
bad_client = False
804+
random.shuffle(self.hosts)
805+
806+
hosts = self.hosts + self.bad_hosts
807+
for h in hosts:
808+
bad_host = False
837809
try:
838-
return orig_func(c, *args, **kwargs)
810+
self._update_client_host(h)
811+
return orig_func(self._client, *args, **kwargs)
839812
except InfluxDBClientError as e:
840813
# Errors caused by user's requests, re-raise
841814
raise e
842815
except Exception as e:
843816
# Errors that might caused by server failure, try another
844-
bad_client = True
845-
if c in self.clients:
846-
self.clients.remove(c)
847-
self.bad_clients.append(c)
817+
bad_host = True
818+
if h in self.hosts:
819+
self.hosts.remove(h)
820+
self.bad_hosts.append(h)
848821
finally:
849-
if not bad_client and c in self.bad_clients:
850-
self.bad_clients.remove(c)
851-
self.clients.append(c)
822+
if not bad_host and h in self.bad_hosts:
823+
self.bad_hosts.remove(h)
824+
self.hosts.append(h)
852825

853826
raise InfluxDBServerError("InfluxDB: no viable server!")
854827

855828
return func
829+
830+
831+
def parse_dsn(dsn):
832+
conn_params = urlparse(dsn)
833+
init_args = {}
834+
scheme_info = conn_params.scheme.split('+')
835+
if len(scheme_info) == 1:
836+
scheme = scheme_info[0]
837+
modifier = None
838+
else:
839+
modifier, scheme = scheme_info
840+
841+
if scheme != 'influxdb':
842+
raise ValueError('Unknown scheme "{}".'.format(scheme))
843+
844+
if modifier:
845+
if modifier == 'udp':
846+
init_args['use_udp'] = True
847+
elif modifier == 'https':
848+
init_args['ssl'] = True
849+
else:
850+
raise ValueError('Unknown modifier "{}".'.format(modifier))
851+
852+
netlocs = conn_params.netloc.split(',')
853+
854+
init_args['hosts'] = []
855+
for netloc in netlocs:
856+
parsed = _parse_netloc(netloc)
857+
init_args['hosts'].append((parsed['host'], int(parsed['port'])))
858+
init_args['username'] = parsed['username']
859+
init_args['password'] = parsed['password']
860+
861+
if conn_params.path and len(conn_params.path) > 1:
862+
init_args['database'] = conn_params.path[1:]
863+
864+
return init_args
865+
866+
867+
def _parse_netloc(netloc):
868+
import re
869+
parsed = re.findall(r'(\w*):(\w*)@(\w*):(\d*)', netloc)
870+
if not parsed:
871+
raise ValueError('Invalid netloc "{}".'.format(netloc))
872+
873+
info = parsed[0]
874+
return {'username': info[0] or None,
875+
'password': info[1] or None,
876+
'host': info[2] or 'localhost',
877+
'port': info[3] or 8086}

0 commit comments

Comments
 (0)