@@ -284,13 +284,14 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
284
284
A named 2-tuple of sets. The first set, named 'done', contains the
285
285
futures that completed (is finished or cancelled) before the wait
286
286
completed. The second set, named 'not_done', contains uncompleted
287
- futures.
287
+ futures. Duplicate futures given to *fs* are removed and will be
288
+ returned only once.
288
289
"""
290
+ fs = set (fs )
289
291
with _AcquireFutures (fs ):
290
- done = set (f for f in fs
291
- if f ._state in [CANCELLED_AND_NOTIFIED , FINISHED ])
292
- not_done = set (fs ) - done
293
-
292
+ done = {f for f in fs
293
+ if f ._state in [CANCELLED_AND_NOTIFIED , FINISHED ]}
294
+ not_done = fs - done
294
295
if (return_when == FIRST_COMPLETED ) and done :
295
296
return DoneAndNotDoneFutures (done , not_done )
296
297
elif (return_when == FIRST_EXCEPTION ) and done :
@@ -309,7 +310,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
309
310
f ._waiters .remove (waiter )
310
311
311
312
done .update (waiter .finished_futures )
312
- return DoneAndNotDoneFutures (done , set ( fs ) - done )
313
+ return DoneAndNotDoneFutures (done , fs - done )
313
314
314
315
class Future (object ):
315
316
"""Represents the result of an asynchronous computation."""
@@ -380,13 +381,17 @@ def running(self):
380
381
return self ._state == RUNNING
381
382
382
383
def done (self ):
383
- """Return True of the future was cancelled or finished executing."""
384
+ """Return True if the future was cancelled or finished executing."""
384
385
with self ._condition :
385
386
return self ._state in [CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED ]
386
387
387
388
def __get_result (self ):
388
389
if self ._exception :
389
- raise self ._exception
390
+ try :
391
+ raise self ._exception
392
+ finally :
393
+ # Break a reference cycle with the exception in self._exception
394
+ self = None
390
395
else :
391
396
return self ._result
392
397
@@ -426,20 +431,24 @@ def result(self, timeout=None):
426
431
timeout.
427
432
Exception: If the call raised then that exception will be raised.
428
433
"""
429
- with self ._condition :
430
- if self ._state in [CANCELLED , CANCELLED_AND_NOTIFIED ]:
431
- raise CancelledError ()
432
- elif self ._state == FINISHED :
433
- return self .__get_result ()
434
-
435
- self ._condition .wait (timeout )
436
-
437
- if self ._state in [CANCELLED , CANCELLED_AND_NOTIFIED ]:
438
- raise CancelledError ()
439
- elif self ._state == FINISHED :
440
- return self .__get_result ()
441
- else :
442
- raise TimeoutError ()
434
+ try :
435
+ with self ._condition :
436
+ if self ._state in [CANCELLED , CANCELLED_AND_NOTIFIED ]:
437
+ raise CancelledError ()
438
+ elif self ._state == FINISHED :
439
+ return self .__get_result ()
440
+
441
+ self ._condition .wait (timeout )
442
+
443
+ if self ._state in [CANCELLED , CANCELLED_AND_NOTIFIED ]:
444
+ raise CancelledError ()
445
+ elif self ._state == FINISHED :
446
+ return self .__get_result ()
447
+ else :
448
+ raise TimeoutError ()
449
+ finally :
450
+ # Break a reference cycle with the exception in self._exception
451
+ self = None
443
452
444
453
def exception (self , timeout = None ):
445
454
"""Return the exception raised by the call that the future represents.
@@ -550,7 +559,7 @@ def set_exception(self, exception):
550
559
class Executor (object ):
551
560
"""This is an abstract base class for concrete asynchronous executors."""
552
561
553
- def submit (* args , ** kwargs ):
562
+ def submit (self , fn , / , * args , ** kwargs ):
554
563
"""Submits a callable to be executed with the given arguments.
555
564
556
565
Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -559,21 +568,7 @@ def submit(*args, **kwargs):
559
568
Returns:
560
569
A Future representing the given call.
561
570
"""
562
- if len (args ) >= 2 :
563
- pass
564
- elif not args :
565
- raise TypeError ("descriptor 'submit' of 'Executor' object "
566
- "needs an argument" )
567
- elif 'fn' in kwargs :
568
- import warnings
569
- warnings .warn ("Passing 'fn' as keyword argument is deprecated" ,
570
- DeprecationWarning , stacklevel = 2 )
571
- else :
572
- raise TypeError ('submit expected at least 1 positional argument, '
573
- 'got %d' % (len (args )- 1 ))
574
-
575
571
raise NotImplementedError ()
576
- submit .__text_signature__ = '($self, fn, /, *args, **kwargs)'
577
572
578
573
def map (self , fn , * iterables , timeout = None , chunksize = 1 ):
579
574
"""Returns an iterator equivalent to map(fn, iter).
@@ -619,7 +614,7 @@ def result_iterator():
619
614
future .cancel ()
620
615
return result_iterator ()
621
616
622
- def shutdown (self , wait = True ):
617
+ def shutdown (self , wait = True , * , cancel_futures = False ):
623
618
"""Clean-up the resources associated with the Executor.
624
619
625
620
It is safe to call this method several times. Otherwise, no other
@@ -629,6 +624,9 @@ def shutdown(self, wait=True):
629
624
wait: If True then shutdown will not return until all running
630
625
futures have finished executing and the resources used by the
631
626
executor have been reclaimed.
627
+ cancel_futures: If True then shutdown will cancel all pending
628
+ futures. Futures that are completed or running will not be
629
+ cancelled.
632
630
"""
633
631
pass
634
632
0 commit comments