Skip to content

Commit d1dee31

Browse files
committed
Remove imap and imap_unordered body code duplication
1 parent 199eb42 commit d1dee31

File tree

1 file changed

+31
-56
lines changed

1 file changed

+31
-56
lines changed

Lib/multiprocessing/pool.py

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -417,67 +417,14 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
417417
'''
418418
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
419419
'''
420-
self._check_running()
421-
self._check_chunksize(chunksize)
422-
self._check_buffersize(buffersize)
423-
424-
result = IMapIterator(self, buffersize)
425-
if chunksize == 1:
426-
self._taskqueue.put(
427-
(
428-
self._guarded_task_generation(result._job, func, iterable,
429-
result._buffersize_sema),
430-
result._set_length,
431-
)
432-
)
433-
return result
434-
else:
435-
task_batches = Pool._get_tasks(func, iterable, chunksize)
436-
self._taskqueue.put(
437-
(
438-
self._guarded_task_generation(
439-
result._job,
440-
mapstar,
441-
task_batches,
442-
result._buffersize_sema,
443-
),
444-
result._set_length,
445-
)
446-
)
447-
return (item for chunk in result for item in chunk)
420+
return self._imap(IMapIterator, func, iterable, chunksize, buffersize)
448421

449422
def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
450423
'''
451424
Like `imap()` method but ordering of results is arbitrary.
452425
'''
453-
self._check_running()
454-
self._check_chunksize(chunksize)
455-
self._check_buffersize(buffersize)
456-
457-
result = IMapUnorderedIterator(self, buffersize)
458-
if chunksize == 1:
459-
self._taskqueue.put(
460-
(
461-
self._guarded_task_generation(result._job, func, iterable,
462-
result._buffersize_sema),
463-
result._set_length,
464-
)
465-
)
466-
return result
467-
else:
468-
task_batches = Pool._get_tasks(func, iterable, chunksize)
469-
self._taskqueue.put(
470-
(
471-
self._guarded_task_generation(
472-
result._job,
473-
mapstar,
474-
task_batches,
475-
result._buffersize_sema,
476-
),
477-
result._set_length,
478-
)
479-
)
480-
return (item for chunk in result for item in chunk)
426+
return self._imap(IMapUnorderedIterator, func, iterable, chunksize,
427+
buffersize)
481428

482429
def apply_async(self, func, args=(), kwds={}, callback=None,
483430
error_callback=None):
@@ -526,6 +473,34 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
526473
)
527474
return result
528475

476+
def _imap(self, iterator_cls, func, iterable, chunksize=1,
477+
buffersize=None):
478+
self._check_running()
479+
self._check_chunksize(chunksize)
480+
self._check_buffersize(buffersize)
481+
482+
result = iterator_cls(self, buffersize)
483+
if chunksize == 1:
484+
self._taskqueue.put(
485+
(
486+
self._guarded_task_generation(result._job, func, iterable,
487+
result._buffersize_sema),
488+
result._set_length,
489+
)
490+
)
491+
return result
492+
else:
493+
task_batches = Pool._get_tasks(func, iterable, chunksize)
494+
self._taskqueue.put(
495+
(
496+
self._guarded_task_generation(result._job, mapstar,
497+
task_batches,
498+
result._buffersize_sema),
499+
result._set_length,
500+
)
501+
)
502+
return (item for chunk in result for item in chunk)
503+
529504
@staticmethod
530505
def _check_chunksize(chunksize):
531506
if chunksize < 1:

0 commit comments

Comments
 (0)