Skip to content

Commit 7dbc470

Browse files
Pass additional options to Redis and Kombu managers (Fixes miguelgrinberg#307)
1 parent f36fa88 commit 7dbc470

File tree

2 files changed

+30
-9
lines changed

2 files changed

+30
-9
lines changed

socketio/kombu_manager.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,21 @@ class KombuManager(PubSubManager): # pragma: no cover
3434
:param write_only: If set ot ``True``, only initialize to emit events. The
3535
default of ``False`` initializes the class for emitting
3636
and receiving.
37+
:param connection_options: additional keyword arguments to be passed to
38+
``kombu.Connection()``.
39+
:param exchange_options: additional keyword arguments to be passed to
40+
``kombu.Exchange()``.
41+
:param queue_options: additional keyword arguments to be passed to
42+
``kombu.Queue()``.
43+
:param producer_options: additional keyword arguments to be passed to
44+
``kombu.Producer()``.
3745
"""
3846
name = 'kombu'
3947

4048
def __init__(self, url='amqp://guest:guest@localhost:5672//',
41-
channel='socketio', write_only=False, logger=None):
49+
channel='socketio', write_only=False, logger=None,
50+
connection_options=None, exchange_options=None,
51+
queue_options=None, producer_options=None):
4252
if kombu is None:
4353
raise RuntimeError('Kombu package is not installed '
4454
'(Run "pip install kombu" in your '
@@ -47,6 +57,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
4757
write_only=write_only,
4858
logger=logger)
4959
self.url = url
60+
self.connection_options = connection_options or {}
61+
self.exchange_options = exchange_options or {}
62+
self.queue_options = queue_options or {}
63+
self.producer_options = producer_options or {}
5064
self.producer = self._producer()
5165

5266
def initialize(self):
@@ -65,19 +79,22 @@ def initialize(self):
6579
'with ' + self.server.async_mode)
6680

6781
def _connection(self):
68-
return kombu.Connection(self.url)
82+
return kombu.Connection(self.url, **self.connection_options)
6983

7084
def _exchange(self):
71-
return kombu.Exchange(self.channel, type='fanout', durable=False)
85+
options = {'type': 'fanout', 'durable': False}
86+
options.update(self.exchange_options)
87+
return kombu.Exchange(self.channel, **options)
7288

7389
def _queue(self):
7490
queue_name = 'flask-socketio.' + str(uuid.uuid4())
75-
return kombu.Queue(queue_name, self._exchange(),
76-
durable=False,
77-
queue_arguments={'x-expires': 300000})
91+
options = {'durable': False, 'queue_arguments': {'x-expires': 300000}}
92+
options.update(self.queue_options)
93+
return kombu.Queue(queue_name, self._exchange(), **options)
7894

7995
def _producer(self):
80-
return self._connection().Producer(exchange=self._exchange())
96+
return self._connection().Producer(exchange=self._exchange(),
97+
**self.producer_options)
8198

8299
def __error_callback(self, exception, interval):
83100
self._get_logger().exception('Sleeping {}s'.format(interval))

socketio/redis_manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,19 @@ class RedisManager(PubSubManager): # pragma: no cover
3333
:param write_only: If set ot ``True``, only initialize to emit events. The
3434
default of ``False`` initializes the class for emitting
3535
and receiving.
36+
:param redis_options: additional keyword arguments to be passed to
37+
``Redis.from_url()``.
3638
"""
3739
name = 'redis'
3840

3941
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
40-
write_only=False, logger=None):
42+
write_only=False, logger=None, redis_options=None):
4143
if redis is None:
4244
raise RuntimeError('Redis package is not installed '
4345
'(Run "pip install redis" in your '
4446
'virtualenv).')
4547
self.redis_url = url
48+
self.redis_options = redis_options or {}
4649
self._redis_connect()
4750
super(RedisManager, self).__init__(channel=channel,
4851
write_only=write_only,
@@ -64,7 +67,8 @@ def initialize(self):
6467
'with ' + self.server.async_mode)
6568

6669
def _redis_connect(self):
67-
self.redis = redis.Redis.from_url(self.redis_url)
70+
self.redis = redis.Redis.from_url(self.redis_url,
71+
**self.redis_options)
6872
self.pubsub = self.redis.pubsub()
6973

7074
def _publish(self, data):

0 commit comments

Comments
 (0)