Skip to content

Commit af13ef0

Browse files
Recoonect to redis when connection is lost
Fixes miguelgrinberg#143
1 parent 96d4e5f commit af13ef0

File tree

2 files changed

+80
-15
lines changed

2 files changed

+80
-15
lines changed

socketio/asyncio_redis_manager.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
import logging
13
import pickle
24
from urllib.parse import urlparse
35

@@ -8,6 +10,8 @@
810

911
from .asyncio_pubsub_manager import AsyncPubSubManager
1012

13+
logger = logging.getLogger('socketio')
14+
1115

1216
def _parse_redis_url(url):
1317
p = urlparse(url)
@@ -59,17 +63,39 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
5963
super().__init__(channel=channel, write_only=write_only)
6064

6165
async def _publish(self, data):
62-
if self.pub is None:
63-
self.pub = await aioredis.create_redis((self.host, self.port),
64-
db=self.db,
65-
password=self.password)
66-
return await self.pub.publish(self.channel, pickle.dumps(data))
66+
retry = True
67+
while True:
68+
try:
69+
if self.pub is None:
70+
self.pub = await aioredis.create_redis(
71+
(self.host, self.port), db=self.db,
72+
password=self.password)
73+
return await self.pub.publish(self.channel,
74+
pickle.dumps(data))
75+
except (aioredis.RedisError, OSError):
76+
if retry:
77+
logger.error('Cannot publish to redis... retrying')
78+
self.pub = None
79+
retry = False
80+
else:
81+
logger.error('Cannot publish to redis... giving up')
82+
break
6783

6884
async def _listen(self):
69-
if self.sub is None:
70-
self.sub = await aioredis.create_redis((self.host, self.port),
71-
db=self.db,
72-
password=self.password)
73-
self.ch = (await self.sub.subscribe(self.channel))[0]
85+
retry_sleep = 1
7486
while True:
75-
return await self.ch.get()
87+
try:
88+
if self.sub is None:
89+
self.sub = await aioredis.create_redis(
90+
(self.host, self.port), db=self.db,
91+
password=self.password)
92+
self.ch = (await self.sub.subscribe(self.channel))[0]
93+
return await self.ch.get()
94+
except (aioredis.RedisError, OSError):
95+
logger.error('Cannot receive from redis... '
96+
'retrying in {} secs'.format(retry_sleep))
97+
self.sub = None
98+
await asyncio.sleep(retry_sleep)
99+
retry_sleep *= 2
100+
if retry_sleep > 60:
101+
retry_sleep = 60

socketio/redis_manager.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import logging
12
import pickle
3+
import time
24

35
try:
46
import redis
@@ -7,6 +9,8 @@
79

810
from .pubsub_manager import PubSubManager
911

12+
logger = logging.getLogger('socketio')
13+
1014

1115
class RedisManager(PubSubManager): # pragma: no cover
1216
"""Redis based client manager.
@@ -38,8 +42,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
3842
raise RuntimeError('Redis package is not installed '
3943
'(Run "pip install redis" in your '
4044
'virtualenv).')
41-
self.redis = redis.Redis.from_url(url)
42-
self.pubsub = self.redis.pubsub()
45+
self.redis_url = url
46+
self._redis_connect()
4347
super(RedisManager, self).__init__(channel=channel,
4448
write_only=write_only)
4549

@@ -58,13 +62,48 @@ def initialize(self):
5862
'Redis requires a monkey patched socket library to work '
5963
'with ' + self.server.async_mode)
6064

65+
def _redis_connect(self):
66+
self.redis = redis.Redis.from_url(self.redis_url)
67+
self.pubsub = self.redis.pubsub()
68+
6169
def _publish(self, data):
62-
return self.redis.publish(self.channel, pickle.dumps(data))
70+
retry = True
71+
while True:
72+
try:
73+
if not retry:
74+
self._redis_connect()
75+
return self.redis.publish(self.channel, pickle.dumps(data))
76+
except redis.exceptions.ConnectionError:
77+
if retry:
78+
logger.error('Cannot publish to redis... retrying')
79+
retry = False
80+
else:
81+
logger.error('Cannot publish to redis... giving up')
82+
break
83+
84+
def _redis_listen_with_retries(self):
85+
retry_sleep = 1
86+
connect = False
87+
while True:
88+
try:
89+
if connect:
90+
self._redis_connect()
91+
self.pubsub.subscribe(self.channel)
92+
for message in self.pubsub.listen():
93+
yield message
94+
except redis.exceptions.ConnectionError:
95+
logger.error('Cannot receive from redis... '
96+
'retrying in {} secs'.format(retry_sleep))
97+
connect = True
98+
time.sleep(retry_sleep)
99+
retry_sleep *= 2
100+
if retry_sleep > 60:
101+
retry_sleep = 60
63102

64103
def _listen(self):
65104
channel = self.channel.encode('utf-8')
66105
self.pubsub.subscribe(self.channel)
67-
for message in self.pubsub.listen():
106+
for message in self._redis_listen_with_retries():
68107
if message['channel'] == channel and \
69108
message['type'] == 'message' and 'data' in message:
70109
yield message['data']

0 commit comments

Comments
 (0)