bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566

Status: Closed · wants to merge 18 commits
9 changes: 8 additions & 1 deletion Doc/library/concurrent.futures.rst
@@ -38,7 +38,7 @@ Executor Objects
future = executor.submit(pow, 323, 1235)
print(future.result())

.. method:: map(func, *iterables, timeout=None, chunksize=1)
.. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None)

Similar to :func:`map(func, *iterables) <map>` except:

@@ -64,8 +64,15 @@ Executor Objects
performance compared to the default size of 1. With
:class:`ThreadPoolExecutor`, *chunksize* has no effect.

By default, a reasonable number of tasks are
queued beyond the number of workers; an explicit *prefetch* count may be
provided to specify how many extra tasks should be queued.

.. versionchanged:: 3.5
Added the *chunksize* argument.

.. versionchanged:: 3.12
Added the *prefetch* argument.

.. method:: shutdown(wait=True, *, cancel_futures=False)

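The *prefetch* keyword documented above is not part of released CPython, so it cannot be demonstrated directly. The bounded-submission idea it describes can, however, be sketched on top of the existing `Executor` API; the helper name `lazy_map` is illustrative, not part of the patch:

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor


def lazy_map(executor, fn, iterable, prefetch=2):
    # Submit an initial window of max_workers + prefetch tasks, then top
    # the queue up with one new task per result yielded.
    # NOTE: _max_workers is a private attribute; real code should pass
    # the worker count in explicitly.
    argsiter = iter(iterable)
    fs = collections.deque(
        executor.submit(fn, arg)
        for arg in itertools.islice(argsiter, executor._max_workers + prefetch)
    )

    def result_iterator():
        try:
            while fs:
                res = fs.popleft().result()
                # Dispatch the next task before yielding, to keep the
                # pipeline full.
                for arg in argsiter:
                    fs.append(executor.submit(fn, arg))
                    break
                yield res
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()


with ThreadPoolExecutor(max_workers=2) as ex:
    gen = lazy_map(ex, lambda x: x * x, itertools.count(), prefetch=2)
    first_five = [next(gen) for _ in range(5)]
    gen.close()  # cancels the outstanding futures

print(first_five)  # [0, 1, 4, 9, 16]
```

Because results are yielded in submission order, consuming five results from the infinite `itertools.count()` input submits only a handful of tasks rather than looping forever.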
34 changes: 27 additions & 7 deletions Lib/concurrent/futures/_base.py
@@ -4,6 +4,7 @@
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import collections
import itertools
import logging
import threading
import time
@@ -568,7 +569,7 @@ def submit(self, fn, /, *args, **kwargs):
"""
raise NotImplementedError()

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
@@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
prefetch: The number of chunks to queue beyond the number of
workers on the executor. If None, a reasonable default is used.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -592,21 +595,38 @@
"""
if timeout is not None:
end_time = timeout + time.monotonic()
if prefetch is None:
prefetch = self._max_workers
if prefetch < 0:
raise ValueError("prefetch count may not be negative")

fs = [self.submit(fn, *args) for args in zip(*iterables)]
argsiter = zip(*iterables)
initialargs = itertools.islice(argsiter, self._max_workers + prefetch)

fs = collections.deque(self.submit(fn, *args) for args in initialargs)

# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
nonlocal argsiter
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
res = fs.popleft().result()
else:
yield fs.pop().result(end_time - time.monotonic())
Comment by @graingert (Contributor Author), Jul 23, 2022:

This doesn't cancel the currently-awaited future correctly: #95166

import contextlib
import functools
import concurrent.futures
import threading
import sys


def fn(num, stop_event):
    if num == 1:
        stop_event.wait()
        return "done 1"

    if num == 2:
        return "done 2"


def main():
    stop_event = threading.Event()
    log = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:

        def print_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        fut = pool.submit(print_n_wait, ident="first")
        try:
            with contextlib.closing(pool.map(print_n_wait, ["second", "third"], timeout=1)) as gen:
                try:
                    next(gen)
                except concurrent.futures.TimeoutError:
                    print("timed out")
                else:
                    raise RuntimeError("timeout expected")
        finally:
            stop_event.set()

    assert log == ["ident='first' started", "ident='first' stopped"], f"{log=} is wrong"

if __name__ == "__main__":
    sys.exit(main())

res = fs.popleft().result(end_time - time.monotonic())

# Dispatch next task before yielding to keep
# pipeline full
if argsiter:
try:
args = next(argsiter)
except StopIteration:
argsiter = None
else:
fs.append(self.submit(fn, *args))
Review comment:

If the executor has been shut down, this will raise:
cannot schedule new futures after shutdown
But the base executor also holds no state, so at this level it'll be pretty hard to tell whether the executor has been shut down or not.

Reply by @graingert (Contributor Author):

@rdarder this could catch that RuntimeError and just return, or the executor could keep a weakset of result_iterators and close them on shutdown.


yield res
finally:
for future in fs:
future.cancel()
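The `end_time = timeout + time.monotonic()` bookkeeping in the hunk above makes all futures share one overall deadline: each `.result()` call is given only the time remaining. A minimal runnable sketch of that pattern, using only the public API (the helper name `results_by_deadline` is illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def results_by_deadline(futures, timeout):
    # One overall deadline shared by all futures: each .result() call
    # gets only the time remaining, mirroring map()'s end_time logic.
    end_time = timeout + time.monotonic()
    return [f.result(end_time - time.monotonic()) for f in futures]


with ThreadPoolExecutor(max_workers=2) as ex:
    fs = [ex.submit(time.sleep, 0.01) for _ in range(4)]
    out = results_by_deadline(fs, timeout=5.0)

print(out)  # [None, None, None, None]
```

If the deadline passes while waiting on any one future, `f.result()` raises `concurrent.futures.TimeoutError`, regardless of how many earlier futures already completed.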
6 changes: 4 additions & 2 deletions Lib/concurrent/futures/process.py
@@ -784,7 +784,7 @@ def submit(self, fn, /, *args, **kwargs):
return f
submit.__doc__ = _base.Executor.submit.__doc__

def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
"""Returns an iterator equivalent to map(fn, iter).

Args:
@@ -795,6 +795,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
prefetch: The number of chunks to queue beyond the number of
workers on the executor. If None, a reasonable default is used.

Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -810,7 +812,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):

results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
timeout=timeout, prefetch=prefetch)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True, *, cancel_futures=False):
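For `ProcessPoolExecutor`, one prefetched "task" is a whole chunk of *chunksize* items, which the private `_get_chunks` helper batches before submission. A simplified, single-iterable sketch of that batching (the real helper zips multiple iterables):

```python
import itertools


def get_chunks(iterable, chunksize):
    # Simplified, single-iterable variant of the private _get_chunks
    # helper that ProcessPoolExecutor.map uses to batch work items.
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


chunks = list(get_chunks(range(7), 3))
print(chunks)  # [(0, 1, 2), (3, 4, 5), (6,)]
```

Because each chunk is built lazily with `itertools.islice`, batching composes cleanly with an unbounded input iterable: no chunk is materialized until it is about to be submitted.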
17 changes: 16 additions & 1 deletion Lib/test/test_concurrent_futures.py
@@ -890,7 +890,22 @@ def record_finished(n):

self.executor.map(record_finished, range(10))
self.executor.shutdown(wait=True)
self.assertCountEqual(finished, range(10))
# No guarantees on how many tasks were dispatched,
# but at least one should have been
self.assertGreater(len(finished), 0)

def test_infinite_map_input_completes_work(self):
import itertools
def identity(x):
return x

mapobj = self.executor.map(identity, itertools.count(0))
# Get one result, which shows we handle infinite inputs
# without waiting for all work to be dispatched
res = next(mapobj)
mapobj.close() # Make sure futures cancelled

self.assertEqual(res, 0)

def test_default_workers(self):
executor = self.executor_type()
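The behavior `test_infinite_map_input_completes_work` checks is exactly what the built-in `map` already provides: pulling one result consumes only one input. A quick contrast (the `record` helper is illustrative):

```python
import itertools

consumed = []


def record(x):
    # Track which inputs have actually been pulled from the iterable.
    consumed.append(x)
    return x * 2


# The built-in map is fully lazy: requesting one result consumes
# exactly one input, even from an infinite source.
m = map(record, itertools.count())
first = next(m)

print(first, consumed)  # 0 [0]
```

An eager `Executor.map`, by contrast, would try to exhaust `itertools.count()` before yielding anything; this patch moves the executor version toward the built-in's lazy contract while still prefetching enough work to keep the pool busy.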
@@ -0,0 +1,4 @@
:meth:`concurrent.futures.Executor.map` no longer eagerly creates all futures prior to yielding any
results. This allows it to work with huge or infinite :term:`iterables <iterable>` without
consuming excessive resources or crashing, making it more suitable as a drop-in
replacement for the built-in :func:`map`. Patch by Josh Rosenberg.