Skip to content

Commit e1e6b57

Browse files
author
quantmind
committed
synchronous pusub working
1 parent 8f36e52 commit e1e6b57

File tree

10 files changed

+138
-115
lines changed

10 files changed

+138
-115
lines changed

stdnet/apps/pubsub.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22

33
from inspect import isclass
4-
from collections import deque
54

65
from stdnet import getdb, AsyncObject
76
from stdnet.utils.encoders import Json
@@ -58,19 +57,33 @@ def punsubscribe(self, *channels):
5857

5958
def message_callback(self, command, channel, message=None):
6059
if command == 'subscribe':
61-
self.channels[channel] = deque()
60+
self.channels[channel] = []
6261
elif command == 'unsubscribe':
6362
self.channels.pop(channel, None)
6463
elif channel in self.channels:
65-
self.channels.append(message)
64+
self.channels[channel].append(self.pickler.loads(message))
6665
else:
6766
logger.warn('Got message for unsubscribed channel "%s"' % channel)
6867

69-
def pool(self, timeout=3):
68+
def get_all(self, channel=None):
69+
if channel is None:
70+
channels = {}
71+
for channel in self.channels:
72+
data = self.channels[channel]
73+
if data:
74+
channels[channel] = data
75+
self.channels[channel] = []
76+
return channels
77+
elif channel in self.channels:
78+
data = self.channels[channel]
79+
self.channels[channel] = []
80+
return data
81+
82+
def pool(self):
7083
'''Pull data from subscribed channels.
7184
7285
:param timeout: Pool timeout in seconds'''
73-
return self._subscriber.pool(timeout)
86+
return self._subscriber.pool()
7487

7588
def channel_list(self, channels):
7689
ch = []

stdnet/backends/base.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,9 @@ def basekey(self, meta, *args):
302302
:parameter args: optional list of strings which are attached to the basekey.
303303
:rtype: a native string
304304
"""
305-
key = '{0}{1}'.format(self.namespace,meta.modelkey)
305+
key = '%s%s' % (self.namespace, meta.modelkey)
306306
postfix = ':'.join((str(p) for p in args if p is not None))
307-
return '{0}:{1}'.format(key,postfix) if postfix else key
307+
return '%s:%s' % (key, postfix) if postfix else key
308308

309309
# PURE VIRTUAL METHODS
310310

@@ -338,7 +338,8 @@ def flush(self, meta=None, pattern=None):
338338
raise NotImplementedError()
339339

340340
def publish(self, channel, message):
341-
'''Publish a message to a *channel*'''
341+
'''Publish a *message* to a *channel*. The backend must support pub/sub
342+
paradigm.'''
342343
raise NotImplementedError('This backend cannot publish messages')
343344

344345
def subscriber(self, **kwargs):

stdnet/backends/main.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
}
1818

1919

20-
def parse_backend_uri(backend_uri):
21-
"""Converts the "backend_uri" into the database connection parameters.
20+
def parse_backend(backend):
21+
"""Converts the "backend" into the database connection parameters.
2222
It returns a (scheme, host, params) tuple."""
23-
r = urlparse.urlsplit(backend_uri)
23+
r = urlparse.urlsplit(backend)
2424
scheme, host = r.scheme, r.netloc
2525
if scheme not in ('https', 'http'):
2626
query = r.path
@@ -62,24 +62,24 @@ def get_connection_string(scheme, address, params):
6262
return scheme + '://' + address
6363

6464

65-
def getdb(backend_uri=None, **kwargs):
65+
def getdb(backend=None, **kwargs):
6666
'''get a backend database'''
67-
if isinstance(backend_uri, BackendDataServer):
68-
return backend_uri
69-
backend_uri = backend_uri or settings.DEFAULT_BACKEND
70-
if not backend_uri:
67+
if isinstance(backend, BackendDataServer):
68+
return backend
69+
backend = backend or settings.DEFAULT_BACKEND
70+
if not backend:
7171
return None
72-
scheme, address, params = parse_backend_uri(backend_uri)
72+
scheme, address, params = parse_backend(backend)
7373
params.update(kwargs)
74-
backend_uri = get_connection_string(scheme, address, params)
75-
params['connection_string'] = backend_uri
74+
backend = get_connection_string(scheme, address, params)
75+
params['connection_string'] = backend
7676
return _getdb(scheme, address, params)
7777

