Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Added healing feature on InfluxClusterClient #267

Merged
merged 1 commit into from
Nov 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from functools import wraps
import json
import socket
import time
import threading
import random
import requests
Expand Down Expand Up @@ -737,6 +738,9 @@ class InfluxDBClusterClient(object):
:param client_base_class: the base class for the cluster client.
This parameter is used to enable the support of different client
types. Defaults to :class:`~.InfluxDBClient`
:param healing_delay: the delay in seconds, counting from last failure of
a server, before re-adding server to the list of working servers.
Defaults to 15 minutes (900 seconds)
"""

def __init__(self,
Expand All @@ -751,11 +755,14 @@ def __init__(self,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
healing_delay=900,
):
self.clients = [self] # Keep it backwards compatible
self.hosts = hosts
self.bad_hosts = [] # Corresponding server has failures in history
self.shuffle = shuffle
self.healing_delay = healing_delay
self._last_healing = time.time()
host, port = self.hosts[0]
self._hosts_lock = threading.Lock()
self._thread_local = threading.local()
Expand Down Expand Up @@ -831,7 +838,14 @@ def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
now = time.time()
with self._hosts_lock:
if (self.bad_hosts
and self._last_healing + self.healing_delay < now):
h = self.bad_hosts.pop(0)
self.hosts.append(h)
self._last_healing = now

if self.shuffle:
random.shuffle(self.hosts)

Expand All @@ -852,6 +866,7 @@ def func(*args, **kwargs):
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
self._last_healing = now
finally:
with self._hosts_lock:
if not bad_host and h in self.bad_hosts:
Expand Down
19 changes: 19 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import requests
import requests.exceptions
import socket
import time
import unittest
import requests_mock
import random
Expand Down Expand Up @@ -804,6 +805,24 @@ def test_recovery(self):
self.assertEqual(1, len(cluster.hosts))
self.assertEqual(2, len(cluster.bad_hosts))

def test_healing(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
healing_delay=1,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual('Success', cluster.query(''))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(2, len(cluster.hosts))
self.assertEqual(1, len(cluster.bad_hosts))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))

def test_dsn(self):
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
Expand Down