6
6
from functools import wraps
7
7
import json
8
8
import socket
9
+ import time
9
10
import threading
10
11
import random
11
12
import requests
@@ -737,6 +738,9 @@ class InfluxDBClusterClient(object):
737
738
:param client_base_class: the base class for the cluster client.
738
739
This parameter is used to enable the support of different client
739
740
types. Defaults to :class:`~.InfluxDBClient`
741
+ :param healing_delay: the delay in seconds, counting from last failure of
742
+ a server, before re-adding server to the list of working servers.
743
+ Defaults to 15 minutes (900 seconds)
740
744
"""
741
745
742
746
def __init__ (self ,
@@ -751,11 +755,14 @@ def __init__(self,
751
755
udp_port = 4444 ,
752
756
shuffle = True ,
753
757
client_base_class = InfluxDBClient ,
758
+ healing_delay = 900 ,
754
759
):
755
760
self .clients = [self ] # Keep it backwards compatible
756
761
self .hosts = hosts
757
762
self .bad_hosts = [] # Corresponding server has failures in history
758
763
self .shuffle = shuffle
764
+ self .healing_delay = healing_delay
765
+ self ._last_healing = time .time ()
759
766
host , port = self .hosts [0 ]
760
767
self ._hosts_lock = threading .Lock ()
761
768
self ._thread_local = threading .local ()
@@ -831,7 +838,14 @@ def _make_func(self, orig_func):
831
838
832
839
@wraps (orig_func )
833
840
def func (* args , ** kwargs ):
841
+ now = time .time ()
834
842
with self ._hosts_lock :
843
+ if (self .bad_hosts
844
+ and self ._last_healing + self .healing_delay < now ):
845
+ h = self .bad_hosts .pop (0 )
846
+ self .hosts .append (h )
847
+ self ._last_healing = now
848
+
835
849
if self .shuffle :
836
850
random .shuffle (self .hosts )
837
851
@@ -852,6 +866,7 @@ def func(*args, **kwargs):
852
866
if h in self .hosts :
853
867
self .hosts .remove (h )
854
868
self .bad_hosts .append (h )
869
+ self ._last_healing = now
855
870
finally :
856
871
with self ._hosts_lock :
857
872
if not bad_host and h in self .bad_hosts :
0 commit comments