Commit 900100a

author: yidao620c
committed: Section 12.8 completed (12.8小节完成)
1 parent e7460ca commit 900100a

1 file changed: source/c12/p08_perform_simple_parallel_programming.rst (+174, -178 lines)
@@ -5,216 +5,212 @@
----------
Problem
----------
You have a program that performs a lot of CPU-intensive work, and you want to make
it run faster by having it take advantage of multiple CPUs.

|

----------
Solution
----------
The ``concurrent.futures`` library provides a ``ProcessPoolExecutor`` class that can
be used to execute computationally intensive functions in a separately running
instance of the Python interpreter. However, in order to use it, you first need to
have some computationally intensive work. Let's illustrate with a simple yet
practical example. Suppose you have an entire directory of gzip-compressed Apache
web server logs:

::

    logs/
       20120701.log.gz
       20120702.log.gz
       20120703.log.gz
       20120704.log.gz
       20120705.log.gz
       20120706.log.gz
       ...

Further suppose each log file contains lines like this:

::

    124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    ...

Here is a simple script that takes this data and identifies all hosts that have
accessed the robots.txt file:

.. code-block:: python

    # findrobots.py

    import gzip
    import io
    import glob

    def find_robots(filename):
        '''
        Find all of the hosts that access robots.txt in a single log file
        '''
        robots = set()
        with gzip.open(filename) as f:
            for line in io.TextIOWrapper(f, encoding='ascii'):
                fields = line.split()
                if fields[6] == '/robots.txt':
                    robots.add(fields[0])
        return robots

    def find_all_robots(logdir):
        '''
        Find all hosts across an entire sequence of files
        '''
        files = glob.glob(logdir + '/*.log.gz')
        all_robots = set()
        for robots in map(find_robots, files):
            all_robots.update(robots)
        return all_robots

    if __name__ == '__main__':
        robots = find_all_robots('logs')
        for ipaddr in robots:
            print(ipaddr)

The preceding program is written in the commonly used map-reduce style. The function
``find_robots()`` is mapped across a collection of filenames and the results are
combined into a single result (the ``all_robots`` set in the ``find_all_robots()``
function). Now, suppose you want to modify this program to use multiple CPUs. It
turns out to be easy: simply replace the ``map()`` operation with a similar operation
carried out on a process pool from the ``concurrent.futures`` library. Here is a
slightly modified version of the code:

.. code-block:: python

    # findrobots.py

    import gzip
    import io
    import glob
    from concurrent import futures

    def find_robots(filename):
        '''
        Find all of the hosts that access robots.txt in a single log file
        '''
        robots = set()
        with gzip.open(filename) as f:
            for line in io.TextIOWrapper(f, encoding='ascii'):
                fields = line.split()
                if fields[6] == '/robots.txt':
                    robots.add(fields[0])
        return robots

    def find_all_robots(logdir):
        '''
        Find all hosts across an entire sequence of files
        '''
        files = glob.glob(logdir + '/*.log.gz')
        all_robots = set()
        with futures.ProcessPoolExecutor() as pool:
            for robots in pool.map(find_robots, files):
                all_robots.update(robots)
        return all_robots

    if __name__ == '__main__':
        robots = find_all_robots('logs')
        for ipaddr in robots:
            print(ipaddr)

With this modification, the script produces the same result but runs about 3.5 times
faster on a quad-core machine. The actual performance will vary according to the
number of CPUs available on your machine.
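
If you want to measure the speedup on your own machine, a small timing harness is
enough. The following is a minimal sketch, not part of the original recipe: it
assumes the ``findrobots.py`` shown above is importable and that a ``logs/``
directory with log files is present.

.. code-block:: python

    # timing sketch (hypothetical harness; assumes findrobots.py and logs/ exist)
    import time
    from findrobots import find_all_robots

    if __name__ == '__main__':
        start = time.perf_counter()
        robots = find_all_robots('logs')
        elapsed = time.perf_counter() - start
        print('Found %d hosts in %.2f seconds' % (len(robots), elapsed))

Run it once against the sequential version and once against the process-pool
version to compare wall-clock times on your hardware.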

|

----------
Discussion
----------

Typical usage of a ``ProcessPoolExecutor`` is as follows:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor() as pool:
        ...
        # do work in parallel using pool
        ...

Under the covers, a ``ProcessPoolExecutor`` creates N independent running Python
interpreters, where N is the number of available CPUs detected on the system. You
can change the number of processes created by supplying an optional argument to
``ProcessPoolExecutor(N)``. The pool runs until the last statement in the ``with``
block is executed, at which point the process pool is shut down. However, the
program will wait until all submitted work has been processed.
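
For instance, here is a minimal sketch of pinning the pool to an explicit worker
count (the value 4 is an arbitrary choice for illustration); ``os.cpu_count()``
reports the number the default is based on:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor
    import os

    print('CPUs detected:', os.cpu_count())   # the default pool size comes from this

    with ProcessPoolExecutor(4) as pool:      # explicitly cap the pool at 4 workers
        ...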

Work to be submitted to a pool must be defined in a function. There are two methods
for submission. If you are trying to parallelize a list comprehension or a ``map()``
operation, you use ``pool.map()``:

.. code-block:: python

    # A function that performs a lot of work
    def work(x):
        ...
        return result

    # Nonparallel code
    results = map(work, data)

    # Parallel implementation
    with ProcessPoolExecutor() as pool:
        results = pool.map(work, data)

Alternatively, you can manually submit single tasks using the ``pool.submit()``
method:

.. code-block:: python

    # Some function
    def work(x):
        ...
        return result

    with ProcessPoolExecutor() as pool:
        ...
        # Example of submitting work to the pool
        future_result = pool.submit(work, arg)

        # Obtaining the result (blocks until done)
        r = future_result.result()
        ...
179182
180-
def when_done(r):
181-
print('Got:', r.result())
183+
如果你手动提交一个任务,结果是一个 ``Future`` 实例。
184+
要获取最终结果,你需要调用它的 ``result()`` 方法。
185+
它会阻塞进程直到结果被返回来。
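
If you have submitted many tasks, you can also collect the results in completion
order rather than submission order using ``concurrent.futures.as_completed()``.
This is a minimal sketch, not part of the original recipe; ``work()`` is a
stand-in for a CPU-intensive function:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor, as_completed

    def work(x):                    # stand-in for a CPU-intensive function
        return x * x

    if __name__ == '__main__':
        with ProcessPoolExecutor() as pool:
            futures = [pool.submit(work, x) for x in range(8)]
            for fut in as_completed(futures):   # yields each future as it finishes
                print('Got:', fut.result())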
182186

183-
with ProcessPoolExecutor() as pool:
184-
future_result = pool.submit(work, arg)
185-
future_result.add_done_callback(when_done)
187+
如果不想阻塞,你还可以使用一个回调函数,例如:
186188

187-
The user-supplied callback function receives an instance of Future that must be used
188-
to obtain the actual result (i.e., by calling its result() method).
189-
Although process pools can be easy to use, there are a number of important consider‐
190-
ations to be made in designing larger programs. In no particular order:
189+
.. code-block:: python
191190
192-
• This technique for parallelization only works well for problems that can be trivially
191+
def when_done(r):
192+
print('Got:', r.result())
193193
194-
decomposed into independent parts.
194+
with ProcessPoolExecutor() as pool:
195+
future_result = pool.submit(work, arg)
196+
future_result.add_done_callback(when_done)
195197
196-
• Work must be submitted in the form of simple functions. Parallel execution of
198+
回调函数接受一个 ``Future`` 实例,被用来获取最终的结果(比如通过调用它的result()方法)。
199+
尽管处理池很容易使用,在设计大程序的时候还是有很多需要注意的地方,如下几点:
197200

198-
instance methods, closures, or other kinds of constructs are not supported.
201+
• 这种并行处理技术只适用于那些可以被分解为互相独立部分的问题。
199202

200-
• Function arguments and return values must be compatible with pickle. Work is
201-
carried out in a separate interpreter using interprocess communication. Thus, data
202-
exchanged between interpreters has to be serialized.
203+
• 被提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持。
203204

204-
• Functions submitted for work should not maintain persistent state or have side
205-
effects. With the exception of simple things such as logging, you don’t really have
206-
any control over the behavior of child processes once started. Thus, to preserve your
207-
sanity, it is probably best to keep things simple and carry out work in pure-functions
208-
that don’t alter their environment.
205+
• 函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
209206

210-
• Process pools are created by calling the fork() system call on Unix. This makes a
211-
clone of the Python interpreter, including all of the program state at the time of the
212-
fork. On Windows, an independent copy of the interpreter that does not clone state
213-
is launched. The actual forking process does not occur until the first pool.map()
214-
or pool.submit() method is called.
207+
• 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情,
208+
一旦启动你不能控制子进程的任何行为,因此最好保持简单和纯洁——函数不要去修改环境。
215209

216-
Great care should be made when combining process pools and programs that use
217-
threads. In particular, you should probably create and launch process pools prior
218-
to the creation of any threads (e.g., create the pool in the main thread at program
219-
startup).
210+
在Unix上进程池通过调用 ``fork()`` 系统调用被创建,
211+
它会克隆Python解释器,包括fork时的所有程序状态。
212+
而在Windows上,克隆解释器时不会克隆状态。
213+
实际的fork操作会在第一次调用 ``pool.map()`` 或 ``pool.submit()`` 后发生。
220214

215+
• 当你混合使用进程池和多线程的时候要特别小心。
216+
你应该在创建任何线程之前先创建并激活进程池(比如在程序启动的main线程中创建进程池)。
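
As a minimal sketch of the pickle constraint above (hypothetical, for illustration
only): a module-level function can be submitted, while a lambda typically fails
because it cannot be pickled. The exact exception type and the point at which it
surfaces can vary across Python versions:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor

    def square(x):          # module-level function: picklable, fine to submit
        return x * x

    if __name__ == '__main__':
        with ProcessPoolExecutor() as pool:
            print(list(pool.map(square, range(5))))   # works: [0, 1, 4, 9, 16]
            try:
                pool.submit(lambda x: x * x, 2).result()
            except Exception as e:                    # lambdas can't be pickled
                print('Lambda failed:', e)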
