@@ -76,22 +76,21 @@ def __init__(self, settings):
         smcls = load_object(settings['SPIDER_MANAGER_CLASS'])
         self.spiders = smcls.from_settings(settings.frozencopy())
         self.crawlers = set()
-        self.crawl_deferreds = set()
+        self._active = set()

     def crawl(self, spidercls, *args, **kwargs):
-        crawler = self._create_logged_crawler(spidercls)
+        crawler = self._create_crawler(spidercls)
+        self._setup_crawler_logging(crawler)
         self.crawlers.add(crawler)
-
         d = crawler.crawl(*args, **kwargs)
-        self.crawl_deferreds.add(d)
-        return d
+        self._active.add(d)

-    def _create_logged_crawler(self, spidercls):
-        crawler = self._create_crawler(spidercls)
-        log_observer = log.start_from_crawler(crawler)
-        if log_observer:
-            crawler.signals.connect(log_observer.stop, signals.engine_stopped)
-        return crawler
+        def _done(result):
+            self.crawlers.discard(crawler)
+            self._active.discard(d)
+            return result
+
+        return d.addBoth(_done)

     def _create_crawler(self, spidercls):
         if isinstance(spidercls, six.string_types):
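Note on the hunk above: `addBoth()` chains `_done` as both callback and errback and returns the same Deferred, so callers of `crawl()` still get the crawl's result while the runner's tracking sets shrink whether the crawl succeeds or fails. A standalone sketch of that pattern (illustration only, not Scrapy code):

```python
from twisted.internet import defer

active = set()

def track(d):
    """Track a Deferred and untrack it automatically when it fires."""
    active.add(d)
    def _done(result):
        active.discard(d)
        return result          # pass the result through to later callbacks
    return d.addBoth(_done)    # addBoth returns the same Deferred

d = track(defer.Deferred())
d.callback('finished')         # fires _done on success or failure alike
assert not active              # bookkeeping set is empty again
```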
@@ -100,13 +99,22 @@ def _create_crawler(self, spidercls):
         crawler_settings = self.settings.copy()
         spidercls.update_settings(crawler_settings)
         crawler_settings.freeze()
+        return Crawler(spidercls, crawler_settings)

-        crawler = Crawler(spidercls, crawler_settings)
-        return crawler
+    def _setup_crawler_logging(self, crawler):
+        log_observer = log.start_from_crawler(crawler)
+        if log_observer:
+            crawler.signals.connect(log_observer.stop, signals.engine_stopped)

     def stop(self):
         return defer.DeferredList(c.stop() for c in self.crawlers)

+    @defer.inlineCallbacks
+    def join(self):
+        """Wait for all managed crawlers to complete"""
+        while self._active:
+            yield defer.DeferredList(self._active)
+

 class CrawlerProcess(CrawlerRunner):
     """A class to run multiple scrapy crawlers in a process simultaneously"""
@@ -135,13 +143,15 @@ def _signal_kill(self, signum, _):

     def start(self, stop_after_crawl=True):
         if stop_after_crawl:
-            d = defer.DeferredList(self.crawl_deferreds)
+            d = self.join()
+            # Don't start the reactor if the deferreds are already fired
             if d.called:
-                # Don't start the reactor if the deferreds are already fired
                 return
             d.addBoth(lambda _: self._stop_reactor())
+
         if self.settings.getbool('DNSCACHE_ENABLED'):
             reactor.installResolver(CachingThreadedResolver(reactor))
+
         reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
         reactor.run(installSignalHandlers=False)  # blocking call
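Note on `start()` above: with the deferred bookkeeping moved into `join()`, the single-process entry point stays simple. A minimal usage sketch, where `MySpider` is a placeholder spider class:

```python
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl(MySpider)   # placeholder spider class
process.start()           # blocks: runs the reactor until join() fires,
                          # then _stop_reactor() shuts it down
```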