Skip to content

Commit 5ea31e7

Browse files
committed
12.13小节完成
1 parent c618c99 commit 5ea31e7

File tree

1 file changed

+124
-125
lines changed

1 file changed

+124
-125
lines changed

source/c12/p13_polling_multiple_thread_queues.rst

+124-125
Original file line numberDiff line numberDiff line change
@@ -5,141 +5,140 @@
55
----------
66
问题
77
----------
8-
You have a collection of thread queues, and you would like to be able to poll them for
9-
incoming items, much in the same way as you might poll a collection of network con‐
10-
nections for incoming data.
8+
你有一个线程队列集合,想为到来的元素轮询它们,
9+
就跟你为一个客户端请求去轮询一个网络连接集合的方式一样。
1110

1211
|
1312
1413
----------
1514
解决方案
1615
----------
17-
A common solution to polling problems involves a little-known trick involving a hidden
18-
loopback network connection. Essentially, the idea is as follows: for each queue (or any
19-
object) that you want to poll, you create a pair of connected sockets. You then write on
20-
one of the sockets to signal the presence of data. The other sockect is then passed to
21-
select() or a similar function to poll for the arrival of data. Here is some sample code
22-
that illustrates this idea:
23-
24-
import queue
25-
import socket
26-
import os
27-
28-
class PollableQueue(queue.Queue):
29-
def __init__(self):
30-
super().__init__()
31-
# Create a pair of connected sockets
32-
if os.name == 'posix':
33-
self._putsocket, self._getsocket = socket.socketpair()
34-
else:
35-
# Compatibility on non-POSIX systems
36-
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
37-
server.bind(('127.0.0.1', 0))
38-
server.listen(1)
39-
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
40-
self._putsocket.connect(server.getsockname())
41-
self._getsocket, _ = server.accept()
42-
server.close()
43-
44-
def fileno(self):
45-
return self._getsocket.fileno()
46-
47-
def put(self, item):
48-
super().put(item)
49-
self._putsocket.send(b'x')
50-
51-
def get(self):
52-
self._getsocket.recv(1)
53-
return super().get()
54-
55-
In this code, a new kind of Queue instance is defined where there is an underlying pair
56-
of connected sockets. The socketpair() function on Unix machines can establish such
57-
sockets easily. On Windows, you have to fake it using code similar to that shown (it
58-
looks a bit weird, but a server socket is created and a client immediately connects to it
59-
afterward). The normal get() and put() methods are then redefined slightly to perform
60-
a small bit of I/O on these sockets. The put() method writes a single byte of data to one
61-
of the sockets after putting data on the queue. The get() method reads a single byte of
62-
data from the other socket when removing an item from the queue.
63-
64-
The fileno() method is what makes the queue pollable using a function such as se
65-
lect(). Essentially, it just exposes the underlying file descriptor of the socket used by
66-
the get() function.
67-
Here is an example of some code that defines a consumer which monitors multiple
68-
queues for incoming items:
69-
70-
import select
71-
import threading
72-
73-
def consumer(queues):
74-
'''
75-
Consumer that reads data on multiple queues simultaneously
76-
'''
77-
while True:
78-
can_read, _, _ = select.select(queues,[],[])
79-
for r in can_read:
80-
item = r.get()
81-
print('Got:', item)
82-
83-
q1 = PollableQueue()
84-
q2 = PollableQueue()
85-
q3 = PollableQueue()
86-
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
87-
t.daemon = True
88-
t.start()
89-
90-
# Feed data to the queues
91-
q1.put(1)
92-
q2.put(10)
93-
q3.put('hello')
94-
q2.put(15)
95-
...
96-
97-
If you try it, you’ll find that the consumer indeed receives all of the put items, regardless
98-
of which queues they are placed in.
16+
对于轮询问题的一个常见解决方案中有个很少有人知道的技巧,包含了一个隐藏的回路网络连接。
17+
本质上讲其思想就是:对于每个你想要轮询的队列,你创建一对连接的套接字。
18+
然后你在其中一个套接字上面编写代码来标识存在的数据,
19+
另外一个套接字被传给 ``select()`` 或类似的一个轮询数据到达的函数。下面的例子演示了这个思想:
20+
21+
.. code-block:: python
22+
23+
import queue
24+
import socket
25+
import os
26+
27+
class PollableQueue(queue.Queue):
28+
def __init__(self):
29+
super().__init__()
30+
# Create a pair of connected sockets
31+
if os.name == 'posix':
32+
self._putsocket, self._getsocket = socket.socketpair()
33+
else:
34+
# Compatibility on non-POSIX systems
35+
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
36+
server.bind(('127.0.0.1', 0))
37+
server.listen(1)
38+
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
39+
self._putsocket.connect(server.getsockname())
40+
self._getsocket, _ = server.accept()
41+
server.close()
42+
43+
def fileno(self):
44+
return self._getsocket.fileno()
45+
46+
def put(self, item):
47+
super().put(item)
48+
self._putsocket.send(b'x')
49+
50+
def get(self):
51+
self._getsocket.recv(1)
52+
return super().get()
53+
54+
在这个代码中,一个新的 ``Queue`` 实例类型被定义,底层是一个被连接套接字对。
55+
在Unix机器上的 ``socketpair()`` 函数能轻松的创建这样的套接字。
56+
在Windows上面,你必须使用类似代码来模拟它。
57+
然后定义普通的 ``get()`` 和 ``put()`` 方法在这些套接字上面来执行I/O操作。
58+
``put()`` 方法再将数据放入队列后会写一个单字节到某个套接字中去。
59+
而 ``get()`` 方法在从队列中移除一个元素时会从另外一个套接字中读取到这个单字节数据。
60+
61+
``fileno()`` 方法使用一个函数比如 ``select()`` 来让这个队列可以被轮询。
62+
它仅仅只是暴露了底层被 ``get()`` 函数使用到的socket的文件描述符而已。
63+
64+
下面是一个例子,定义了一个为到来的元素监控多个队列的消费者:
65+
66+
.. code-block:: python
67+
68+
import select
69+
import threading
70+
71+
def consumer(queues):
72+
'''
73+
Consumer that reads data on multiple queues simultaneously
74+
'''
75+
while True:
76+
can_read, _, _ = select.select(queues,[],[])
77+
for r in can_read:
78+
item = r.get()
79+
print('Got:', item)
80+
81+
q1 = PollableQueue()
82+
q2 = PollableQueue()
83+
q3 = PollableQueue()
84+
t = threading.Thread(target=consumer, args=([q1,q2,q3],))
85+
t.daemon = True
86+
t.start()
87+
88+
# Feed data to the queues
89+
q1.put(1)
90+
q2.put(10)
91+
q3.put('hello')
92+
q2.put(15)
93+
...
94+
95+
如果你试着运行它,你会发现这个消费者会接受到所有的被放入的元素,不管元素被放进了哪个队列中。
9996

