|
5 | 5 | ----------
|
6 | 6 | 问题
|
7 | 7 | ----------
|
8 |
| -You have a program based on communicating threads and want them to implement |
9 |
| -publish/subscribe messaging. |
| 8 | +你有一个基于线程通信的程序,想让它们实现发布/订阅模式的消息通信。 |
10 | 9 |
|
11 | 10 | |
|
12 | 11 |
|
13 | 12 | ----------
|
14 | 13 | 解决方案
|
15 | 14 | ----------
|
16 |
| -To implement publish/subscribe messaging, you typically introduce a separate “ex‐ |
17 |
| -change” or “gateway” object that acts as an intermediary for all messages. That is, instead |
18 |
| -of directly sending a message from one task to another, a message is sent to the exchange |
19 |
| -and it delivers it to one or more attached tasks. Here is one example of a very simple |
20 |
| -exchange implementation: |
| 15 | +要实现发布/订阅的消息通信模式, |
| 16 | +你通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。 |
| 17 | +也就是说,不直接将消息从一个任务发送到另一个,而是将其发送给交换机, |
| 18 | +然后由交换机将它发送给一个或多个被关联任务。下面是一个非常简单的交换机实现例子: |
21 | 19 |
|
22 |
| -from collections import defaultdict |
| 20 | +.. code-block:: python |
23 | 21 |
|
24 |
| -class Exchange: |
25 |
| - def __init__(self): |
26 |
| - self._subscribers = set() |
| 22 | + from collections import defaultdict |
27 | 23 |
|
28 |
| - def attach(self, task): |
29 |
| - self._subscribers.add(task) |
| 24 | + class Exchange: |
| 25 | + def __init__(self): |
| 26 | + self._subscribers = set() |
30 | 27 |
|
31 |
| - def detach(self, task): |
32 |
| - self._subscribers.remove(task) |
| 28 | + def attach(self, task): |
| 29 | + self._subscribers.add(task) |
33 | 30 |
|
34 |
| - def send(self, msg): |
35 |
| - for subscriber in self._subscribers: |
36 |
| - subscriber.send(msg) |
| 31 | + def detach(self, task): |
| 32 | + self._subscribers.remove(task) |
37 | 33 |
|
38 |
| -# Dictionary of all created exchanges |
39 |
| -_exchanges = defaultdict(Exchange) |
| 34 | + def send(self, msg): |
| 35 | + for subscriber in self._subscribers: |
| 36 | + subscriber.send(msg) |
40 | 37 |
|
41 |
| -# Return the Exchange instance associated with a given name |
42 |
| -def get_exchange(name): |
43 |
| - return _exchanges[name] |
| 38 | + # Dictionary of all created exchanges |
| 39 | + _exchanges = defaultdict(Exchange) |
44 | 40 |
|
45 |
| -An exchange is really nothing more than an object that keeps a set of active subscribers |
46 |
| -and provides methods for attaching, detaching, and sending messages. Each exchange |
47 |
| -is identified by a name, and the get_exchange() function simply returns the Ex |
48 |
| -change instance associated with a given name. |
49 |
| -Here is a simple example that shows how to use an exchange: |
| 41 | + # Return the Exchange instance associated with a given name |
| 42 | + def get_exchange(name): |
| 43 | + return _exchanges[name] |
50 | 44 |
|
51 |
| -# Example of a task. Any object with a send() method |
| 45 | +一个交换机就是一个普通对象,负责维护一个活跃的订阅者集合,并为绑定、解绑和发送消息提供相应的方法。 |
| 46 | +每个交换机通过一个名称定位,``get_exchange()`` 通过给定一个名称返回相应的 ``Exchange`` 实例。 |
52 | 47 |
|
53 |
| -class Task: |
54 |
| - ... |
55 |
| - def send(self, msg): |
| 48 | +下面是一个简单例子,演示了如何使用一个交换机: |
| 49 | + |
| 50 | +.. code-block:: python |
| 51 | +
|
| 52 | + # Example of a task. Any object with a send() method |
| 53 | +
|
| 54 | + class Task: |
56 | 55 | ...
|
| 56 | + def send(self, msg): |
| 57 | + ... |
57 | 58 |
|
58 |
| -task_a = Task() |
59 |
| -task_b = Task() |
| 59 | + task_a = Task() |
| 60 | + task_b = Task() |
60 | 61 |
|
61 |
| -# Example of getting an exchange |
62 |
| -exc = get_exchange('name') |
| 62 | + # Example of getting an exchange |
| 63 | + exc = get_exchange('name') |
63 | 64 |
|
64 |
| -# Examples of subscribing tasks to it |
65 |
| -exc.attach(task_a) |
66 |
| -exc.attach(task_b) |
| 65 | + # Examples of subscribing tasks to it |
| 66 | + exc.attach(task_a) |
| 67 | + exc.attach(task_b) |
67 | 68 |
|
68 |
| -# Example of sending messages |
69 |
| -exc.send('msg1') |
70 |
| -exc.send('msg2') |
| 69 | + # Example of sending messages |
| 70 | + exc.send('msg1') |
| 71 | + exc.send('msg2') |
71 | 72 |
|
72 |
| -# Example of unsubscribing |
73 |
| -exc.detach(task_a) |
74 |
| -exc.detach(task_b) |
| 73 | + # Example of unsubscribing |
| 74 | + exc.detach(task_a) |
| 75 | + exc.detach(task_b) |
75 | 76 |
|
76 |
| -Although there are many different variations on this theme, the overall idea is the same. |
77 |
| -Messages will be delivered to an exchange and the exchange will deliver them to attached |
78 |
| -subscribers. |
| 77 | +尽管对于这个问题有很多的变种,不过万变不离其宗。 |
| 78 | +消息会被发送给一个交换机,然后交换机会将它们发送给被绑定的订阅者。 |
79 | 79 |
|
80 | 80 | |
|
81 | 81 |
|
82 | 82 | ----------
|
83 | 83 | 讨论
|
84 | 84 | ----------
|
85 |
| -The concept of tasks or threads sending messages to one another (often via queues) is |
86 |
| -easy to implement and quite popular. However, the benefits of using a public/subscribe |
87 |
| -(pub/sub) model instead are often overlooked. |
88 |
| -First, the use of an exchange can simplify much of the plumbing involved in setting up |
89 |
| -communicating threads. Instead of trying to wire threads together across multiple pro‐ |
90 |
| -gram modules, you only worry about connecting them to a known exchange. In some |
91 |
| -sense, this is similar to how the logging library works. In practice, it can make it easier |
92 |
| -to decouple various tasks in the program. |
93 |
| -Second, the ability of the exchange to broadcast messages to multiple subscribers opens |
94 |
| -up new communication patterns. For example, you could implement systems with re‐ |
95 |
| -dundant tasks, broadcasting, or fan-out. You could also build debugging and diagnostic |
96 |
| -tools that attach themselves to exchanges as ordinary subscribers. For example, here is |
97 |
| -a simple diagnostic class that would display sent messages: |
98 |
| - |
99 |
| -class DisplayMessages: |
100 |
| - def __init__(self): |
101 |
| - self.count = 0 |
102 |
| - def send(self, msg): |
103 |
| - self.count += 1 |
104 |
| - print('msg[{}]: {!r}'.format(self.count, msg)) |
105 |
| - |
106 |
| -exc = get_exchange('name') |
107 |
| -d = DisplayMessages() |
108 |
| -exc.attach(d) |
109 |
| - |
110 |
| -Last, but not least, a notable aspect of the implementation is that it works with a variety |
111 |
| -of task-like objects. For example, the receivers of a message could be actors (as described |
112 |
| -in Recipe 12.10), coroutines, network connections, or just about anything that imple‐ |
113 |
| -ments a proper send() method. |
114 |
| -One potentially problematic aspect of an exchange concerns the proper attachment and |
115 |
| -detachment of subscribers. In order to properly manage resources, every subscriber that |
116 |
| -attaches must eventually detach. This leads to a programming model similar to this: |
117 |
| - |
118 |
| -exc = get_exchange('name') |
119 |
| -exc.attach(some_task) |
120 |
| -try: |
121 |
| - ... |
122 |
| -finally: |
123 |
| - exc.detach(some_task) |
124 |
| - |
125 |
| -In some sense, this is similar to the usage of files, locks, and similar objects. Experience |
126 |
| -has shown that it is quite easy to forget the final detach() step. To simplify this, you |
127 |
| -might consider the use of the context-management protocol. For example, adding a |
128 |
| -subscribe() method to the exchange like this: |
129 |
| - |
130 |
| -from contextlib import contextmanager |
131 |
| -from collections import defaultdict |
132 |
| - |
133 |
| -class Exchange: |
134 |
| - def __init__(self): |
135 |
| - self._subscribers = set() |
136 |
| - |
137 |
| - def attach(self, task): |
138 |
| - self._subscribers.add(task) |
139 |
| - |
140 |
| - def detach(self, task): |
141 |
| - self._subscribers.remove(task) |
142 |
| - |
143 |
| - @contextmanager |
144 |
| - def subscribe(self, *tasks): |
145 |
| - for task in tasks: |
146 |
| - self.attach(task) |
147 |
| - try: |
148 |
| - yield |
149 |
| - finally: |
150 |
| - for task in tasks: |
151 |
| - self.detach(task) |
| 85 | +通过队列发送消息的任务或线程的模式很容易被实现并且也非常普遍。 |
| 86 | +不过,使用发布/订阅模式的好处更加明显。 |
| 87 | + |
| 88 | +首先,使用一个交换机可以简化大部分涉及到线程通信的工作。 |
| 89 | +无需去写通过多进程模块来操作多个线程,你只需要使用这个交换机来连接它们。 |
| 90 | +某种程度上,这个就跟日志模块的工作原理类似。 |
| 91 | +实际上,它可以轻松的解耦程序中多个任务。 |
| 92 | + |
| 93 | +其次,交换机广播消息给多个订阅者的能力带来了一个全新的通信模式。 |
| 94 | +例如,你可以使用多任务系统、广播或扇出。 |
| 95 | +你还可以通过以普通订阅者身份绑定来构建调试和诊断工具。 |
| 96 | +例如,下面是一个简单的诊断类,可以显示被发送的消息: |
| 97 | + |
| 98 | +.. code-block:: python |
| 99 | +
|
| 100 | + class DisplayMessages: |
| 101 | + def __init__(self): |
| 102 | + self.count = 0 |
| 103 | + def send(self, msg): |
| 104 | + self.count += 1 |
| 105 | + print('msg[{}]: {!r}'.format(self.count, msg)) |
152 | 106 |
|
153 |
| - def send(self, msg): |
154 |
| - for subscriber in self._subscribers: |
155 |
| - subscriber.send(msg) |
| 107 | + exc = get_exchange('name') |
| 108 | + d = DisplayMessages() |
| 109 | + exc.attach(d) |
156 | 110 |
|
157 |
| -# Dictionary of all created exchanges |
158 |
| -_exchanges = defaultdict(Exchange) |
| 111 | +最后,该实现的一个重要特点是它能兼容多个“task-like”对象。 |
| 112 | +例如,消息接受者可以是actor(12.10小节介绍)、协程、网络连接或任何实现了正确的 ``send()`` 方法的东西。 |
159 | 113 |
|
160 |
| -# Return the Exchange instance associated with a given name |
161 |
| -def get_exchange(name): |
162 |
| - return _exchanges[name] |
| 114 | +关于交换机的一个可能问题是对于订阅者的正确绑定和解绑。 |
| 115 | +为了正确的管理资源,每一个绑定的订阅者必须最终要解绑。 |
| 116 | +在代码中通常会是像下面这样的模式: |
163 | 117 |
|
164 |
| -# Example of using the subscribe() method |
165 |
| -exc = get_exchange('name') |
166 |
| -with exc.subscribe(task_a, task_b): |
167 |
| - ... |
168 |
| - exc.send('msg1') |
169 |
| - exc.send('msg2') |
170 |
| - ... |
| 118 | +.. code-block:: python |
171 | 119 |
|
172 |
| -# task_a and task_b detached here |
| 120 | + exc = get_exchange('name') |
| 121 | + exc.attach(some_task) |
| 122 | + try: |
| 123 | + ... |
| 124 | + finally: |
| 125 | + exc.detach(some_task) |
| 126 | +
|
| 127 | +某种意义上,这个和使用文件、锁和类似对象很像。 |
| 128 | +通常很容易会忘记最后的 ``detach()`` 步骤。 |
| 129 | +为了简化这个,你可以考虑使用上下文管理器协议。 |
| 130 | +例如,在交换机对象上增加一个 ``subscribe()`` 方法,如下: |
| 131 | + |
| 132 | +.. code-block:: python |
| 133 | +
|
| 134 | + from contextlib import contextmanager |
| 135 | + from collections import defaultdict |
173 | 136 |
|
174 |
| -Finally, it should be noted that there are numerous possible extensions to the exchange |
175 |
| -idea. For example, exchanges could implement an entire collection of message channels |
| 137 | + class Exchange: |
| 138 | + def __init__(self): |
| 139 | + self._subscribers = set() |
| 140 | +
|
| 141 | + def attach(self, task): |
| 142 | + self._subscribers.add(task) |
| 143 | +
|
| 144 | + def detach(self, task): |
| 145 | + self._subscribers.remove(task) |
| 146 | +
|
| 147 | + @contextmanager |
| 148 | + def subscribe(self, *tasks): |
| 149 | + for task in tasks: |
| 150 | + self.attach(task) |
| 151 | + try: |
| 152 | + yield |
| 153 | + finally: |
| 154 | + for task in tasks: |
| 155 | + self.detach(task) |
| 156 | +
|
| 157 | + def send(self, msg): |
| 158 | + for subscriber in self._subscribers: |
| 159 | + subscriber.send(msg) |
| 160 | +
|
| 161 | + # Dictionary of all created exchanges |
| 162 | + _exchanges = defaultdict(Exchange) |
| 163 | +
|
| 164 | + # Return the Exchange instance associated with a given name |
| 165 | + def get_exchange(name): |
| 166 | + return _exchanges[name] |
| 167 | +
|
| 168 | + # Example of using the subscribe() method |
| 169 | + exc = get_exchange('name') |
| 170 | + with exc.subscribe(task_a, task_b): |
| 171 | + ... |
| 172 | + exc.send('msg1') |
| 173 | + exc.send('msg2') |
| 174 | + ... |
| 175 | +
|
| 176 | + # task_a and task_b detached here |
| 177 | +
|
| 178 | +最后还应该注意的是关于交换机的思想有很多种的扩展实现。 |
| 179 | +例如,交换机可以实现一整个消息通道集合或提供交换机名称的模式匹配规则。 |
| 180 | +交换机还可以被扩展到分布式计算程序中(比如,将消息路由到不同机器上面的任务中去)。 |
176 | 181 |
|
177 |
| -or apply pattern matching rules to exchange names. Exchanges can also be extended |
178 |
| -into distributed computing applications (e.g., routing messages to tasks on different |
179 |
| -machines, etc.). |
|
0 commit comments