Skip to content

Commit 831fe06

Browse files
committed
12.11小节完成
1 parent b5d9d93 commit 831fe06

File tree

1 file changed

+138
-136
lines changed

1 file changed

+138
-136
lines changed

source/c12/p11_implement publish_subscribe_messaging.rst

+138-136
Original file line numberDiff line numberDiff line change
@@ -5,175 +5,177 @@
55
----------
66
问题
77
----------
8-
You have a program based on communicating threads and want them to implement
9-
publish/subscribe messaging.
8+
你有一个基于线程通信的程序,想让它们实现发布/订阅模式的消息通信。
109

1110
|
1211
1312
----------
1413
解决方案
1514
----------
16-
To implement publish/subscribe messaging, you typically introduce a separate “ex‐
17-
change” or “gateway” object that acts as an intermediary for all messages. That is, instead
18-
of directly sending a message from one task to another, a message is sent to the exchange
19-
and it delivers it to one or more attached tasks. Here is one example of a very simple
20-
exchange implementation:
15+
要实现发布/订阅的消息通信模式,
16+
你通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。
17+
也就是说,不直接将消息从一个任务发送到另一个,而是将其发送给交换机,
18+
然后由交换机将它发送给一个或多个被关联任务。下面是一个非常简单的交换机实现例子:
2119

22-
from collections import defaultdict
20+
.. code-block:: python
2321
24-
class Exchange:
25-
def __init__(self):
26-
self._subscribers = set()
22+
from collections import defaultdict
2723
28-
def attach(self, task):
29-
self._subscribers.add(task)
24+
class Exchange:
25+
def __init__(self):
26+
self._subscribers = set()
3027
31-
def detach(self, task):
32-
self._subscribers.remove(task)
28+
def attach(self, task):
29+
self._subscribers.add(task)
3330
34-
def send(self, msg):
35-
for subscriber in self._subscribers:
36-
subscriber.send(msg)
31+
def detach(self, task):
32+
self._subscribers.remove(task)
3733
38-
# Dictionary of all created exchanges
39-
_exchanges = defaultdict(Exchange)
34+
def send(self, msg):
35+
for subscriber in self._subscribers:
36+
subscriber.send(msg)
4037
41-
# Return the Exchange instance associated with a given name
42-
def get_exchange(name):
43-
return _exchanges[name]
38+
# Dictionary of all created exchanges
39+
_exchanges = defaultdict(Exchange)
4440
45-
An exchange is really nothing more than an object that keeps a set of active subscribers
46-
and provides methods for attaching, detaching, and sending messages. Each exchange
47-
is identified by a name, and the get_exchange() function simply returns the Ex
48-
change instance associated with a given name.
49-
Here is a simple example that shows how to use an exchange:
41+
# Return the Exchange instance associated with a given name
42+
def get_exchange(name):
43+
return _exchanges[name]
5044
51-
# Example of a task. Any object with a send() method
45+
一个交换机就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。
46+
每个交换机通过一个名称定位,``get_exchange()`` 通过给定一个名称返回相应的 ``Exchange`` 实例。
5247

53-
class Task:
54-
...
55-
def send(self, msg):
48+
下面是一个简单例子,演示了如何使用一个交换机:
49+
50+
.. code-block:: python
51+
52+
# Example of a task. Any object with a send() method
53+
54+
class Task:
5655
...
56+
def send(self, msg):
57+
...
5758
58-
task_a = Task()
59-
task_b = Task()
59+
task_a = Task()
60+
task_b = Task()
6061
61-
# Example of getting an exchange
62-
exc = get_exchange('name')
62+
# Example of getting an exchange
63+
exc = get_exchange('name')
6364
64-
# Examples of subscribing tasks to it
65-
exc.attach(task_a)
66-
exc.attach(task_b)
65+
# Examples of subscribing tasks to it
66+
exc.attach(task_a)
67+
exc.attach(task_b)
6768
68-
# Example of sending messages
69-
exc.send('msg1')
70-
exc.send('msg2')
69+
# Example of sending messages
70+
exc.send('msg1')
71+
exc.send('msg2')
7172
72-
# Example of unsubscribing
73-
exc.detach(task_a)
74-
exc.detach(task_b)
73+
# Example of unsubscribing
74+
exc.detach(task_a)
75+
exc.detach(task_b)
7576
76-
Although there are many different variations on this theme, the overall idea is the same.
77-
Messages will be delivered to an exchange and the exchange will deliver them to attached
78-
subscribers.
77+
尽管对于这个问题有很多的变种,不过万变不离其宗。
78+
消息会被发送给一个交换机,然后交换机会将它们发送给被绑定的订阅者。
7979

