@@ -36,24 +36,208 @@ Python 3.2 introduced the `concurrent.futures` module, which includes thread pool
1. Python 2.2: the concept of generators (initially referred to as iterators) was introduced for the first time (PEP 255).
2. Python 2.5: added the ability to send objects back into a paused generator, i.e. the generator's `send()` method (PEP 342).
3. Python 3.3: added the `yield from` feature, which lets a generator delegate to any iterator and yield its values (note that generators are themselves iterators), so generators can be chained together and refactored into cleaner ones (a short delegation sketch follows the generator pipeline example below).
- 4. Python 3.4: introduced the `asyncio.coroutine` decorator for marking a function as a coroutine; coroutine functions are used together with `asyncio` and its event loop to perform asynchronous I/O.
+ 4. Python 3.4: introduced the `asyncio.coroutine` decorator for marking a function as a coroutine; coroutine functions are used together with `asyncio` and its event loop to perform asynchronous I/O.
5. Python 3.5: introduced `async` and `await`; `async def` defines a coroutine function, which may not contain any form of `yield` statement but can use `return` or `await` to return values from the coroutine.

-
+ #### Example Code
+
+ 1. Generators - producers of data.
+
+ ```Python
+
+ from time import sleep
+
+
+ # Countdown generator
+ def countdown(n):
+     while n > 0:
+         yield n
+         n -= 1
+
+
+ def main():
+     for num in countdown(5):
+         print(f'Countdown: {num}')
+         sleep(1)
+     print('Countdown Over!')
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ Generators can also be stacked to form a generator pipeline, as shown below.
+
+ ```Python
+
+ # Fibonacci sequence generator
+ def fib():
+     a, b = 0, 1
+     while True:
+         a, b = b, a + b
+         yield a
+
+
+ # Even-number filter generator
+ def even(gen):
+     for val in gen:
+         if val % 2 == 0:
+             yield val
+
+
+ def main():
+     gen = even(fib())
+     for _ in range(10):
+         print(next(gen))
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
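+
+ As promised above, here is a minimal, self-contained sketch of `yield from` delegation (the `numbers` and `chained` generators are made-up names for illustration and are not part of the original examples):
+
+ ```Python
+ def numbers(start, stop):
+     yield from range(start, stop)
+
+
+ def chained():
+     # Delegate to two sub-generators; yield from forwards their values
+     # (and would also forward send()/throw() calls if they were used).
+     yield from numbers(1, 4)
+     yield from numbers(10, 13)
+
+
+ if __name__ == '__main__':
+     print(list(chained()))  # prints [1, 2, 3, 10, 11, 12]
+ ```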
+
+ 2. Coroutines - consumers of data.
+
+ ```Python
+
+ from time import sleep
+
+
+ # Generator - data producer
+ def countdown_gen(n, consumer):
+     consumer.send(None)
+     while n > 0:
+         consumer.send(n)
+         n -= 1
+     consumer.send(None)
+
+
+ # Coroutine - data consumer
+ def countdown_con():
+     while True:
+         n = yield
+         if n:
+             print(f'Countdown {n}')
+             sleep(1)
+         else:
+             print('Countdown Over!')
+
+
+ def main():
+     countdown_gen(5, countdown_con())
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ > Note: in the code above, the first line of the countdown_gen function, `consumer.send(None)`, primes the coroutine; in plain terms, it advances the generator until it suspends at the `yield` keyword. The same effect can be achieved with `next(consumer)`. If you would rather not write this "priming" code every time, you can write a wrapper that takes care of it, as shown below.
+
+ ```Python
+
+ from functools import wraps
+
+
+ def coroutine(fn):
+
+     @wraps(fn)
+     def wrapper(*args, **kwargs):
+         gen = fn(*args, **kwargs)
+         next(gen)
+         return gen
+
+     return wrapper
+ ```
+
+ With the `@coroutine` decorator in place, coroutines are primed automatically and you no longer have to repeat the activation code; a brief usage sketch follows.
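+
+ The usage sketch below is not from the original text (the `printer` coroutine is a made-up example); the decorator definition is repeated so that the snippet runs on its own.
+
+ ```Python
+ from functools import wraps
+
+
+ def coroutine(fn):
+     """Prime a coroutine by advancing it to its first yield."""
+     @wraps(fn)
+     def wrapper(*args, **kwargs):
+         gen = fn(*args, **kwargs)
+         next(gen)
+         return gen
+     return wrapper
+
+
+ @coroutine
+ def printer():
+     while True:
+         item = yield
+         print('Received:', item)
+
+
+ if __name__ == '__main__':
+     con = printer()       # already primed by the decorator
+     con.send('hello')     # no explicit next()/send(None) needed
+     con.send('world')
+ ```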
+
+ 3. Asynchronous I/O - non-blocking I/O operations.
+
+ ```Python
+
+ import asyncio
+
+
+ @asyncio.coroutine
+ def countdown(name, n):
+     while n > 0:
+         print(f'Countdown[{name}]: {n}')
+         yield from asyncio.sleep(1)
+         n -= 1
+
+
+ def main():
+     loop = asyncio.get_event_loop()
+     tasks = [
+         countdown("A", 10), countdown("B", 5),
+     ]
+     loop.run_until_complete(asyncio.wait(tasks))
+     loop.close()
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
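+
+ Note that `@asyncio.coroutine` and generator-based coroutines were deprecated in Python 3.8 and removed in Python 3.11, so on modern interpreters the countdown above is usually written with `async`/`await` instead. A sketch under that assumption (using `asyncio.run()`, available since Python 3.7):
+
+ ```Python
+ import asyncio
+
+
+ async def countdown(name, n):
+     while n > 0:
+         print(f'Countdown[{name}]: {n}')
+         await asyncio.sleep(1)
+         n -= 1
+
+
+ async def main():
+     # Run both countdowns concurrently on the event loop
+     await asyncio.gather(countdown('A', 10), countdown('B', 5))
+
+
+ if __name__ == '__main__':
+     asyncio.run(main())
+ ```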
+
+ 4. `async` and `await`.
+
+ ```Python
+
+ import asyncio
+ import aiohttp
+
+
+ async def download(url):
+     print('Fetch:', url)
+     async with aiohttp.ClientSession() as session:
+         async with session.get(url) as resp:
+             print(url, '--->', resp.status)
+             print(url, '--->', resp.cookies)
+             print('\n\n', await resp.text())
+
+
+ def main():
+     loop = asyncio.get_event_loop()
+     urls = [
+         'https://www.baidu.com',
+         'http://www.sohu.com/',
+         'http://www.sina.com.cn/',
+         'https://www.taobao.com/',
+         'https://www.jd.com/'
+     ]
+     tasks = [download(url) for url in urls]
+     loop.run_until_complete(asyncio.wait(tasks))
+     loop.close()
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ The code above uses [AIOHTTP](https://github.com/aio-libs/aiohttp), a well-known third-party library that implements both an HTTP client and an HTTP server with excellent support for asynchronous operation; if you are interested, have a look at its [official documentation](https://aiohttp.readthedocs.io/en/stable/). A minimal server-side sketch follows.
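+
+ The server-side sketch below is not from the original text; it uses aiohttp's `aiohttp.web` API (the handler name and the host/port values are arbitrary choices) to show the server functionality mentioned above:
+
+ ```Python
+ from aiohttp import web
+
+
+ async def handle_index(request):
+     # Return a plain-text response for GET /
+     return web.Response(text='Hello from aiohttp!')
+
+
+ def main():
+     app = web.Application()
+     app.add_routes([web.get('/', handle_index)])
+     web.run_app(app, host='127.0.0.1', port=8080)
+
+
+ if __name__ == '__main__':
+     main()
+ ```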

### Example - Crawling all pages of "手机搜狐网" (m.sohu.com) with multiple threads

```Python

+ import pickle
+ import zlib
from enum import Enum, unique
- from queue import Queue
+ from hashlib import sha1
from random import random
- from threading import Thread, current_thread
+ from threading import Thread, current_thread, local
from time import sleep
from urllib.parse import urlparse

+ import pymongo
+ import redis
import requests
from bs4 import BeautifulSoup
+ from bson import Binary


@unique
@@ -113,7 +297,6 @@ class Spider(object):

    def parse(self, html_page, *, domain='m.sohu.com'):
        soup = BeautifulSoup(html_page, 'lxml')
-         url_links = []
        for a_tag in soup.body.select('a[href]'):
            parser = urlparse(a_tag.attrs['href'])
            scheme = parser.scheme or 'http'
@@ -122,34 +305,51 @@ class Spider(object):
            path = parser.path
            query = '?' + parser.query if parser.query else ''
            full_url = f'{scheme}://{netloc}{path}{query}'
-             if full_url not in visited_urls:
-                 url_links.append(full_url)
-         return url_links
+             redis_client = thread_local.redis_client
+             if not redis_client.sismember('visited_urls', full_url):
+                 redis_client.rpush('m_sohu_task', full_url)

    def extract(self, html_page):
        pass

    def store(self, data_dict):
+         # redis_client = thread_local.redis_client
+         # mongo_db = thread_local.mongo_db
        pass


class SpiderThread(Thread):

-     def __init__(self, name, spider, tasks_queue):
+     def __init__(self, name, spider):
        super().__init__(name=name, daemon=True)
        self.spider = spider
-         self.tasks_queue = tasks_queue

    def run(self):
+         redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+         mongo_client = pymongo.MongoClient(host='1.2.3.4', port=27017)
+         thread_local.redis_client = redis_client
+         thread_local.mongo_db = mongo_client.msohu
        while True:
-             current_url = self.tasks_queue.get()
-             visited_urls.add(current_url)
+             current_url = redis_client.lpop('m_sohu_task')
+             while not current_url:
+                 current_url = redis_client.lpop('m_sohu_task')
            self.spider.status = SpiderStatus.WORKING
-             html_page = self.spider.fetch(current_url)
-             if html_page not in [None, '']:
-                 url_links = self.spider.parse(html_page)
-                 for url_link in url_links:
-                     self.tasks_queue.put(url_link)
+             current_url = current_url.decode('utf-8')
+             if not redis_client.sismember('visited_urls', current_url):
+                 redis_client.sadd('visited_urls', current_url)
+                 html_page = self.spider.fetch(current_url)
+                 if html_page not in [None, '']:
+                     hasher = hasher_proto.copy()
+                     hasher.update(current_url.encode('utf-8'))
+                     doc_id = hasher.hexdigest()
+                     sohu_data_coll = mongo_client.msohu.webpages
+                     if not sohu_data_coll.find_one({'_id': doc_id}):
+                         sohu_data_coll.insert_one({
+                             '_id': doc_id,
+                             'url': current_url,
+                             'page': Binary(zlib.compress(pickle.dumps(html_page)))
+                         })
+                     self.spider.parse(html_page)
            self.spider.status = SpiderStatus.IDLE

@@ -158,19 +358,22 @@ def is_any_alive(spider_threads):
                for spider_thread in spider_threads])


- visited_urls = set()
+ thread_local = local()
+ hasher_proto = sha1()


def main():
-     task_queue = Queue()
-     task_queue.put('http://m.sohu.com/')
-     spider_threads = [SpiderThread('thread-%d' % i, Spider(), task_queue)
+     redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+     if not redis_client.exists('m_sohu_task'):
+         redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
+
+     spider_threads = [SpiderThread('thread-%d' % i, Spider())
                      for i in range(10)]
    for spider_thread in spider_threads:
        spider_thread.start()

-     while not task_queue.empty() or is_any_alive(spider_threads):
-         sleep(5)
+     while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
+         pass

    print('Over!')