----------
Problem
----------

You have a program that performs a lot of CPU-intensive work, and you want to make it run faster by having it take advantage of multiple CPUs.

|

----------
Solution
----------

The ``concurrent.futures`` library provides a ``ProcessPoolExecutor`` class that can be used to execute computationally intensive functions in a separately running instance of the Python interpreter. However, in order to use it, you first need some computationally intensive work to do. Let's illustrate with a simple yet practical example. Suppose you have an entire directory of gzip-compressed Apache web server logs:

::

    logs/
       20120701.log.gz
       20120702.log.gz
       20120703.log.gz
       20120704.log.gz
       20120705.log.gz
       20120706.log.gz
       ...

Further suppose each log file contains lines like this:

::

    124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    ...

Here is a simple script that takes this data and identifies all hosts that have accessed the robots.txt file:


.. code-block:: python

    # findrobots.py

    import gzip
    import io
    import glob

    def find_robots(filename):
        '''
        Find all of the hosts that access robots.txt in a single log file
        '''
        robots = set()
        with gzip.open(filename) as f:
            for line in io.TextIOWrapper(f, encoding='ascii'):
                fields = line.split()
                if fields[6] == '/robots.txt':
                    robots.add(fields[0])
        return robots

    def find_all_robots(logdir):
        '''
        Find all hosts across an entire sequence of files
        '''
        files = glob.glob(logdir + '/*.log.gz')
        all_robots = set()
        for robots in map(find_robots, files):
            all_robots.update(robots)
        return all_robots

    if __name__ == '__main__':
        robots = find_all_robots('logs')
        for ipaddr in robots:
            print(ipaddr)

The preceding program is written in the commonly used map-reduce style. The function ``find_robots()`` is mapped across a collection of filenames, and the results are combined into a single result (the ``all_robots`` set in the ``find_all_robots()`` function).
Now, suppose you want to modify this program to use multiple CPUs. It turns out to be easy: simply replace the ``map()`` operation with a similar operation carried out on a process pool from the ``concurrent.futures`` library. Here is a slightly modified version of the code:

.. code-block:: python

    # findrobots.py

    import gzip
    import io
    import glob
    from concurrent import futures

    def find_robots(filename):
        '''
        Find all of the hosts that access robots.txt in a single log file
        '''
        robots = set()
        with gzip.open(filename) as f:
            for line in io.TextIOWrapper(f, encoding='ascii'):
                fields = line.split()
                if fields[6] == '/robots.txt':
                    robots.add(fields[0])
        return robots

    def find_all_robots(logdir):
        '''
        Find all hosts across an entire sequence of files
        '''
        files = glob.glob(logdir + '/*.log.gz')
        all_robots = set()
        with futures.ProcessPoolExecutor() as pool:
            for robots in pool.map(find_robots, files):
                all_robots.update(robots)
        return all_robots

    if __name__ == '__main__':
        robots = find_all_robots('logs')
        for ipaddr in robots:
            print(ipaddr)

With this modification, the script produces the same result but runs about 3.5 times faster on our quad-core machine. The actual speedup will vary according to the number of CPUs available on your machine.
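
If you want to check the speedup on your own machine, a rough timing harness such as the sketch below can help. It is only an illustration: it assumes the script above has been saved as ``findrobots.py`` next to the ``logs`` directory, and it simply measures wall-clock time with ``time.perf_counter()``. Run it once against the sequential version and once against the process-pool version and compare the two numbers.

.. code-block:: python

    # time_findrobots.py -- rough benchmark sketch (not part of the original recipe)
    import time

    import findrobots   # whichever version of findrobots.py is currently on the path

    if __name__ == '__main__':
        start = time.perf_counter()
        robots = findrobots.find_all_robots('logs')
        elapsed = time.perf_counter() - start
        print('found %d hosts in %.2f seconds' % (len(robots), elapsed))
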

|

----------
Discussion
----------

Typical usage of a ``ProcessPoolExecutor`` is as follows:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor() as pool:
        ...
        # do work in parallel using pool
        ...

Under the covers, a ``ProcessPoolExecutor`` creates N independent running Python interpreters, where N is the number of available CPUs detected on the system. You can change the number of processes created by supplying an optional argument to ``ProcessPoolExecutor(N)``. The pool runs until the last statement in the ``with`` block is executed, at which point the process pool is shut down. However, the program will wait until all submitted work has been processed.
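
For example, here is a minimal sketch of pinning the pool to a fixed number of worker processes (the ``work()`` function and the worker count of 4 are made up for illustration; the keyword argument is ``max_workers``):

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor

    def work(x):
        # Stand-in for a CPU-intensive function
        return x * x

    if __name__ == '__main__':
        # Use 4 worker processes instead of one per detected CPU
        with ProcessPoolExecutor(max_workers=4) as pool:
            results = list(pool.map(work, range(10)))
        print(results)
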

Work to be submitted to a pool must be defined in a function. There are two methods for submission. If you are trying to parallelize a list comprehension or a ``map()`` operation, you use ``pool.map()``:

.. code-block:: python

    # A function that performs a lot of work
    def work(x):
        ...
        return result

    # Nonparallel code
    results = map(work, data)

    # Parallel implementation
    with ProcessPoolExecutor() as pool:
        results = pool.map(work, data)
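
One practical detail that is not covered in the recipe: in Python 3.5 and later, ``ProcessPoolExecutor.map()`` also accepts a ``chunksize`` argument that batches items before shipping them to the workers, which can reduce interprocess-communication overhead when ``data`` holds many small items. A minimal sketch, reusing the ``work()`` and ``data`` placeholders from above:

.. code-block:: python

    with ProcessPoolExecutor() as pool:
        # Send items to the workers in batches of 100 rather than one at a time
        results = pool.map(work, data, chunksize=100)
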

Alternatively, you can manually submit single tasks using the ``pool.submit()`` method:

.. code-block:: python

    # Some function
    def work(x):
        ...
        return result

    with ProcessPoolExecutor() as pool:
        ...
        # Example of submitting work to the pool
        future_result = pool.submit(work, arg)

        # Obtaining the result (blocks until done)
        r = future_result.result()
        ...

If you manually submit a job, the result is an instance of ``Future``. To obtain the actual result, you call its ``result()`` method. This blocks until the result is computed and returned by the pool.
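
If you submit several jobs this way, you usually do not want to block on them in submission order. The standard library also provides ``concurrent.futures.as_completed()``, which yields futures as they finish. The following sketch is not part of the original recipe and uses a stand-in ``work()`` function:

.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor, as_completed

    def work(x):
        # Stand-in for a CPU-intensive function
        return x * x

    if __name__ == '__main__':
        with ProcessPoolExecutor() as pool:
            futures = [pool.submit(work, x) for x in [1, 2, 3, 4, 5]]
            for fut in as_completed(futures):
                # result() returns immediately here because fut has already finished
                print('Got:', fut.result())
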

Instead of blocking, you can also arrange to have a callback function triggered upon completion. For example:

.. code-block:: python

    def when_done(r):
        print('Got:', r.result())

    with ProcessPoolExecutor() as pool:
        future_result = pool.submit(work, arg)
        future_result.add_done_callback(when_done)

The user-supplied callback function receives an instance of ``Future`` that must be used to obtain the actual result (i.e., by calling its ``result()`` method).
Although process pools can be easy to use, there are a number of important considerations to keep in mind when designing larger programs. In no particular order:

• This technique for parallelization only works well for problems that can be trivially decomposed into independent parts.

• Work must be submitted in the form of simple functions. Parallel execution of instance methods, closures, or other kinds of constructs is not supported.

• Function arguments and return values must be compatible with pickle. Work is carried out in a separate interpreter using interprocess communication, so all data exchanged between interpreters has to be serialized (see the sketch after this list).

• Functions submitted for work should not maintain persistent state or have side effects. With the exception of simple things such as logging, you don't really have any control over the behavior of child processes once they start, so it is best to keep things simple and carry out work in pure functions that don't alter their environment.

• Process pools are created by calling the ``fork()`` system call on Unix. This makes a clone of the Python interpreter, including all of the program state at the time of the fork. On Windows, an independent copy of the interpreter that does not clone state is launched. The actual fork does not occur until the first ``pool.map()`` or ``pool.submit()`` call is made.

• Great care should be taken when combining process pools and programs that use threads. In particular, you should create and launch process pools prior to the creation of any threads (e.g., create the pool in the main thread at program startup).
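
To make the pickle and "simple functions" restrictions concrete, here is a small illustrative sketch (the ``square()`` function is made up). A module-level function can be pickled, and therefore shipped to a worker process, while a lambda or closure cannot:

.. code-block:: python

    import pickle
    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        # Module-level function: picklable, so it can be sent to a worker
        return x * x

    if __name__ == '__main__':
        pickle.dumps(square)                  # fine
        try:
            pickle.dumps(lambda x: x * x)     # lambdas cannot be pickled
        except Exception as e:
            print('not picklable:', e)

        with ProcessPoolExecutor() as pool:
            print(list(pool.map(square, range(5))))   # [0, 1, 4, 9, 16]
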