Skip to content

Commit dbff74e

Browse files
committed
Merge pull request influxdata#267 from PierreF/cluster-healing
Added healing feature on InfluxClusterClient (Thanks @PierreF !)
2 parents 996d009 + 9f893b5 commit dbff74e

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

influxdb/client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from functools import wraps
77
import json
88
import socket
9+
import time
910
import threading
1011
import random
1112
import requests
@@ -737,6 +738,9 @@ class InfluxDBClusterClient(object):
737738
:param client_base_class: the base class for the cluster client.
738739
This parameter is used to enable the support of different client
739740
types. Defaults to :class:`~.InfluxDBClient`
741+
:param healing_delay: the delay in seconds, counting from last failure of
742+
a server, before re-adding server to the list of working servers.
743+
Defaults to 15 minutes (900 seconds)
740744
"""
741745

742746
def __init__(self,
@@ -751,11 +755,14 @@ def __init__(self,
751755
udp_port=4444,
752756
shuffle=True,
753757
client_base_class=InfluxDBClient,
758+
healing_delay=900,
754759
):
755760
self.clients = [self] # Keep it backwards compatible
756761
self.hosts = hosts
757762
self.bad_hosts = [] # Corresponding server has failures in history
758763
self.shuffle = shuffle
764+
self.healing_delay = healing_delay
765+
self._last_healing = time.time()
759766
host, port = self.hosts[0]
760767
self._hosts_lock = threading.Lock()
761768
self._thread_local = threading.local()
@@ -831,7 +838,14 @@ def _make_func(self, orig_func):
831838

832839
@wraps(orig_func)
833840
def func(*args, **kwargs):
841+
now = time.time()
834842
with self._hosts_lock:
843+
if (self.bad_hosts
844+
and self._last_healing + self.healing_delay < now):
845+
h = self.bad_hosts.pop(0)
846+
self.hosts.append(h)
847+
self._last_healing = now
848+
835849
if self.shuffle:
836850
random.shuffle(self.hosts)
837851

@@ -852,6 +866,7 @@ def func(*args, **kwargs):
852866
if h in self.hosts:
853867
self.hosts.remove(h)
854868
self.bad_hosts.append(h)
869+
self._last_healing = now
855870
finally:
856871
with self._hosts_lock:
857872
if not bad_host and h in self.bad_hosts:

influxdb/tests/client_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import requests
1919
import requests.exceptions
2020
import socket
21+
import time
2122
import unittest
2223
import requests_mock
2324
import random
@@ -804,6 +805,24 @@ def test_recovery(self):
804805
self.assertEqual(1, len(cluster.hosts))
805806
self.assertEqual(2, len(cluster.bad_hosts))
806807

808+
def test_healing(self):
809+
cluster = InfluxDBClusterClient(hosts=self.hosts,
810+
database='database',
811+
shuffle=True,
812+
healing_delay=1,
813+
client_base_class=FakeClient)
814+
with self.assertRaises(InfluxDBServerError):
815+
cluster.query('Fail')
816+
self.assertEqual('Success', cluster.query(''))
817+
time.sleep(1.1)
818+
self.assertEqual('Success', cluster.query(''))
819+
self.assertEqual(2, len(cluster.hosts))
820+
self.assertEqual(1, len(cluster.bad_hosts))
821+
time.sleep(1.1)
822+
self.assertEqual('Success', cluster.query(''))
823+
self.assertEqual(3, len(cluster.hosts))
824+
self.assertEqual(0, len(cluster.bad_hosts))
825+
807826
def test_dsn(self):
808827
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
809828
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)

0 commit comments

Comments
 (0)