@@ -389,22 +389,21 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
389
389
return self ._map_async (func , iterable , starmapstar , chunksize ,
390
390
callback , error_callback )
391
391
392
- def _guarded_task_generation (self , result_job , func , iterable ,
393
- buffersize_sema = None ):
392
+ def _guarded_task_generation (self , result_job , func , iterable , sema = None ):
394
393
'''Provides a generator of tasks for imap and imap_unordered with
395
394
appropriate handling for iterables which throw exceptions during
396
395
iteration.'''
397
396
try :
398
397
i = - 1
399
398
400
- if buffersize_sema is None :
399
+ if sema is None :
401
400
for i , x in enumerate (iterable ):
402
401
yield (result_job , i , func , (x ,), {})
403
402
404
403
else :
405
404
enumerated_iter = iter (enumerate (iterable ))
406
405
while True :
407
- buffersize_sema .acquire ()
406
+ sema .acquire ()
408
407
try :
409
408
i , x = next (enumerated_iter )
410
409
except StopIteration :
@@ -419,13 +418,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
419
418
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
420
419
'''
421
420
self ._check_running ()
422
- if chunksize < 1 :
423
- raise ValueError ("Chunksize must be 1+, not {0:n}" .format (chunksize ))
424
- if buffersize is not None :
425
- if not isinstance (buffersize , int ):
426
- raise TypeError ("buffersize must be an integer or None" )
427
- if buffersize < 1 :
428
- raise ValueError ("buffersize must be None or > 0" )
421
+ self ._check_chunksize (chunksize )
422
+ self ._check_buffersize (buffersize )
429
423
430
424
result = IMapIterator (self , buffersize )
431
425
if chunksize == 1 :
@@ -441,8 +435,12 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
441
435
task_batches = Pool ._get_tasks (func , iterable , chunksize )
442
436
self ._taskqueue .put (
443
437
(
444
- self ._guarded_task_generation (result ._job , mapstar , task_batches ,
445
- result ._buffersize_sema ),
438
+ self ._guarded_task_generation (
439
+ result ._job ,
440
+ mapstar ,
441
+ task_batches ,
442
+ result ._buffersize_sema ,
443
+ ),
446
444
result ._set_length ,
447
445
)
448
446
)
@@ -453,15 +451,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
453
451
Like `imap()` method but ordering of results is arbitrary.
454
452
'''
455
453
self ._check_running ()
456
- if chunksize < 1 :
457
- raise ValueError (
458
- "Chunksize must be 1+, not {0!r}" .format (chunksize )
459
- )
460
- if buffersize is not None :
461
- if not isinstance (buffersize , int ):
462
- raise TypeError ("buffersize must be an integer or None" )
463
- if buffersize < 1 :
464
- raise ValueError ("buffersize must be None or > 0" )
454
+ self ._check_chunksize (chunksize )
455
+ self ._check_buffersize (buffersize )
465
456
466
457
result = IMapUnorderedIterator (self , buffersize )
467
458
if chunksize == 1 :
@@ -477,8 +468,12 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
477
468
task_batches = Pool ._get_tasks (func , iterable , chunksize )
478
469
self ._taskqueue .put (
479
470
(
480
- self ._guarded_task_generation (result ._job , mapstar , task_batches ,
481
- result ._buffersize_sema ),
471
+ self ._guarded_task_generation (
472
+ result ._job ,
473
+ mapstar ,
474
+ task_batches ,
475
+ result ._buffersize_sema ,
476
+ ),
482
477
result ._set_length ,
483
478
)
484
479
)
@@ -531,6 +526,22 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
531
526
)
532
527
return result
533
528
529
+ @staticmethod
530
+ def _check_chunksize (chunksize ):
531
+ if chunksize < 1 :
532
+ raise ValueError (
533
+ "Chunksize must be 1+, not {0:n}" .format (chunksize )
534
+ )
535
+
536
+ @staticmethod
537
+ def _check_buffersize (buffersize ):
538
+ if buffersize is None :
539
+ return
540
+ if not isinstance (buffersize , int ):
541
+ raise TypeError ("buffersize must be an integer or None" )
542
+ if buffersize < 1 :
543
+ raise ValueError ("buffersize must be None or > 0" )
544
+
534
545
@staticmethod
535
546
def _wait_for_updates (sentinels , change_notifier , timeout = None ):
536
547
wait (sentinels , timeout = timeout )
@@ -876,7 +887,8 @@ def _set(self, i, success_result):
876
887
#
877
888
878
889
class IMapIterator (object ):
879
- def __init__ (self , pool , buffersize ):
890
+
891
+ def __init__ (self , pool , buffersize = None ):
880
892
self ._pool = pool
881
893
self ._cond = threading .Condition (threading .Lock ())
882
894
self ._job = next (job_counter )
0 commit comments