8080
|
8181
8282
----------
8383
讨论
8484
----------
85-
The concept of tasks or threads sending messages to one another (often via queues) is
86-
easy to implement and quite popular. However, the benefits of using a public/subscribe
87-
(pub/sub) model instead are often overlooked.
88-
First, the use of an exchange can simplify much of the plumbing involved in setting up
89-
communicating threads. Instead of trying to wire threads together across multiple pro‐
90-
gram modules, you only worry about connecting them to a known exchange. In some
91-
sense, this is similar to how the logging library works. In practice, it can make it easier
92-
to decouple various tasks in the program.
93-
Second, the ability of the exchange to broadcast messages to multiple subscribers opens
94-
up new communication patterns. For example, you could implement systems with re‐
95-
dundant tasks, broadcasting, or fan-out. You could also build debugging and diagnostic
96-
tools that attach themselves to exchanges as ordinary subscribers. For example, here is
97-
a simple diagnostic class that would display sent messages:
98-
99-
class DisplayMessages:
100-
def __init__(self):
101-
self.count = 0
102-
def send(self, msg):
103-
self.count += 1
104-
print('msg[{}]: {!r}'.format(self.count, msg))
105-
106-
exc = get_exchange('name')
107-
d = DisplayMessages()
108-
exc.attach(d)
109-
110-
Last, but not least, a notable aspect of the implementation is that it works with a variety
111-
of task-like objects. For example, the receivers of a message could be actors (as described
112-
in Recipe 12.10), coroutines, network connections, or just about anything that imple‐
113-
ments a proper send() method.
114-
One potentially problematic aspect of an exchange concerns the proper attachment and
115-
detachment of subscribers. In order to properly manage resources, every subscriber that
116-
attaches must eventually detach. This leads to a programming model similar to this:
117-
118-
exc = get_exchange('name')
119-
exc.attach(some_task)
120-
try:
121-
...
122-
finally:
123-
exc.detach(some_task)
124-
125-
In some sense, this is similar to the usage of files, locks, and similar objects. Experience
126-
has shown that it is quite easy to forget the final detach() step. To simplify this, you
127-
might consider the use of the context-management protocol. For example, adding a
128-
subscribe() method to the exchange like this:
129-
130-
from contextlib import contextmanager
131-
from collections import defaultdict
132-
133-
class Exchange:
134-
def __init__(self):
135-
self._subscribers = set()
136-
137-
def attach(self, task):
138-
self._subscribers.add(task)
139-
140-
def detach(self, task):
141-
self._subscribers.remove(task)
142-
143-
@contextmanager
144-
def subscribe(self, *tasks):
145-
for task in tasks:
146-
self.attach(task)
147-
try:
148-
yield
149-
finally:
150-
for task in tasks:
151-
self.detach(task)
85+
通过队列发送消息的任务或线程的模式很容易被实现并且也非常普遍。
86+
不过,使用发布/订阅模式的好处更加明显。
87+
88+
首先,使用一个交换机可以简化大部分涉及到线程通信的工作。
89+
无需去写通过多进程模块来操作多个线程,你只需要使用这个交换机来连接它们。
90+
某种程度上,这个就跟日志模块的工作原理类似。
91+
实际上,它可以轻松的解耦程序中多个任务。
92+
93+
其次,交换机广播消息给多个订阅者的能力带来了一个全新的通信模式。
94+
例如,你可以使用多任务系统、广播或扇出。
95+
你还可以通过以普通订阅者身份绑定来构建调试和诊断工具。
96+
例如,下面是一个简单的诊断类,可以显示被发送的消息:
97+
98+
.. code-block:: python
99+
100+
class DisplayMessages:
101+
def __init__(self):
102+
self.count = 0
103+
def send(self, msg):
104+
self.count += 1
105+
print('msg[{}]: {!r}'.format(self.count, msg))
152106
153-
def send(self, msg):
154-
for subscriber in self._subscribers:
155-
subscriber.send(msg)
107+
exc = get_exchange('name')
108+
d = DisplayMessages()
109+
exc.attach(d)
156110
157-
# Dictionary of all created exchanges
158-
_exchanges = defaultdict(Exchange)
111+
最后,该实现的一个重要特点是它能兼容多个“task-like”对象。
112+
例如,消息接受者可以是actor(12.10小节介绍)、协程、网络连接或任何实现了正确的 ``send()`` 方法的东西。
159113

