@@ -34,11 +34,21 @@ class KombuManager(PubSubManager): # pragma: no cover
34
34
:param write_only: If set ot ``True``, only initialize to emit events. The
35
35
default of ``False`` initializes the class for emitting
36
36
and receiving.
37
+ :param connection_options: additional keyword arguments to be passed to
38
+ ``kombu.Connection()``.
39
+ :param exchange_options: additional keyword arguments to be passed to
40
+ ``kombu.Exchange()``.
41
+ :param queue_options: additional keyword arguments to be passed to
42
+ ``kombu.Queue()``.
43
+ :param producer_options: additional keyword arguments to be passed to
44
+ ``kombu.Producer()``.
37
45
"""
38
46
name = 'kombu'
39
47
40
48
def __init__ (self , url = 'amqp://guest:guest@localhost:5672//' ,
41
- channel = 'socketio' , write_only = False , logger = None ):
49
+ channel = 'socketio' , write_only = False , logger = None ,
50
+ connection_options = None , exchange_options = None ,
51
+ queue_options = None , producer_options = None ):
42
52
if kombu is None :
43
53
raise RuntimeError ('Kombu package is not installed '
44
54
'(Run "pip install kombu" in your '
@@ -47,6 +57,10 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
47
57
write_only = write_only ,
48
58
logger = logger )
49
59
self .url = url
60
+ self .connection_options = connection_options or {}
61
+ self .exchange_options = exchange_options or {}
62
+ self .queue_options = queue_options or {}
63
+ self .producer_options = producer_options or {}
50
64
self .producer = self ._producer ()
51
65
52
66
def initialize (self ):
@@ -65,19 +79,22 @@ def initialize(self):
65
79
'with ' + self .server .async_mode )
66
80
67
81
def _connection (self ):
68
- return kombu .Connection (self .url )
82
+ return kombu .Connection (self .url , ** self . connection_options )
69
83
70
84
def _exchange (self ):
71
- return kombu .Exchange (self .channel , type = 'fanout' , durable = False )
85
+ options = {'type' : 'fanout' , 'durable' : False }
86
+ options .update (self .exchange_options )
87
+ return kombu .Exchange (self .channel , ** options )
72
88
73
89
def _queue (self ):
74
90
queue_name = 'flask-socketio.' + str (uuid .uuid4 ())
75
- return kombu . Queue ( queue_name , self . _exchange (),
76
- durable = False ,
77
- queue_arguments = { 'x-expires' : 300000 } )
91
+ options = { 'durable' : False , 'queue_arguments' : { 'x-expires' : 300000 }}
92
+ options . update ( self . queue_options )
93
+ return kombu . Queue ( queue_name , self . _exchange (), ** options )
78
94
79
95
def _producer (self ):
80
- return self ._connection ().Producer (exchange = self ._exchange ())
96
+ return self ._connection ().Producer (exchange = self ._exchange (),
97
+ ** self .producer_options )
81
98
82
99
def __error_callback (self , exception , interval ):
83
100
self ._get_logger ().exception ('Sleeping {}s' .format (interval ))
0 commit comments