diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 8efbf0a3d59554..4f33931bea2a3d 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -38,7 +38,7 @@ Executor Objects future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(func, *iterables, timeout=None, chunksize=1) + .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None) Similar to :func:`map(func, *iterables) <map>` except: @@ -64,8 +64,15 @@ Executor Objects performance compared to the default size of 1. With :class:`ThreadPoolExecutor`, *chunksize* has no effect. + By default, a reasonable number of tasks are + queued beyond the number of workers; an explicit *prefetch* count may be + provided to specify how many extra tasks should be queued. + .. versionchanged:: 3.5 Added the *chunksize* argument. + + .. versionchanged:: 3.12 + Added the *prefetch* argument. .. method:: shutdown(wait=True, *, cancel_futures=False) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d7e7e41967cc21..3b8ce3ce60b59b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,6 +4,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +import itertools import logging import threading import time @@ -568,7 +569,7 @@ def submit(self, fn, /, *args, **kwargs): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. + prefetch: The number of chunks to queue beyond the number of + workers on the executor. If None, a reasonable default is used. 
Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -592,21 +595,38 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): """ if timeout is not None: end_time = timeout + time.monotonic() + if prefetch is None: + prefetch = self._max_workers + if prefetch < 0: + raise ValueError("prefetch count may not be negative") - fs = [self.submit(fn, *args) for args in zip(*iterables)] + argsiter = zip(*iterables) + initialargs = itertools.islice(argsiter, self._max_workers + prefetch) + + fs = collections.deque(self.submit(fn, *args) for args in initialargs) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): + nonlocal argsiter try: - # reverse to keep finishing order - fs.reverse() while fs: - # Careful not to keep a reference to the popped future if timeout is None: - yield fs.pop().result() + res = fs.popleft().result() else: - yield fs.pop().result(end_time - time.monotonic()) + res = fs.popleft().result(end_time - time.monotonic()) + + # Dispatch next task before yielding to keep + # pipeline full + if argsiter: + try: + args = next(argsiter) + except StopIteration: + argsiter = None + else: + fs.append(self.submit(fn, *args)) + + yield res finally: for future in fs: future.cancel() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7e2f5fa30e8264..b3618c8942b2dd 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -784,7 +784,7 @@ def submit(self, fn, /, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). 
Args: @@ -795,6 +795,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. + prefetch: The number of chunks to queue beyond the number of + workers on the executor. If None, a reasonable default is used. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -810,7 +812,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), - timeout=timeout) + timeout=timeout, prefetch=prefetch) return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True, *, cancel_futures=False): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e294bd3a0957c7..67d737e5e9870f 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -890,7 +890,22 @@ def record_finished(n): self.executor.map(record_finished, range(10)) self.executor.shutdown(wait=True) - self.assertCountEqual(finished, range(10)) + # No guarantees on how many tasks dispatched, + # but at least one should have been dispatched + self.assertGreater(len(finished), 0) + + def test_infinite_map_input_completes_work(self): + import itertools + def identity(x): + return x + + mapobj = self.executor.map(identity, itertools.count(0)) + # Get one result, which shows we handle infinite inputs + # without waiting for all work to be dispatched + res = next(mapobj) + mapobj.close() # Make sure futures cancelled + + self.assertEqual(res, 0) def test_default_workers(self): executor = self.executor_type() diff --git a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst new file mode 100644 index 00000000000000..4e42fe29aa856a --- /dev/null +++ 
b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst @@ -0,0 +1,4 @@ +:meth:`concurrent.futures.Executor.map` no longer eagerly creates all futures prior to yielding any +results. This allows it to work with huge or infinite :term:`iterables <iterable>` without +consuming excessive resources or crashing, making it more suitable as a +drop-in replacement for the built-in :func:`map`. Patch by Josh Rosenberg.