@@ -30,6 +30,14 @@ def is_weakrefable(obj):
30
30
return False
31
31
32
32
33
+ try :
34
+ TimeoutError = TimeoutError
35
+ except NameError :
36
+ # Python 2 backward compat
37
+ class TimeoutError (OSError ):
38
+ pass
39
+
40
+
33
41
class _WeakKeyDictionary :
34
42
"""A variant of weakref.WeakKeyDictionary for unhashable objects.
35
43
@@ -102,12 +110,24 @@ def __reduce__(self):
102
110
return Batch , (self .tasks ,)
103
111
104
112
113
+ def _joblib_probe_task ():
114
+ # Noop used by the joblib connector to probe when workers are ready.
115
+ pass
116
+
117
+
105
118
class DaskDistributedBackend (ParallelBackendBase , AutoBatchingMixin ):
106
119
MIN_IDEAL_BATCH_DURATION = 0.2
107
120
MAX_IDEAL_BATCH_DURATION = 1.0
108
121
109
122
def __init__ (self , scheduler_host = None , scatter = None ,
110
- client = None , loop = None , ** submit_kwargs ):
123
+ client = None , loop = None , wait_for_workers_timeout = 10 ,
124
+ ** submit_kwargs ):
125
+ if distributed is None :
126
+ msg = ("You are trying to use 'dask' as a joblib parallel backend "
127
+ "but dask is not installed. Please install dask "
128
+ "to fix this error." )
129
+ raise ValueError (msg )
130
+
111
131
if client is None :
112
132
if scheduler_host :
113
133
client = Client (scheduler_host , loop = loop ,
@@ -139,6 +159,7 @@ def __init__(self, scheduler_host=None, scatter=None,
139
159
self ._scatter = []
140
160
self .data_futures = {}
141
161
self .task_futures = set ()
162
+ self .wait_for_workers_timeout = wait_for_workers_timeout
142
163
self .submit_kwargs = submit_kwargs
143
164
144
165
def __reduce__ (self ):
@@ -159,6 +180,26 @@ def stop_call(self):
159
180
self .call_data_futures .clear ()
160
181
161
182
def effective_n_jobs (self , n_jobs ):
183
+ effective_n_jobs = sum (self .client .ncores ().values ())
184
+ if effective_n_jobs != 0 or not self .wait_for_workers_timeout :
185
+ return effective_n_jobs
186
+
187
+ # If there is no worker, schedule a probe task to wait for the workers
188
+ # to come up and be available. If the dask cluster is in adaptive mode
189
+ # task might cause the cluster to provision some workers.
190
+ try :
191
+ self .client .submit (_joblib_probe_task ).result (
192
+ timeout = self .wait_for_workers_timeout )
193
+ except gen .TimeoutError :
194
+ error_msg = (
195
+ "DaskDistributedBackend has no worker after {} seconds. "
196
+ "Make sure that workers are started and can properly connect "
197
+ "to the scheduler and increase the joblib/dask connection "
198
+ "timeout with:\n \n "
199
+ "parallel_backend('dask', wait_for_workers_timeout={})"
200
+ ).format (self .wait_for_workers_timeout ,
201
+ max (10 , 2 * self .wait_for_workers_timeout ))
202
+ raise TimeoutError (error_msg )
162
203
return sum (self .client .ncores ().values ())
163
204
164
205
def _to_func_args (self , func ):
0 commit comments