7878

79-
def getcache(backend_uri=None, encoder = encoders.PythonPickle, **kwargs):
79+
def getcache(backend=None, encoder = encoders.PythonPickle, **kwargs):
8080
if isclass(encoder):
8181
encoder = encoder()
82-
db = getdb(backend_uri = backend_uri, pickler = encoder, **kwargs)
82+
db = getdb(backend=backend, pickler=encoder, **kwargs)
8383
return db.as_cache()
8484

8585

stdnet/backends/redisb.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,6 @@ class numberarray_pushback(redis.RedisScript):
740740
################################################################################
741741
class BackendDataServer(stdnet.BackendDataServer):
742742
Query = RedisQuery
743-
connection_pools = {}
744743
_redis_clients = {}
745744
struct_map = {'set':Set,
746745
'list':List,
@@ -754,15 +753,10 @@ def setup_connection(self, address):
754753
addr = address.split(':')
755754
if len(addr) == 2:
756755
try:
757-
address = (addr[0],int(addr[1]))
756+
address = (addr[0], int(addr[1]))
758757
except:
759758
pass
760-
cp = redis.ConnectionPool(address, **self.params)
761-
if cp in self.connection_pools:
762-
cp = self.connection_pools[cp]
763-
else:
764-
self.connection_pools[cp] = cp
765-
rpy = redis.Redis(connection_pool=cp)
759+
rpy = redis.Redis(address=address, **self.params)
766760
self.execute_command = rpy.execute_command
767761
self.clear = rpy.flushdb
768762
#self.keys = rpy.keys

stdnet/lib/redis/async.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,16 @@ def _send(self, result=None):
5454
for response in self.read_response():
5555
yield response
5656

57-
57+
@async(max_errors=1)
58+
def pool(self):
59+
if not self.pooling:
60+
self.pooling = True
61+
yield self
62+
while self.pooling and self.connection.sock:
63+
yield self.read_response()
64+
self.pooling = False
65+
66+
5867
class RedisConnection(connection.Connection):
5968
request_class = RedisRequest
6069

@@ -79,5 +88,4 @@ def on_connect(self, request, counter):
7988
return cmnd
8089

8190
def connection_error(self, failure, msg):
82-
raise ConnectionError(msg)
83-
91+
raise ConnectionError(msg)

stdnet/lib/redis/client.py

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -225,24 +225,9 @@ class Redis(object):
225225

226226
_STATUS = ''
227227

228-
def __init__(self, address = None,
229-
db=0, password=None,
230-
socket_timeout=None,
231-
connection_pool=None,
232-
encoding = 'utf-8',
233-
prefix = '',
234-
**kwargs):
235-
if not connection_pool:
236-
kwargs.update({
237-
'db': db,
238-
'password': password,
239-
'socket_timeout': socket_timeout,
240-
'encoding': encoding
241-
})
242-
connection_pool = ConnectionPool(address, **kwargs)
243-
self.prefix = prefix
228+
def __init__(self, connection_pool=None, **connection_kwargs):
229+
connection_pool = connection_pool or ConnectionPool(**connection_kwargs)
244230
self.connection_pool = connection_pool
245-
self.encoding = self.connection_pool.encoding
246231
self.response_callbacks = self.RESPONSE_CALLBACKS.copy()
247232
self.response_errbacks = self.RESPONSE_ERRBACKS.copy()
248233

@@ -253,6 +238,10 @@ def client(self):
253238
@property
254239
def pipelined(self):
255240
return self.client is not self
241+
242+
@property
243+
def encoding(self):
244+
return self.connection_pool.encoding
256245

257246
def _get_db(self):
258247
return self.connection_pool.db

0 commit comments

Comments
 (0)