
gh-119154: Simplify consumers by making queue.Queue an iterable #120503


Closed
wants to merge 27 commits

Conversation

nineteendo
Contributor

@nineteendo nineteendo commented Jun 14, 2024

Purpose

Currently consuming items from a queue is very complex. You need to

  1. use an infinite while loop
  2. check for a sentinel
  3. call q.task_done() after processing each item
  4. join the queue
  5. cancel all worker tasks using sentinels

By making queue.Queue an iterable, this becomes a lot easier: you only need to call q.shutdown().

Overview of changes

  • Calling queue.Queue.__iter__() returns a generator which iterates over the queue of items
  • An example has been added to the documentation that is a lot simpler than working with daemon threads

Example

Without iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    while True:
        i = q.get()
        if i is None:
            break

        print(f'queue -> {i}')
        time.sleep(.3)
        q.task_done()

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    print('produced everything')
    q.join()
    q.put(None)

print('consumed everything')

With iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    for i in q:
        print(f'queue -> {i}')
        time.sleep(.3)

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    q.shutdown()
    print('produced everything')

print('consumed everything')

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: library/queue.html

@rhettinger rhettinger requested a review from gvanrossum June 18, 2024 02:13
@gvanrossum
Member

Can you find another reviewer? I’m just not in the right headspace right now.

@nineteendo nineteendo changed the title gh-120499: Add *.Queue.__iter__() gh-120499: Add queue.Queue.__iter__() Jun 18, 2024
@nineteendo nineteendo marked this pull request as ready for review June 18, 2024 07:50
@nineteendo
Contributor Author

cc @Zac-HD

@Zac-HD
Contributor

Zac-HD commented Jun 18, 2024

I'm not a CPython core dev, so I can't approve or merge your PRs 😅

We might want to document (and test) both the old-style and new-style approaches, but otherwise this looks good to me.

@nineteendo
Contributor Author

I'm not a CPython core dev

Yes, I know. I just wanted your feedback.

What should we do with multiprocessing.Queue? Its close() method works per process.

@willingc
Contributor

@Zac-HD @nineteendo Picking up review for Guido. A few questions...

  • Since this is a feature, and not a bug, what is the motivation to make this addition?
  • What benefit does this particular change give the language?
  • What are the potential risks of making the change?

@Zac-HD
Contributor

Zac-HD commented Jun 21, 2024

Quoting and expanding from #119154 for the motivation and benefit:

Trio has proven out design patterns using 'channels' as async iterables, which I think are easier to use and less error-prone than the current asyncio.Queue. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.

Since the .shutdown() method was added in Python 3.13, an asyncio.Queue is almost identical to a send-channel plus a receive-channel. Adding an .__aiter__ method will support these design patterns without needing to subclass or use an async generator helper function.

...but all that is about asyncio.Queue, not queue.Queue. I think the argument for the latter is consistency; Guido noted:

Adding __aiter__ seems fairly uncontroversial now we have shutdown [in 3.13+] ... I'd recommend adding __iter__ to non-asyncio queues.Queue as well, just like we added shutdown to both.

I think the risks are pretty small, since it's a tightly-scoped change, although I'd aim to merge the asyncio PR first to reduce the odds of ending up in an inconsistent state or needing to revert this if the other is rejected. My main concern is actually that many users will (incorrectly!) believe that improvements to asyncio make Trio obsolete - but that's not a risk for CPython 🙂
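The channel-style iteration Zac describes can be sketched hypothetically (this is not CPython code) as an async generator over a plain asyncio.Queue. The SENTINEL object and aiter_queue helper are stand-ins of mine for 3.13's shutdown()/QueueShutDown machinery, so the sketch runs on older versions too:

```python
import asyncio

SENTINEL = object()  # stands in for 3.13's q.shutdown() / QueueShutDown

async def aiter_queue(q):
    """Hypothetical helper showing the pattern __aiter__ would provide."""
    while True:
        item = await q.get()
        if item is SENTINEL:
            return
        yield item

async def main():
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    q.put_nowait(SENTINEL)
    # The consumer becomes a plain async-for, with no explicit get() or break:
    return [x async for x in aiter_queue(q)]

print(asyncio.run(main()))  # [0, 1, 2]
```

With a real __aiter__, the helper would disappear and consumers would write `async for item in q:` directly.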

@nineteendo nineteendo changed the title gh-120499: Add queue.Queue.__iter__() gh-120499: Simplify consumers by making queue.Queue an iterable Jun 21, 2024
@nineteendo
Contributor Author

nineteendo commented Jun 21, 2024

It has the same motivation as asyncio.Queue. Additionally, it allows you to cancel workers without using sentinels (or daemon threads), which is a huge improvement in my opinion. See the updated PR body text for more information.

I agree with Zac that we should merge the asyncio PR first, but I think we should aim to merge both or neither.

@rhettinger
Contributor

I was waiting for Guido to opine on this, but since he is unavailable it looks like this falls to me.

I'm going to decline this suggestion.

  • The code is too trivial and there is almost no benefit (the edit to the example saves only one line).
  • With shutdown being new, it is better to keep it explicit.
  • Currently, the get() call is an obvious marker of where blocking occurs. Code will be harder to review with it hidden behind an abstraction.
  • Currently, get() provides non-blocking and timeout options, which become inaccessible with this change.
  • The queue API is foundational and everything done there propagates to other concurrency APIs. Ideally, this API should be as compact, orthogonal, and non-opinionated as possible.
  • I disagree with the premise, "Currently consuming items from a queue is very complex." Most consumers are simple. They only add task_done if they need to track and join on task completion. The PR does very little, only combining a while/get step with a try/except that isn't always needed.
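The timeout point can be illustrated with the existing, unmodified queue API; an explicit get() keeps control that a bare for-loop would hide:

```python
import queue

q = queue.Queue()
q.put('job')

# Explicit get() keeps the blocking behaviour and the timeout visible:
try:
    first = q.get(timeout=0.05)   # returns 'job' immediately
except queue.Empty:
    first = None

# The queue is now empty, so the same call raises queue.Empty:
try:
    second = q.get(timeout=0.05)  # waits at most 50 ms, then gives up
except queue.Empty:
    second = None

print(first, second)  # job None
```

A `for item in q:` loop offers no place to pass `block=False` or `timeout=` to the underlying get().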

Thanks again for the suggestion, but let's avoid feature creep when no significant new capability is being added. Minor respellings typically don't benefit users. In this case, I think it makes them worse off by hiding two essential elements that should be explicit (the get() call and catching the exception).

If Guido wants to reopen this, please go ahead. Otherwise, I think we're better off without it.

@rhettinger rhettinger closed this Jun 23, 2024
@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

The code is too trivial and there is almost no benefit (the edit to the example saves only one line).

That's an example of processing a queue with daemon consumer threads. I left it in because Zac mentioned documenting both approaches:

We might want to document (and test) both the old-style and new-style approaches, but otherwise this looks good to me.

If that example is actually rewritten using iteration, we don't need to use daemon threads.

Without iteration

import threading
import queue

def worker(q):
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

q = queue.Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()
for item in range(30):
    q.put(item)

q.join()
print('All work completed') # Worker is still running

With iteration

import concurrent.futures
import queue

def worker(q):
    for item in q:
        print(f'Working on {item}')
        print(f'Finished {item}')

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp:
    tp.submit(worker, q)
    for item in range(30):
        q.put(item)

    q.shutdown()

print('All work completed') # No worker is running

With shutdown being new, it is better to keep it explicit.

It is explicit: if you don't call it, the program will hang forever.


I disagree with the premise, "Currently consuming items from a queue is very complex." Most consumers are simple.

Processing a queue isn't always the last thing a program does; see queue_join_test() for one such example:

def worker(self, q):
    while True:
        x = q.get()
        if x < 0:
            q.task_done()
            return
        with self.cumlock:
            self.cum += x
        q.task_done()

def queue_join_test(self, q):
    self.cum = 0
    threads = []
    for i in (0, 1):
        thread = threading.Thread(target=self.worker, args=(q,))
        thread.start()
        threads.append(thread)
    for i in range(100):
        q.put(i)
    q.join()
    self.assertEqual(self.cum, sum(range(100)),
                     "q.join() did not block until all tasks were done")
    for i in (0, 1):
        q.put(-1)  # instruct the threads to close
    q.join()       # verify that you can join twice
    for thread in threads:
        thread.join()

This does get a lot simpler with this addition.
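To make that claim concrete, here is a hypothetical rewrite of the same pattern. Since Queue.__iter__ was not merged, the sketch emulates it with a sentinel-based subclass; IterableQueue and its shutdown-via-sentinel are stand-ins of mine, not real APIs:

```python
import queue
import threading

class IterableQueue(queue.Queue):
    """Hypothetical stand-in for the proposed iterable Queue.

    Emulates shutdown() with a sentinel so it runs on any Python 3;
    the actual proposal relies on 3.13's Queue.shutdown() and the
    ShutDown exception instead.
    """
    _SENTINEL = object()

    def shutdown(self):
        self.put(self._SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            if item is self._SENTINEL:
                self.put(self._SENTINEL)  # let the other workers stop too
                return
            yield item

cum = 0
cumlock = threading.Lock()

def worker(q):
    global cum
    for x in q:              # replaces the while/get/negative-sentinel dance
        with cumlock:
            cum += x

q = IterableQueue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in (0, 1)]
for t in threads:
    t.start()
for i in range(100):
    q.put(i)
q.shutdown()                 # replaces putting one -1 per worker
for t in threads:
    t.join()
print(cum == sum(range(100)))  # True
```

The task_done()/join() bookkeeping disappears because thread joins alone confirm completion here.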

@nineteendo
Contributor Author

@gvanrossum, did rhettinger change your mind?

@nineteendo nineteendo deleted the add-Queue.__iter__ branch June 23, 2024 15:08
@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

Also, @rhettinger, only 24.5% of consumers are "simple", not the majority as you stated:

(I deleted the old message, as the search was too broad)

Edit: removed "NOT" from the link preview.

@rhettinger
Contributor

If Guido thinks this is a good API, I would be happy for him to reopen it.

My personal judgment is that this makes users worse off, but I may be wrong.

I am clear that we don't have to do this. It only combines a while/get with a try/except. There is no new capability being added. It is perfectly reasonable to wait to see how people use shutdown() and whether they typically need a timeout, which would be inaccessible in this abstraction.

@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

OK, and what about queue.Queue.iter(block=True, timeout=None)? That might also be a better API for asyncio.Queue.

@nineteendo nineteendo restored the add-Queue.__iter__ branch June 23, 2024 16:27
@rhettinger
Contributor

rhettinger commented Jun 23, 2024

queue.Queue.iter(block=True, timeout=None)

What would this do with an Empty exception? Would it eat the exception and terminate the for-loop, treating it exactly the same as a ShutDown? This seems like bug bait.

All the PR does is hide both a get() and a try/except inside a for-loop:

def __iter__(self):
    try:
        while True:
            yield self.get()
    except ShutDown:
        return

Hiding the two steps makes users worse off. Besides adding overhead, it hides a blocking call, which in my experience is something that needs to be explicit. It also distances the user from adding any other behavior in the except clause, such as making a log entry or finalizing a resource. And, as noted above, it distances the user from adding timeouts and non-blocking behaviors, which typically need special handling distinct from the case of a shutdown. I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done(). In short, users that want to do anything other than this one specific case are made worse off by trying to use the for-loop variant. It won't even be obvious that they have options.

@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

What would this do with an Empty exception? Would it eat the exception and terminate the for-loop treating it exactly the same as a Shutdown? This seems like bug bait.

Maybe like this? This ensures the queue is not empty before iterating (without risking race conditions). That seems like the only sensible behaviour, as queues aren't infinite (why else would you want to iterate over them without waiting?).

def iter(self, block=True, timeout=None):
    '''TODO'''
    try:
        yield self.get(block=block, timeout=timeout)
    except ShutDown:
        return

    try:
        while True:
            yield self.get(block=block, timeout=timeout)
    except (Empty, ShutDown):
        return

@nineteendo
Contributor Author

Also, it distances the user from adding any other behavior in the except-clause, possibly making a log entry or finalizing a resource.

You can do that after the for loop:

for ... in q.iter():
    # do something

cleanup(q)

And as noted above it, distances the user from adding timeouts and non-blocking behaviors which typically need special handling distinct from the case of a shutdown.

In that case, create a subclass.

I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done().

It can be called when the user asks for the next item, but that wouldn't work if you collected the items into a list first.
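The deferred task_done() idea can be sketched as a wrapper generator. Both iter_with_task_done and the STOP sentinel are hypothetical (the actual proposal would catch ShutDown); the deferral fires only when the next item is requested, which is why buffering everything into a list marks items done before they are truly processed:

```python
import queue

STOP = object()  # hypothetical sentinel; the proposal would use ShutDown

def iter_with_task_done(q, sentinel):
    """Defer task_done() until the consumer asks for the next item."""
    while True:
        item = q.get()
        if item is sentinel:
            q.task_done()
            return
        yield item
        q.task_done()  # runs only once the next item is requested

q = queue.Queue()
for i in range(3):
    q.put(i)
q.put(STOP)

items = list(iter_with_task_done(q, STOP))
print(items)  # [0, 1, 2]
q.join()      # returns immediately: every item was already marked done
```

As the discussion notes, a join() that unblocks before the buffered items are actually processed is exactly why explicit task_done() is safer.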

@nineteendo
Contributor Author

It's safer to call q.task_done() explicitly; see the discussion here: #119154.

@gvanrossum
Member

I am not the arbiter of APIs any more. I am currently on vacation and then dealing with medical things, so please find someone else.

@nineteendo nineteendo changed the title gh-120499: Simplify consumers by making queue.Queue an iterable gh-120947: Simplify consumers by making queue.Queue an iterable Jun 24, 2024
@nineteendo nineteendo changed the title gh-120947: Simplify consumers by making queue.Queue an iterable gh-119154: Simplify consumers by making queue.Queue an iterable Jun 24, 2024