160-
# Return the Exchange instance associated with a given name
161-
def get_exchange(name):
162-
return _exchanges[name]
114+
关于交换机的一个可能问题是对于订阅者的正确绑定和解绑。
115+
为了正确的管理资源,每一个绑定的订阅者必须最终要解绑。
116+
在代码中通常会是像下面这样的模式:
163117

164-
# Example of using the subscribe() method
165-
exc = get_exchange('name')
166-
with exc.subscribe(task_a, task_b):
167-
...
168-
exc.send('msg1')
169-
exc.send('msg2')
170-
...
118+
.. code-block:: python
171119
172-
# task_a and task_b detached here
120+
exc = get_exchange('name')
121+
exc.attach(some_task)
122+
try:
123+
...
124+
finally:
125+
exc.detach(some_task)
126+
127+
某种意义上,这个和使用文件、锁和类似对象很像。
128+
通常很容易会忘记最后的 ``detach()`` 步骤。
129+
为了简化这个,你可以考虑使用上下文管理器协议。
130+
例如,在交换机对象上增加一个 ``subscribe()`` 方法,如下:
131+
132+
.. code-block:: python
133+
134+
from contextlib import contextmanager
135+
from collections import defaultdict
173136
174-
Finally, it should be noted that there are numerous possible extensions to the exchange
175-
idea. For example, exchanges could implement an entire collection of message channels
137+
class Exchange:
138+
def __init__(self):
139+
self._subscribers = set()
140+
141+
def attach(self, task):
142+
self._subscribers.add(task)
143+
144+
def detach(self, task):
145+
self._subscribers.remove(task)
146+
147+
@contextmanager
148+
def subscribe(self, *tasks):
149+
for task in tasks:
150+
self.attach(task)
151+
try:
152+
yield
153+
finally:
154+
for task in tasks:
155+
self.detach(task)
156+
157+
def send(self, msg):
158+
for subscriber in self._subscribers:
159+
subscriber.send(msg)
160+
161+
# Dictionary of all created exchanges
162+
_exchanges = defaultdict(Exchange)
163+
164+
# Return the Exchange instance associated with a given name
165+
def get_exchange(name):
166+
return _exchanges[name]
167+
168+
# Example of using the subscribe() method
169+
exc = get_exchange('name')
170+
with exc.subscribe(task_a, task_b):
171+
...
172+
exc.send('msg1')
173+
exc.send('msg2')
174+
...
175+
176+
# task_a and task_b detached here
177+
178+
最后还应该注意的是关于交换机的思想有很多种的扩展实现。
179+
例如,交换机可以实现一整个消息通道集合或提供交换机名称的模式匹配规则。
180+
交换机还可以被扩展到分布式计算程序中(比如,将消息路由到不同机器上面的任务中去)。
176181

177-
or apply pattern matching rules to exchange names. Exchanges can also be extended
178-
into distributed computing applications (e.g., routing messages to tasks on different
179-
machines, etc.).

0 commit comments

Comments
 (0)