10097
|
10198
10299
----------
103100
讨论
104101
----------
105-
The problem of polling non-file-like objects, such as queues, is often a lot trickier than
106-
it looks. For instance, if you don’t use the socket technique shown, your only option is
107-
to write code that cycles through the queues and uses a timer, like this:
108-
109-
import time
110-
def consumer(queues):
111-
while True:
112-
for q in queues:
113-
if not q.empty():
114-
item = q.get()
115-
print('Got:', item)
116-
117-
# Sleep briefly to avoid 100% CPU
118-
time.sleep(0.01)
119-
120-
This might work for certain kinds of problems, but it’s clumsy and introduces other
121-
weird performance problems. For example, if new data is added to a queue, it won’t be
122-
detected for as long as 10 milliseconds (an eternity on a modern processor).
123-
You run into even further problems if the preceding polling is mixed with the polling
124-
of other objects, such as network sockets. For example, if you want to poll both sockets
125-
and queues at the same time, you might have to use code like this:
126-
127-
import select
128-
129-
def event_loop(sockets, queues):
130-
while True:
131-
# polling with a timeout
132-
can_read, _, _ = select.select(sockets, [], [], 0.01)
133-
for r in can_read:
134-
handle_read(r)
135-
for q in queues:
136-
if not q.empty():
137-
item = q.get()
138-
print('Got:', item)
102+
对于轮询非类文件对象,比如队列通常都是比较棘手的问题。
103+
例如,如果你不使用上面的套接字技术,
104+
你唯一的选择就是编写代码来循环遍历这些队列并使用一个定时器。像下面这样:
105+
106+
.. code-block:: python
107+
108+
import time
109+
def consumer(queues):
110+
while True:
111+
for q in queues:
112+
if not q.empty():
113+
item = q.get()
114+
print('Got:', item)
115+
116+
# Sleep briefly to avoid 100% CPU
117+
time.sleep(0.01)
118+
119+
这样做其实不合理,还会引入其他的性能问题。
120+
例如,如果新的数据被加入到一个队列中,至少要花10毫秒才能被发现。
121+
如果你之前的轮询还要去轮询其他对象,比如网络套接字那还会有更多问题。
122+
例如,如果你想同时轮询套接字和队列,你可能要像下面这样使用:
123+
124+
.. code-block:: python
125+
126+
import select
127+
128+
def event_loop(sockets, queues):
129+
while True:
130+
# polling with a timeout
131+
can_read, _, _ = select.select(sockets, [], [], 0.01)
132+
for r in can_read:
133+
handle_read(r)
134+
for q in queues:
135+
if not q.empty():
136+
item = q.get()
137+
print('Got:', item)
138+
139+
这个方案通过将队列和套接字等同对待来解决了大部分的问题。
140+
一个单独的 ``select()`` 调用可被同时用来轮询。
141+
使用超时或其他基于时间的机制来执行周期性检查并没有必要。
142+
甚至,如果数据被加入到一个队列,消费者几乎可以实时的被通知。
143+
尽管会有一点点底层的I/O损耗,使用它通常会获得更好的响应时间并简化编程。
139144

140-
The solution shown solves a lot of these problems by simply putting queues on equal
141-
status with sockets. A single select() call can be used to poll for activity on both. It is
142-
not necessary to use timeouts or other time-based hacks to periodically check. More‐
143-
over, if data gets added to a queue, the consumer will be notified almost instantaneously.
144-
Although there is a tiny amount of overhead associated with the underlying I/O, it often
145-
is worth it to have better response time and simplified coding.

0 commit comments

Comments
 (0)