diff --git a/influxdb/client.py b/influxdb/client.py index 83d04697..9f762fa8 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,6 +6,7 @@ from functools import wraps import json import socket +import time import threading import random import requests @@ -737,6 +738,9 @@ class InfluxDBClusterClient(object): :param client_base_class: the base class for the cluster client. This parameter is used to enable the support of different client types. Defaults to :class:`~.InfluxDBClient` + :param healing_delay: the delay in seconds, counting from last failure of + a server, before re-adding server to the list of working servers. + Defaults to 15 minutes (900 seconds) """ def __init__(self, @@ -751,11 +755,14 @@ def __init__(self, udp_port=4444, shuffle=True, client_base_class=InfluxDBClient, + healing_delay=900, ): self.clients = [self] # Keep it backwards compatible self.hosts = hosts self.bad_hosts = [] # Corresponding server has failures in history self.shuffle = shuffle + self.healing_delay = healing_delay + self._last_healing = time.time() host, port = self.hosts[0] self._hosts_lock = threading.Lock() self._thread_local = threading.local() @@ -831,7 +838,14 @@ def _make_func(self, orig_func): @wraps(orig_func) def func(*args, **kwargs): + now = time.time() with self._hosts_lock: + if (self.bad_hosts + and self._last_healing + self.healing_delay < now): + h = self.bad_hosts.pop(0) + self.hosts.append(h) + self._last_healing = now + if self.shuffle: random.shuffle(self.hosts) @@ -852,6 +866,7 @@ def func(*args, **kwargs): if h in self.hosts: self.hosts.remove(h) self.bad_hosts.append(h) + self._last_healing = now finally: with self._hosts_lock: if not bad_host and h in self.bad_hosts: diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 8df4bd64..621f3436 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -18,6 +18,7 @@ import requests import requests.exceptions import socket +import time import unittest import requests_mock import random @@ -804,6 +805,24 @@ def test_recovery(self): self.assertEqual(1, len(cluster.hosts)) self.assertEqual(2, len(cluster.bad_hosts)) + def test_healing(self): + cluster = InfluxDBClusterClient(hosts=self.hosts, + database='database', + shuffle=True, + healing_delay=1, + client_base_class=FakeClient) + with self.assertRaises(InfluxDBServerError): + cluster.query('Fail') + self.assertEqual('Success', cluster.query('')) + time.sleep(1.1) + self.assertEqual('Success', cluster.query('')) + self.assertEqual(2, len(cluster.hosts)) + self.assertEqual(1, len(cluster.bad_hosts)) + time.sleep(1.1) + self.assertEqual('Success', cluster.query('')) + self.assertEqual(3, len(cluster.hosts)) + self.assertEqual(0, len(cluster.bad_hosts)) + def test_dsn(self): cli = InfluxDBClusterClient.from_DSN(self.dsn_string) self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)