bpo-29595: Expose max_queue_size in ThreadPoolExecutor #143


Conversation

@prayerslayer commented Feb 17, 2017

Hi!

Please forgive this blunt pull request; I wanted to open an issue first and ask, but couldn't.

The situation I ran into recently was that I used ThreadPoolExecutor to parallelize AWS API calls; I had to move data from one S3 bucket to another (~150M objects). Contrary to what I expected, the underlying queue is unbounded by default. Thus my process ended up consuming gigabytes of memory, because it put items into the queue faster than the threads could work them off: the queue just kept growing. (It ran on K8s and the pod was rightfully killed eventually.)

Of course there are ways to work around this. One could use more threads, to some extent, or supply one's own queue with a defined maximum size. But I think that's more work for users of Python than necessary.

So this pull request exposes a max_queue_size parameter for the ThreadPoolExecutor which will be forwarded to queue.Queue(). It defaults to 0, so backward-compatibility is ensured. I am happy to add tests if you'd give me further instructions. I would've done this already, but I'm largely unfamiliar with this project as well as the language 😅

I hope you find this as useful as I would and am looking forward to reading your thoughts about it!

https://bugs.python.org/issue29595
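
For illustration, a minimal sketch of how the proposed parameter would be used (hypothetical: max_queue_size only exists with this patch applied, and copy_object/keys are placeholder names). With a bound set, submit() blocks once the internal queue is full, so memory stays bounded:

from concurrent.futures import ThreadPoolExecutor

def copy_object(key):
    # placeholder for the real S3 copy call
    ...

keys = range(1000)  # placeholder for the ~150M object keys

# maxsize bounds the work queue; submit() blocks while 100 items are pending
with ThreadPoolExecutor(max_workers=8, max_queue_size=100) as executor:
    for key in keys:
        executor.submit(copy_object, key)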

@the-knights-who-say-ni

Hello, and thanks for your contribution!

I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA).

Unfortunately we couldn't find an account corresponding to your GitHub username on bugs.python.org (b.p.o) to verify you have signed the CLA. This is necessary for legal reasons before we can look at your contribution. Please follow these steps to help rectify the issue:

  1. If you don't have an account on b.p.o, please create one
  2. Make sure your GitHub username is listed in "Your Details" at b.p.o
  3. If you have not already done so, please sign the PSF contributor agreement
  4. If you just signed the CLA, please wait at least one US business day and then check "Your Details" on bugs.python.org to see if your account has been marked as having signed the CLA (the delay is due to a person having to manually check your signed CLA)
  5. Reply here saying you have completed the above steps

Thanks again for your contribution; we look forward to looking at it!

@Mariatta (Member)

Hi, can you create an issue about this in the issue tracker https://bugs.python.org?
Thanks.

@prayerslayer (Author) commented Feb 17, 2017

I hope I did this right: http://bugs.python.org/issue29595

By the way I signed the CLA 🎉

@prayerslayer changed the title from "Expose max_queue_size in ThreadPoolExecutor" to "bpo-29595: Expose max_queue_size in ThreadPoolExecutor" on Feb 17, 2017
@Winterflower

Although the keyword max_queue_size is rather self-explanatory, I wonder if there should be some simple checks to make sure nonsensical values such as None aren't passed to the Queue constructor.

@prayerslayer (Author)

Yes, that's a good point. I'd probably check for not None and > 0. Would you rather silently not set max_queue_size or raise an exception?

@Winterflower commented Feb 17, 2017

I'll let the more experienced devs weigh in on this, but IMHO "explicit is better than implicit" and I'd probably raise an exception to alert the client code that an invalid value was passed for a keyword arg. If you fail fast, this kind of nonsensical argument will most likely be caught early instead of puzzling people when it causes bizarre behaviour in production (due to the implicit setting of keyword args).
I haven't looked at the tests for this, but I suppose if you wanted to somehow test that your Queue is being set with the correct size, you could create a ThreadPoolExecutor instance, pass in a max_queue_size arg, then grab self._work_queue and assert that its maxsize property equals what you passed in for max_queue_size.
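
A rough sketch of that test (assuming the patched constructor from this PR; the class and method names here are made up):

from concurrent import futures
import unittest

class MaxQueueSizeTest(unittest.TestCase):
    def test_max_queue_size_is_forwarded(self):
        # the bound passed to the constructor should end up on the work queue
        executor = futures.ThreadPoolExecutor(max_workers=1, max_queue_size=5)
        self.assertEqual(executor._work_queue.maxsize, 5)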

@prayerslayer (Author)

Updated the PR and added tests. Should I update the docs somewhere?

@@ -96,8 +98,12 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")

+        # check that max_queue_size is a positive integer
+        if type(max_queue_size) is not int or max_queue_size < 0:
+            raise ValueError("max_queue_size must be equal or greater 0")
Contributor:

Slight rewording to "greater or equal to 0" is possibly better.

Author:

I'd remove this altogether, as you suggested.

         self._max_workers = max_workers
-        self._work_queue = queue.Queue()
+        self._work_queue = queue.Queue(maxsize=max_queue_size)
         self._threads = set()
Contributor:

maxsize is the sole arg to the initializer of Queue so you could just supply max_queue_size since it's also pretty descriptive.

Author:

is the sole arg to the initializer

What if this changes? Is it then still safe to just pass max_queue_size as a positional argument?

@@ -96,8 +98,12 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")

+        # check that max_queue_size is a positive integer
+        if type(max_queue_size) is not int or max_queue_size < 0:
Contributor:

Should it strictly be an int? Classes that inherit from int won't pass this condition unless isinstance is used. Either way, I'm not sure doing any check for the type is a good idea, since the Queue class generally doesn't (and fails only when you put something in it).

Author:

I'm also fine if we leave verification to the Queue.

@prayerslayer (Author)

Also, should I add the same feature to the ProcessPoolExecutor?

@DimitrisJim (Contributor)

Since you've changed the signature of a class, the relevant docs need the required updates too :-)

I'd suggest not changing anything else until a core-dev also comes through and provides a review and an opinion on the change. If one doesn't come around for a while, "ping" the issue on the issue tracker to try and get attention to it.

@prayerslayer (Author)

Updated the docs and waiting for a core dev.

@prayerslayer (Author)

Fixed the build and pinging for authoritative feedback.

@prayerslayer (Author)

Any updates? It's been a while and I also pinged it on the issue tracker to no avail...

@rhettinger (Contributor)

If this feature is approved, we should give some thought to making it a keyword-only argument. That would improve code clarity and leave the positional arguments open for future expansion if needed. Also, if we ever need to make a change, keyword arguments are easier to deprecate than positional arguments (if other positional arguments had been added in the interim).

@pitrou (Member) commented Apr 1, 2017

I think this is a nice feature addition overall.

@pitrou (Member) commented Apr 1, 2017

Also, should I add the same feature to the ProcessPoolExecutor?

Yes, that would be reasonable, as it is vulnerable to the same problem.


     An :class:`Executor` subclass that uses a pool of at most *max_workers*
-    threads to execute calls asynchronously.
+    threads to execute calls asynchronously. If all worker threads are busy it
+    buffers new work items using a queue of maximum size *max_queue_size*.
Member:

Please document that map and submit will block if the queue size is reached.

        for i in range(-10, 10):
            executor = futures.ThreadPoolExecutor(max_queue_size=i)
            self.assertEqual(executor._work_queue.maxsize, i)
            # provided value is forwarded without checking it
Member:

I am surprised by this. @rhettinger, is there any reason queue.Queue doesn't check for the size type?

>>> q = Queue(maxsize="foo")
>>> q.put(1)
Traceback (most recent call last):
  File "<ipython-input-4-f7b62bda2b8c>", line 1, in <module>
    q.put(1)
  File "/home/antoine/miniconda3/envs/dask35/lib/python3.5/queue.py", line 127, in put
    if self.maxsize > 0:
TypeError: unorderable types: str() > int()

    def test_custom_max_queue_size(self):
        # test custom valid and invalid values
        for i in range(-10, 10):
            executor = futures.ThreadPoolExecutor(max_queue_size=i)
Member:

What does it mean to have a negative queue size?

@prayerslayer (Author) commented Aug 23, 2017

Nothing really, but since the Queue constructor doesn't check for validity of arguments I wanted to add a test for invalid arguments. (To be sure that I didn't break this behavior.)

@@ -178,6 +178,20 @@ def test_thread_names_default(self):
             self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
             t.join()

+    def test_default_max_queue_size(self):
Member:

I would add a higher-level test, i.e. create an executor with a non-zero queue size, submit a number of tasks larger than the queue size, and check that they all get executed in the end.

Author:

Good idea :)
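
A sketch of what that higher-level test could look like (assuming the patched constructor; the class and method names are illustrative):

import unittest
from concurrent import futures

class BoundedQueueCompletionTest(unittest.TestCase):
    def test_all_tasks_complete_with_bounded_queue(self):
        # submit more tasks than the queue can hold: submit() blocks while the
        # queue is full, yet every task must still run to completion
        n = 10
        with futures.ThreadPoolExecutor(max_workers=2, max_queue_size=3) as executor:
            fs = [executor.submit(lambda x: x + 1, i) for i in range(n)]
        self.assertEqual(sorted(f.result() for f in fs), list(range(1, n + 1)))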

@prayerslayer (Author)

I think I have everything for the ThreadPoolExecutor, at least.

As for the ProcessPoolExecutor, it seems like the work_ids queue would be susceptible to growing too big; I can fix that later.

Regarding multiprocessing.Pool, I could mechanically apply the same change, but I really have no idea what's going on there.

threads to execute calls asynchronously. If all worker threads are busy it
buffers new work items using a queue of maximum size *max_queue_size*. When
this maximum size is reached, map() and submit() will block until more work
items have been processed.
Member:

Can you mention that the default (0) means an unbounded queue?

@@ -142,6 +145,12 @@ And::
        control the threading.Thread names for worker threads created by
        the pool for easier debugging.

+   .. versionadded:: 3.7
Member:

For a new function parameter, this should be versionchanged (versionadded means the function itself is new). Unfortunately, it seems we did the same mistake above for thread_name_prefix...

        fs = [executor.submit(process_item, i) for i in range(n)]
        expected_results = [process_item(i) for i in range(n)]

        for f in futures.as_completed(fs, timeout=10):
Member:

Or, simply, self.assertEqual(sorted(f.result() for f in futures.as_completed(...)), sorted(expected_results))

        n = 10

        def process_item(item):
            return item + 1
Member:

Perhaps add a small sleep() here to help the queue reach its limit at some point?

@pitrou (Member) commented Oct 31, 2017

I agree multiprocessing.Pool doesn't need to be tackled here. On the other hand, I'd like to see ProcessPoolExecutor updated, for consistency reasons and as we could then re-use the test case.

@tomMoral (Contributor) commented Apr 2, 2018

The recent changes by @pitrou replaced the Queue with a SimpleQueue, which is unbounded, so this PR now conflicts with master. The change is thus not simple.

After some investigation, one possibility would be to revert to using a queue.Queue and change the underlying mutex to an RLock. This way, the gc could still run while the lock is held without trouble. This change does not seem to break the API of queue.Queue, as the internal state of the queue would still be updated atomically in one thread (the gc only being allowed in that same thread). What do you think about that? We could also add a mutex parameter to the Queue to avoid changing the behavior everywhere, but this means the same gc issue could occur everywhere.

For the API change, my only concern is that submit becomes a blocking function in this case, and we should probably also implement some mechanism to return in case the queue is full. This makes the code more complex.

@prayerslayer are you still interested in working on this issue? If not, let me know; I am interested in taking over from here.

@pitrou (Member) commented Apr 2, 2018

After some investigation, one possibility would be to revert to using a queue.Queue and change the underlying mutex to an RLock. This way, the gc could still run while the lock is held without trouble.

Using an RLock does not make a routine signal-safe. The typical situation is:

  • you enter a function protected by an RLock (e.g. Queue.put); the function takes the RLock and starts modifying some internal state
  • a signal arrives and interrupts code execution
  • the signal handler, through some chain of events, re-enters the RLock protected function (in this case: the signal handler triggers something that puts something on the queue)
  • the protected function takes the RLock again; it sees the internal state in a half-modified inconsistent state: who knows what can happen?

(you can replace "signal" with "cyclic garbage collection" above for the same effect)

In other words, the problem is not the lock, but the operations that are protected by the lock.

SimpleQueue was designed specifically for SimpleQueue.put to be safe in that situation. Queue probably isn't, especially with a non-zero maxsize.
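
To make the hazard concrete, here is a toy illustration (not CPython code) of why the problem is reentrancy rather than the lock itself:

import threading

class ToyQueue:
    def __init__(self):
        self._lock = threading.RLock()  # reentrant: the same thread may re-enter
        self._items = []
        self._count = 0                 # invariant: _count == len(_items)

    def put(self, item):
        with self._lock:
            self._count += 1
            # if a signal handler (or a finalizer run by the gc) calls put()
            # right here, it re-acquires the RLock successfully and observes
            # _count != len(_items): the invariant is broken mid-update
            self._items.append(item)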

@tomMoral (Contributor) commented Apr 2, 2018

you can replace "signal" with "cyclic garbage collection" above for the same effect

Thanks for the explanation; I had not thought about it as a signal interruption.

Thus, to get this kind of API change, we need to implement a size-limiting mechanism in the Executor itself.

@GollyJer commented Apr 3, 2018

Hi everyone. I stumbled on this pull request after messing around with this concept myself for the last few days.

I may not be testing this correctly but here's what I'm doing.

from time import time, strftime, sleep, gmtime
from random import randint
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue


class ThreadPoolExecutorWithQueueLimit(ThreadPoolExecutor):
    def __init__(self, max_queue_size, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueLimit, self).__init__(*args, **kwargs)
        # replace the parent's unbounded work queue with a bounded one
        self._work_queue = queue.Queue(maxsize=max_queue_size)


def nap(nap_length):
    sleep(nap_length)
    return nap_length


if __name__ == '__main__':

    startTime = time()

    range_size = 100
    max_pool_size = 10
    max_worker_count = 100

    with ThreadPoolExecutorWithQueueLimit(max_queue_size=max_pool_size,
                                          max_workers=max_worker_count) as pool_executor:
        pool = {}
        for i in range(range_size):
            function_call = pool_executor.submit(nap, randint(0, 2))
            pool[function_call] = i

        for completed_function in as_completed(pool):
            result = completed_function.result()
            i = pool[completed_function]

            print('{} completed @ {} and slept for {}'.format(
                str(i).zfill(4),
                strftime("%H:%M:%S", gmtime()),
                result))

    print('==--- Script took {} seconds. ---=='.format(
        round(time() - startTime)))

In this case the queue fills with all 100 work items even though I've set it to 10.
I expect the queue to fill to 10 and, as a thread completes, for it to print to the console and the queue to fill back to its max.

Am I completely off base with my expectations? What have I done wrong?
Thanks!

@prayerslayer (Author)

@tomMoral: are you still interested in working on this issue? If not, let me know; I am interested in taking over from here.

I'm interested in getting this merged somehow, but since I didn't do anything for a year it's probably unrealistic that I'll finish it at all. Please go ahead :)

@noxdafox commented Apr 13, 2018

Hello,

I posted a similar patch for multiprocessing.Pool long ago: https://bugs.python.org/issue19173 but it never got attention. Please let me know if I should rework it for multiprocessing.Pool as well.

I'd like to add my own two cents regarding this feature, as @pitrou suggested implementing it in ProcessPoolExecutor as well.

Allowing the user to set a maximum size for the internal queue conveys a false sense of control over the number of tasks that will be submitted to a ProcessPoolExecutor.
The expectation is that, if I set the maximum size of the internal job queue to 10, then the eleventh call to submit will block. Most users will be surprised when they realize this is not the case.

The reason behind this flaw is that several jobs are pulled from the internal job queue and shovelled down the Pipe or SimpleQueue (or whatever the implementation uses) by a thread. The design is that way to ensure acceptable performance and reduce IPC latency.

As the size of the jobs may vary, as may the size of the Pipe, we have no way to control how many jobs will fit into it. Hence there is no simple and consistent way to guarantee the promised behaviour to the user.

I experienced this issue when releasing pebble 2.0. The pool allowed the user to provide his/her own implementation of queue.Queue. I was contacted by a few users who experienced unpredictable behaviour when providing a queue with maxsize set. After a few design iterations I realised there was no simple and elegant way to provide such a feature, and I removed it in the 3.0 release.

A much simpler and more explicit way to achieve this goal is by using a Semaphore. This can be easily implemented both for multiprocessing.Pool and for concurrent.futures.ProcessPoolExecutor.

Note that the above-mentioned issue affects only process-based pools; thread pools will actually work as expected. I could easily reproduce the issue with multiprocessing.Pool at the time. I am not sure about concurrent.futures.ProcessPoolExecutor, as I am not aware of the current status of its internals. It has changed a lot since the last time I looked at it.
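
For illustration, a minimal sketch of the semaphore approach (my own sketch of the shape such code could take, not pebble's or CPython's implementation): a BoundedSemaphore caps the number of in-flight tasks, and submit() blocks until a slot frees up.

import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    # wraps an executor so that at most max_pending tasks are in flight
    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()      # blocks while max_pending tasks are in flight
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            self._semaphore.release()  # don't leak a slot if submission fails
            raise
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

Because the bound is enforced before anything reaches the executor's internals, this sidesteps the pipe-buffering problem described above.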

@pitrou (Member) commented Apr 13, 2018

@noxdafox Using a semaphore sounds reasonable. Feel free to post a PR.

@noxdafox

@pitrou the question is: do we need to integrate this feature into the pools, considering how trivial it is to achieve such functionality with a few lines of code?

IMHO this fits better as a recipe or gist than as core functionality of the pool.

I am more concerned about the slow creep of features leading to a complex class to maintain in the future.

@pitrou (Member) commented Apr 13, 2018

I agree with the maintainability concern. The concurrent.futures code has grown more complicated lately as it became more robust against various classes of errors.

@tomMoral (Contributor)

I agree that changing the size of the call_queue is a bad idea, as it can lead to the Executor and Pool starving, and it also increases the probability of deadlocks on shutdown because of the sentinels. Using a threading.BoundedSemaphore decouples this mechanism from the internals of the Executor/Pool and is fairly easy to implement. I can do it for concurrent.futures.

However, I am unsure what the expected behavior should be on two points of this API change:

  • Should we limit the number of pending tasks (in the queue and being processed) or only the number of tasks not yet being processed (as in the original proposal in bpo-29595)?
  • Should the call to Executor.submit or Pool.apply_async be blocking? If so, should we add a timeout parameter for when the number of tasks is too high?

Overall, this addition also seems to complicate the call to submit for users, and I am not sure it is worth it.

@noxdafox

This gist shows how trivial it is for a user to achieve such functionality.

There would be several other questions to sort out to properly add this feature to the pools. What about map, for example? How do we count a task: as a single submission, or by the length of the iterable? What about the chunksize?

IMHO the benefit of having this feature built-in is not worth its maintenance.

@brettcannon (Member)

Since @prayerslayer said "it's probably unrealistic that [he will] finish" this PR, I'm going to close it (obviously people can open their own PRs to implement this functionality if they choose to).
