1
+ import logging
1
2
import pickle
3
+ import time
2
4
3
5
try :
4
6
import redis
7
9
8
10
from .pubsub_manager import PubSubManager
9
11
12
+ logger = logging .getLogger ('socketio' )
13
+
10
14
11
15
class RedisManager (PubSubManager ): # pragma: no cover
12
16
"""Redis based client manager.
@@ -38,8 +42,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
38
42
raise RuntimeError ('Redis package is not installed '
39
43
'(Run "pip install redis" in your '
40
44
'virtualenv).' )
41
- self .redis = redis . Redis . from_url ( url )
42
- self .pubsub = self . redis . pubsub ()
45
+ self .redis_url = url
46
+ self ._redis_connect ()
43
47
super (RedisManager , self ).__init__ (channel = channel ,
44
48
write_only = write_only )
45
49
@@ -58,13 +62,48 @@ def initialize(self):
58
62
'Redis requires a monkey patched socket library to work '
59
63
'with ' + self .server .async_mode )
60
64
65
+ def _redis_connect (self ):
66
+ self .redis = redis .Redis .from_url (self .redis_url )
67
+ self .pubsub = self .redis .pubsub ()
68
+
61
69
def _publish (self , data ):
62
- return self .redis .publish (self .channel , pickle .dumps (data ))
70
+ retry = True
71
+ while True :
72
+ try :
73
+ if not retry :
74
+ self ._redis_connect ()
75
+ return self .redis .publish (self .channel , pickle .dumps (data ))
76
+ except redis .exceptions .ConnectionError :
77
+ if retry :
78
+ logger .error ('Cannot publish to redis... retrying' )
79
+ retry = False
80
+ else :
81
+ logger .error ('Cannot publish to redis... giving up' )
82
+ break
83
+
84
+ def _redis_listen_with_retries (self ):
85
+ retry_sleep = 1
86
+ connect = False
87
+ while True :
88
+ try :
89
+ if connect :
90
+ self ._redis_connect ()
91
+ self .pubsub .subscribe (self .channel )
92
+ for message in self .pubsub .listen ():
93
+ yield message
94
+ except redis .exceptions .ConnectionError :
95
+ logger .error ('Cannot receive from redis... '
96
+ 'retrying in {} secs' .format (retry_sleep ))
97
+ connect = True
98
+ time .sleep (retry_sleep )
99
+ retry_sleep *= 2
100
+ if retry_sleep > 60 :
101
+ retry_sleep = 60
63
102
64
103
def _listen (self ):
65
104
channel = self .channel .encode ('utf-8' )
66
105
self .pubsub .subscribe (self .channel )
67
- for message in self .pubsub . listen ():
106
+ for message in self ._redis_listen_with_retries ():
68
107
if message ['channel' ] == channel and \
69
108
message ['type' ] == 'message' and 'data' in message :
70
109
yield message ['data' ]
0 commit comments