|
5 | 5 | ----------
|
6 | 6 | Problem
|
7 | 7 | ----------
|
8 |
| -You have a collection of thread queues, and you would like to be able to poll them for |
9 |
| -incoming items, much in the same way as you might poll a collection of network |
10 |
| -connections for incoming data. |
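| 8 | +You have a collection of thread queues and want to poll them for incoming items, |
| 9 | +much as you would poll a collection of network connections for incoming data. |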
11 | 10 |
|
12 | 11 | |
|
13 | 12 |
|
14 | 13 | ----------
|
15 | 14 | Solution
|
16 | 15 | ----------
|
17 |
| -A common solution to polling problems involves a little-known trick involving a hidden |
18 |
| -loopback network connection. Essentially, the idea is as follows: for each queue (or any |
19 |
| -object) that you want to poll, you create a pair of connected sockets. You then write on |
20 |
| -one of the sockets to signal the presence of data. The other socket is then passed to |
21 |
| -select() or a similar function to poll for the arrival of data. Here is some sample code |
22 |
| -that illustrates this idea: |
23 |
| - |
24 |
| -import queue |
25 |
| -import socket |
26 |
| -import os |
27 |
| - |
28 |
| -class PollableQueue(queue.Queue): |
29 |
| -    def __init__(self): |
30 |
| -        super().__init__() |
31 |
| -        # Create a pair of connected sockets |
32 |
| -        if os.name == 'posix': |
33 |
| -            self._putsocket, self._getsocket = socket.socketpair() |
34 |
| -        else: |
35 |
| -            # Compatibility on non-POSIX systems |
36 |
| -            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
37 |
| -            server.bind(('127.0.0.1', 0)) |
38 |
| -            server.listen(1) |
39 |
| -            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
40 |
| -            self._putsocket.connect(server.getsockname()) |
41 |
| -            self._getsocket, _ = server.accept() |
42 |
| -            server.close() |
43 |
| - |
44 |
| -    def fileno(self): |
45 |
| -        return self._getsocket.fileno() |
46 |
| - |
47 |
| -    def put(self, item): |
48 |
| -        super().put(item) |
49 |
| -        self._putsocket.send(b'x') |
50 |
| - |
51 |
| -    def get(self): |
52 |
| -        self._getsocket.recv(1) |
53 |
| -        return super().get() |
54 |
| - |
55 |
| -In this code, a new kind of Queue instance is defined where there is an underlying pair |
56 |
| -of connected sockets. The socketpair() function on Unix machines can establish such |
57 |
| -sockets easily. On Windows, you have to fake it using code similar to that shown (it |
58 |
| -looks a bit weird, but a server socket is created and a client immediately connects to it |
59 |
| -afterward). The normal get() and put() methods are then redefined slightly to perform |
60 |
| -a small bit of I/O on these sockets. The put() method writes a single byte of data to one |
61 |
| -of the sockets after putting data on the queue. The get() method reads a single byte of |
62 |
| -data from the other socket when removing an item from the queue. |
63 |
| - |
64 |
| -The fileno() method is what makes the queue pollable using a function such as |
65 |
| -select(). Essentially, it just exposes the underlying file descriptor of the socket used by |
66 |
| -the get() function. |
67 |
| -Here is an example of some code that defines a consumer which monitors multiple |
68 |
| -queues for incoming items: |
69 |
| - |
70 |
| -import select |
71 |
| -import threading |
72 |
| - |
73 |
| -def consumer(queues): |
74 |
| -    ''' |
75 |
| -    Consumer that reads data on multiple queues simultaneously |
76 |
| -    ''' |
77 |
| -    while True: |
78 |
| -        can_read, _, _ = select.select(queues,[],[]) |
79 |
| -        for r in can_read: |
80 |
| -            item = r.get() |
81 |
| -            print('Got:', item) |
82 |
| - |
83 |
| -q1 = PollableQueue() |
84 |
| -q2 = PollableQueue() |
85 |
| -q3 = PollableQueue() |
86 |
| -t = threading.Thread(target=consumer, args=([q1,q2,q3],)) |
87 |
| -t.daemon = True |
88 |
| -t.start() |
89 |
| - |
90 |
| -# Feed data to the queues |
91 |
| -q1.put(1) |
92 |
| -q2.put(10) |
93 |
| -q3.put('hello') |
94 |
| -q2.put(15) |
95 |
| -... |
96 |
| - |
97 |
| -If you try it, you’ll find that the consumer indeed receives all of the put items, regardless |
98 |
| -of which queues they are placed in. |
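| 16 | +A common solution to polling problems relies on a little-known trick: a hidden loopback network connection. |
| 17 | +The idea is this: for each queue (or any other object) you want to poll, you create a pair of connected sockets. |
| 18 | +You write to one of the sockets to signal that data is available, |
| 19 | +and pass the other socket to ``select()`` or a similar function to poll for the arrival of data. The following example illustrates the idea: |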
| 20 | + |
| 21 | +.. code-block:: python |
| 22 | +
|
| 23 | +    import queue |
| 24 | +    import socket |
| 25 | +    import os |
| 26 | +
|
| 27 | +    class PollableQueue(queue.Queue): |
| 28 | +        def __init__(self): |
| 29 | +            super().__init__() |
| 30 | +            # Create a pair of connected sockets |
| 31 | +            if os.name == 'posix': |
| 32 | +                self._putsocket, self._getsocket = socket.socketpair() |
| 33 | +            else: |
| 34 | +                # Compatibility on non-POSIX systems |
| 35 | +                server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 36 | +                server.bind(('127.0.0.1', 0)) |
| 37 | +                server.listen(1) |
| 38 | +                self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 39 | +                self._putsocket.connect(server.getsockname()) |
| 40 | +                self._getsocket, _ = server.accept() |
| 41 | +                server.close() |
| 42 | +
|
| 43 | +        def fileno(self): |
| 44 | +            return self._getsocket.fileno() |
| 45 | +
|
| 46 | +        def put(self, item): |
| 47 | +            super().put(item) |
| 48 | +            self._putsocket.send(b'x') |
| 49 | +
|
| 50 | +        def get(self): |
| 51 | +            self._getsocket.recv(1) |
| 52 | +            return super().get() |
| 53 | +
|
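| 54 | +This code defines a new kind of ``Queue`` that is backed by a pair of connected sockets. |
| 55 | +On Unix machines, the ``socketpair()`` function creates such sockets easily. |
| 56 | +On Windows, you have to fake it with code like that shown: a server socket is created and a client immediately connects to it. |
| 57 | +The normal ``get()`` and ``put()`` methods are then redefined slightly to perform a small amount of I/O on these sockets. |
| 58 | +The ``put()`` method writes a single byte to one of the sockets after putting an item on the queue. |
| 59 | +The ``get()`` method reads a single byte from the other socket when removing an item from the queue. |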
| 60 | + |
| 61 | +The ``fileno()`` method is what makes the queue pollable with a function such as ``select()``. |
| 62 | +It simply exposes the file descriptor of the underlying socket used by ``get()``. |
| 63 | + |
| 64 | +Here is an example that defines a consumer that monitors multiple queues for incoming items: |
| 65 | + |
| 66 | +.. code-block:: python |
| 67 | +
|
| 68 | +    import select |
| 69 | +    import threading |
| 70 | +
|
| 71 | +    def consumer(queues): |
| 72 | +        ''' |
| 73 | +        Consumer that reads data on multiple queues simultaneously |
| 74 | +        ''' |
| 75 | +        while True: |
| 76 | +            can_read, _, _ = select.select(queues,[],[]) |
| 77 | +            for r in can_read: |
| 78 | +                item = r.get() |
| 79 | +                print('Got:', item) |
| 80 | +
|
| 81 | +    q1 = PollableQueue() |
| 82 | +    q2 = PollableQueue() |
| 83 | +    q3 = PollableQueue() |
| 84 | +    t = threading.Thread(target=consumer, args=([q1,q2,q3],)) |
| 85 | +    t.daemon = True |
| 86 | +    t.start() |
| 87 | +
|
| 88 | +    # Feed data to the queues |
| 89 | +    q1.put(1) |
| 90 | +    q2.put(10) |
| 91 | +    q3.put('hello') |
| 92 | +    q2.put(15) |
| 93 | +    ... |
| 94 | +
|
| 95 | +If you try it, you will find that the consumer receives every item that is put, regardless of which queue it was placed in. |
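The role of ``fileno()`` described above can be checked in isolation. The following is a minimal sketch, not the recipe's own code: it assumes ``socket.socketpair()`` is available on the platform (POSIX, or Windows with Python 3.5 and later), so the non-POSIX fallback is omitted, and ``is_readable`` is a hypothetical helper introduced only for illustration.

```python
import queue
import select
import socket


class PollableQueue(queue.Queue):
    """Queue backed by a pair of connected sockets, as in the recipe."""

    def __init__(self):
        super().__init__()
        # Assumption: socketpair() exists on this platform, so the
        # loopback-server fallback from the recipe is left out.
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        # select() calls this to find the descriptor it should watch
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')   # signal: one byte per queued item

    def get(self):
        self._getsocket.recv(1)      # consume the matching signal byte
        return super().get()


def is_readable(q, timeout=0):
    """Hypothetical helper: ask select() whether q has an item waiting."""
    ready, _, _ = select.select([q], [], [], timeout)
    return q in ready
```

With these definitions, ``is_readable(q)`` should report nothing waiting on a fresh queue, report the queue as ready after ``q.put(...)``, and go back to not ready once ``q.get()`` has consumed the item and its signal byte.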
99 | 96 |
|
100 | 97 | |
|
101 | 98 |
|
102 | 99 | ----------
|
103 | 100 | Discussion
|
104 | 101 | ----------
|
105 |
| -The problem of polling non-file-like objects, such as queues, is often a lot trickier than |
106 |
| -it looks. For instance, if you don’t use the socket technique shown, your only option is |
107 |
| -to write code that cycles through the queues and uses a timer, like this: |
108 |
| - |
109 |
| -import time |
110 |
| -def consumer(queues): |
111 |
| -    while True: |
112 |
| -        for q in queues: |
113 |
| -            if not q.empty(): |
114 |
| -                item = q.get() |
115 |
| -                print('Got:', item) |
116 |
| - |
117 |
| -        # Sleep briefly to avoid 100% CPU |
118 |
| -        time.sleep(0.01) |
119 |
| - |
120 |
| -This might work for certain kinds of problems, but it’s clumsy and introduces other |
121 |
| -weird performance problems. For example, if new data is added to a queue, it won’t be |
122 |
| -detected for as long as 10 milliseconds (an eternity on a modern processor). |
123 |
| -You run into even further problems if the preceding polling is mixed with the polling |
124 |
| -of other objects, such as network sockets. For example, if you want to poll both sockets |
125 |
| -and queues at the same time, you might have to use code like this: |
126 |
| - |
127 |
| -import select |
128 |
| - |
129 |
| -def event_loop(sockets, queues): |
130 |
| -    while True: |
131 |
| -        # polling with a timeout |
132 |
| -        can_read, _, _ = select.select(sockets, [], [], 0.01) |
133 |
| -        for r in can_read: |
134 |
| -            handle_read(r) |
135 |
| -        for q in queues: |
136 |
| -            if not q.empty(): |
137 |
| -                item = q.get() |
138 |
| -                print('Got:', item) |
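| 102 | +Polling non-file-like objects, such as queues, is often a lot trickier than it looks. |
| 103 | +For instance, if you don't use the socket technique shown, |
| 104 | +your only option is to write code that cycles through the queues and uses a timer, like this: |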
| 105 | + |
| 106 | +.. code-block:: python |
| 107 | +
|
| 108 | +    import time |
| 109 | +    def consumer(queues): |
| 110 | +        while True: |
| 111 | +            for q in queues: |
| 112 | +                if not q.empty(): |
| 113 | +                    item = q.get() |
| 114 | +                    print('Got:', item) |
| 115 | +
|
| 116 | +            # Sleep briefly to avoid 100% CPU |
| 117 | +            time.sleep(0.01) |
| 118 | +
|
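| 119 | +This might work for certain kinds of problems, but it is clumsy and introduces other performance problems. |
| 120 | +For example, if new data is added to a queue, it may go undetected for as long as 10 milliseconds. |
| 121 | +You run into further problems if this polling is mixed with the polling of other objects, such as network sockets. |
| 122 | +For example, to poll both sockets and queues at the same time, you might have to use code like this: |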
| 123 | + |
| 124 | +.. code-block:: python |
| 125 | +
|
| 126 | +    import select |
| 127 | +
|
| 128 | +    def event_loop(sockets, queues): |
| 129 | +        while True: |
| 130 | +            # polling with a timeout |
| 131 | +            can_read, _, _ = select.select(sockets, [], [], 0.01) |
| 132 | +            for r in can_read: |
| 133 | +                handle_read(r) |
| 134 | +            for q in queues: |
| 135 | +                if not q.empty(): |
| 136 | +                    item = q.get() |
| 137 | +                    print('Got:', item) |
| 138 | +
|
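| 139 | +The solution shown solves most of these problems by putting queues on equal footing with sockets. |
| 140 | +A single ``select()`` call can poll for activity on both. |
| 141 | +There is no need for timeouts or other time-based hacks to check periodically. |
| 142 | +Moreover, if data is added to a queue, the consumer is notified almost instantaneously. |
| 143 | +Although the underlying I/O adds a small amount of overhead, it is often worth it for better response time and simpler code. |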
139 | 144 |
|
140 |
| -The solution shown solves a lot of these problems by simply putting queues on equal |
141 |
| -status with sockets. A single select() call can be used to poll for activity on both. It is |
142 |
| -not necessary to use timeouts or other time-based hacks to periodically check. |
143 |
| -Moreover, if data gets added to a queue, the consumer will be notified almost instantaneously. |
144 |
| -Although there is a tiny amount of overhead associated with the underlying I/O, it often |
145 |
| -is worth it to have better response time and simplified coding. |
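The closing point, that one ``select()`` call can watch sockets and queues together, could look roughly like the sketch below. It is an illustration under stated assumptions rather than code from the recipe: ``poll_once`` is a hypothetical helper, the 4096-byte read size is arbitrary, and ``PollableQueue`` is repeated in a shortened form that assumes ``socket.socketpair()`` is available.

```python
import queue
import select
import socket


class PollableQueue(queue.Queue):
    """Shortened version of the recipe's queue (socketpair() assumed available)."""

    def __init__(self):
        super().__init__()
        self._putsocket, self._getsocket = socket.socketpair()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()


def poll_once(sockets, queues, timeout=None):
    """Hypothetical helper: one select() pass over sockets and queues together.

    Returns a list of ('socket', bytes) and ('queue', item) tuples for
    everything that was ready. No periodic timer is needed: with
    timeout=None the call simply blocks until something arrives.
    """
    watched = list(sockets) + list(queues)
    can_read, _, _ = select.select(watched, [], [], timeout)
    events = []
    for r in can_read:
        if isinstance(r, PollableQueue):
            events.append(('queue', r.get()))
        else:
            events.append(('socket', r.recv(4096)))
    return events
```

For example, after creating ``a, b = socket.socketpair()`` and ``q = PollableQueue()``, calling ``q.put('job')`` and ``b.send(b'ping')`` and then ``poll_once([a], [q], timeout=1.0)`` should return one event for the queue and one for the socket from a single ``select()`` call.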