gh-119154: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() #120925

Closed
wants to merge 90 commits into from

Conversation

nineteendo
Contributor

@nineteendo nineteendo commented Jun 23, 2024

Purpose

Currently consuming items from a queue can be a bit more complex than necessary. You need to

  1. use an infinite while loop
  2. check for a sentinel or add all worker tasks to a list
  3. call q.task_done() after processing each item
  4. join the queue
  5. cancel all worker tasks
  6. wait until all worker tasks are cancelled

Adding *.Queue.iter() and *.Queue.iter_nowait() makes this a bit easier: you only need to call queue.shutdown().

Overview of changes

asyncio

  • Calling asyncio.Queue.iter() returns an asynchronous generator which iterates over the queue of items
  • Calling asyncio.Queue.iter_nowait() returns a generator which iterates over the queue of items without blocking
  • A private _AsyncQueueIterator has been added to handle the asynchronous iteration
  • The example in the documentation is greatly simplified by the new addition

queue.py

  • Calling queue.Queue.iter() returns a generator which iterates over the queue of items
  • Calling queue.Queue.iter_nowait() returns a generator which iterates over the queue of items without blocking.
  • An example has been added in the documentation which is a bit simpler than working with daemon threads

Examples

asyncio

Without iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    while True:
        i = await queue.get()
        print(f'queue -> {i}')
        await asyncio.sleep(.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    tasks = []
    for _ in range(1):
        task = asyncio.create_task(consumer(queue))
        tasks.append(task)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))

    print('produced everything')
    await queue.join()
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    print('consumed everything')

asyncio.run(main())

With iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    async for i in queue.iter():
        print(f'queue -> {i}')
        await asyncio.sleep(.3)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg1:
        tg1.create_task(consumer(queue))
        async with asyncio.TaskGroup() as tg2:
            tg2.create_task(producer(queue))

        queue.shutdown()
        print('produced everything')

    print('consumed everything')

asyncio.run(main())

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

queue as final step

Without iteration

import threading
import queue

def worker(q):
    while True:
        item = q.get()
        print(f'queue -> {item}')
        q.task_done()

q = queue.Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()
for item in range(5):
    q.put(item)
    print(f'{item} -> queue')

print('produced everything')
q.join()
print('consumed everything') # Worker is still running

With iteration

import concurrent.futures
import queue

def worker(q):
    for item in q.iter():
        print(f'queue -> {item}')

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp:
    tp.submit(worker, q)
    for item in range(5):
        q.put(item)
        print(f'{item} -> queue')

    q.shutdown()
    print('produced everything')

print('consumed everything') # No worker is running

Output

0 -> queue
queue -> 0
1 -> queue
queue -> 1
queue -> 2
2 -> queue
3 -> queue
4 -> queue
produced everything
queue -> 3
queue -> 4
consumed everything

queue not as final step

Without iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    while True:
        i = q.get()
        if i is None:
            break

        print(f'queue -> {i}')
        time.sleep(.3)
        q.task_done()

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    print('produced everything')
    q.join()
    q.put(None)

print('consumed everything')

With iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    for i in q.iter():
        print(f'queue -> {i}')
        time.sleep(.3)

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    q.shutdown()
    print('produced everything')

print('consumed everything')

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: library/queue.html library/asyncio-queue.html

@nineteendo nineteendo changed the title gh-120924: Simplify consumers by adding queue.Queue.iter() and queue.Queue.iter_nowait() gh-120924: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() Jun 24, 2024
@nineteendo
Contributor Author

Sorry, I didn't intend to request a review. I merged the pull requests to leave it at just 2.

@nineteendo nineteendo marked this pull request as draft June 24, 2024 07:35
@gvanrossum
Member

I’m sorry, I am currently on vacation and dealing with health issues. I will not be available to review this.

@gvanrossum gvanrossum removed their request for review June 24, 2024 08:16
@nineteendo nineteendo changed the title gh-120924: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() gh-120947: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() Jun 24, 2024
@nineteendo nineteendo changed the title gh-120947: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() gh-119154: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() Jun 24, 2024
@rhettinger
Contributor

rhettinger commented Jul 5, 2024

Note, I (with Erlend Aasland concurring) marked #120503 as rejected 2 weeks ago. Rather than accept the result from the Queue module maintainer, the OP immediately opened this new PR with substantially the same proposal and code (plus other additions). No mention was made of the prior rejection or reasons for it. This seems deceptive and inappropriate.

@nineteendo
Contributor Author

nineteendo commented Jul 6, 2024

I first asked whether this would be a better API: #120503 (comment). And clarified how Empty exceptions would be handled: #120503 (comment). You haven't given your opinion yet.

I'll attempt to address the previous rejection.

@nineteendo
Contributor Author

The code is too trivial and there is almost no benefit (the edit to the example saves only one line).1

  • By supporting this by default we "support such design patterns without subclassing or a generator helper function"2.
  • You're no longer forced to use daemon threads or sentinels (in which case more than 1 line is saved). Having daemon threads spinning in the background when all work is done is undesirable.
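
For context, the sentinel pattern that these points refer to can already be written as a for loop using the built-in two-argument iter(). This is a minimal sketch of that existing idiom, not part of the proposal; SENTINEL, worker, and results are illustrative names.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-work marker


def worker(q, results):
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as the returned value equals SENTINEL.
    for item in iter(q.get, SENTINEL):
        results.append(item * 2)


q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()
for i in range(5):
    q.put(i)
q.put(SENTINEL)  # one sentinel is needed per worker thread
t.join()
print(results)
```

The cost of this idiom is the bookkeeping: the producer must enqueue exactly one sentinel per consumer, which is the part a shutdown-based loop would avoid.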

With shutdown being new, it is better to keep it explicit.1

If you want, we can delay this change until Python 3.15.

Currently the get() is an obvious marker of where blocking occurs. Code will be harder to review with it hidden behind an abstraction.1

Is it clearer with q.iter() and q.iter_nowait()? That should be as clear as get() and get_nowait().

Currently the get provides non-blocking and timeout options which become inaccessible with this.1
It is perfectly reasonable to wait to see how people use shutdown() and whether they typically need a timeout which would be inaccessible in this abstraction.3

q.iter() provides the same options now.

The queue API is foundational and everything done there propagates to other concurrency APIs. Ideally, this API should be as compact, orthogonal, and non-opinionated as possible.1

We haven't done that with Shutdown either. This simply makes it more powerful.

I disagree with the premise, "Currently consuming items from a queue is very complex." Most consumers are simple. They only add task_done if they need to track and join on task completion. The PR does very little, only combining a while/get step with a try/except that isn't always needed.1

According to my GitHub search, only a minority of 24.5% are "simple" consumers:4

"performance is not the only golden rule for CPython. Code readability is critical for maintenance"5

Minor respellings typically don't benefit users. In this case, I think it makes them worse-off by hiding two essential elements that should be explicit (the get call and catching the exception).1

q.iter() makes this a bit more explicit. I think which exceptions are caught should make sense.

queue.Queue.iter(block=True, timeout=None)
What would this do with an Empty exception? Would it eat the exception and terminate the for-loop treating it exactly the same as a Shutdown? This seems like bug bait.6

An error is raised if it's the first element. "This ensures the queue is not empty (without risking race conditions). That seems like the only sensible behaviour as queues aren't infinite (why would you otherwise want to iterate over them without waiting)."7

def iter(self, block=True, timeout=None):
    try:
        yield self.get(block=block, timeout=timeout)
    except ShutDown:
        return

    try:
        while True:
            yield self.get(block=block, timeout=timeout)
    except (Empty, ShutDown):
        return
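
To make the Empty handling concrete, here is a standalone sketch of the same logic written as a plain function rather than a method. The names iter_queue and _SHUTDOWN are assumptions for illustration; the fallback tuple only exists so the sketch also runs on Python versions older than 3.13, where queue.ShutDown is not defined.

```python
import queue

# queue.ShutDown exists only on Python 3.13+; an empty tuple matches nothing.
_SHUTDOWN = (queue.ShutDown,) if hasattr(queue, "ShutDown") else ()


def iter_queue(q, block=True, timeout=None):
    # First get(): Empty propagates to the caller; only a shutdown ends iteration.
    try:
        yield q.get(block=block, timeout=timeout)
    except _SHUTDOWN:
        return

    # Later get() calls: Empty is treated as the end of iteration as well.
    try:
        while True:
            yield q.get(block=block, timeout=timeout)
    except (queue.Empty, *_SHUTDOWN):
        return


q = queue.Queue()
for i in range(3):
    q.put(i)

# Non-empty queue: items are yielded, then Empty silently ends the loop.
drained = list(iter_queue(q, block=False))

# Empty queue: the very first get() raises Empty to the caller.
try:
    list(iter_queue(q, block=False))
    raised = False
except queue.Empty:
    raised = True

print(drained, raised)
```

The asymmetry is the point under discussion: Empty is an error on the first item but a normal stop condition afterwards.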

Also, it distances the user from adding any other behavior in the except-clause, possibly making a log entry or finalizing a resource.6

You can do that after the for loop:8

for ... in q.iter():
    # do something

cleanup(q)

And as noted above, it distances the user from adding timeouts and non-blocking behaviors which typically need special handling distinct from the case of a shutdown.6

In that case, create a subclass.8
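
One way such a subclass might look is sketched below. LoggingQueue and iter_logged are hypothetical names, not part of the PR or the standard library; the sketch handles a timeout differently from a shutdown by recording which of the two ended the loop. The _SHUTDOWN fallback is an assumption that keeps the sketch runnable before Python 3.13.

```python
import queue

# queue.ShutDown exists only on Python 3.13+; an empty tuple matches nothing.
_SHUTDOWN = (queue.ShutDown,) if hasattr(queue, "ShutDown") else ()


class LoggingQueue(queue.Queue):
    """Hypothetical subclass whose iteration reports why it stopped."""

    def iter_logged(self, timeout, log):
        while True:
            try:
                item = self.get(timeout=timeout)
            except queue.Empty:
                log.append("timeout")   # special handling for the timeout case
                return
            except _SHUTDOWN:
                log.append("shutdown")  # distinct handling for a shutdown
                return
            yield item


log = []
q = LoggingQueue()
q.put("a")
q.put("b")
items = list(q.iter_logged(timeout=0.01, log=log))
print(items, log)
```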

I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done().6

It's safer to call q.task_done() explicitly, see the discussion here: #119154.9
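
A minimal sketch of what calling task_done() explicitly inside a loop-style consumer can look like, using only the existing API. STOP, worker, and done are illustrative names; the try/finally is one possible way to keep join() from hanging if processing an item fails.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel


def worker(q, done):
    # iter(q.get, STOP) calls q.get() until it returns STOP.
    for item in iter(q.get, STOP):
        try:
            done.append(item)
        finally:
            q.task_done()  # explicit: one call per item retrieved


q = queue.Queue()
done = []
t = threading.Thread(target=worker, args=(q, done))
t.start()
for i in range(3):
    q.put(i)

q.join()  # returns once task_done() has been called for every item
joined_count = len(done)

q.put(STOP)  # sent after join(), so it is never counted as unfinished work
t.join()
print(joined_count, done)
```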

In short, users that want to do anything other than one specific case are made worse-off by trying to use the for-loop variant. It won't even be obvious that they have options.6

I think that in most cases a subclass won't be necessary.

Footnotes

  1. https://github.com/python/cpython/pull/120503#issuecomment-2185001370

  2. https://github.com/python/cpython/issues/119154#issue-2304290807

  3. https://github.com/python/cpython/pull/120503#issuecomment-2185094829

  4. https://github.com/python/cpython/pull/120503#issuecomment-2185070439

  5. https://github.com/python/cpython/issues/121380#issuecomment-2209589929

  6. https://github.com/python/cpython/pull/120503#issuecomment-2185164409

  7. https://github.com/python/cpython/pull/120503#issuecomment-2185179733

  8. https://github.com/python/cpython/pull/120503#issuecomment-2185226017

  9. https://github.com/python/cpython/pull/120503#issuecomment-2185268892

@rhettinger
Contributor

rhettinger commented Sep 24, 2024

We discussed this at the sprint today and agreed this (or any variation of it) should not be done. Multiple reasons were given.

The most important was that this is only one of the multitude of reasonable ways to use existing queue API: the new shutdown exception, previously existing custom exceptions, waiting for timeouts, multiple producers, multiple consumers, having a second queue for two-way communication, etc.

Another reason was that a senior concurrency developer had become averse to the trend of widening existing APIs to support the various dunder method protocols (iterator, context manager, etc.).

Other devs agreed that a for-loop front-end would hide (make implicit) details that we really want to be explicit for code review (i.e. when blocking happens and what exception will be caught).

This senior concurrency dev suggested that I write a small edit to the docs showing a user could easily write a wrapper like this. I'll do that in a separate PR.
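
A user-written wrapper of the kind described might look like the sketch below. This is not the documentation edit mentioned above, whose content is not part of this thread; iter_queue is a hypothetical name, and the sketch assumes Python 3.13+, where queue.Queue.shutdown() and queue.ShutDown are available.

```python
import queue
import threading


def iter_queue(q):
    """Hypothetical helper: yield items until the queue is shut down and empty."""
    while True:
        try:
            item = q.get()  # the blocking call stays visible in the helper
        except queue.ShutDown:
            return
        yield item


def worker(q, results):
    for item in iter_queue(q):
        results.append(item)


q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()
for i in range(5):
    q.put(i)

# Non-immediate shutdown: queued items are still delivered, after which
# get() raises ShutDown and the worker's loop ends.
q.shutdown()
t.join()
print(results)
```

Keeping the helper in user code leaves the blocking get() and the caught exception in plain sight, which is the property the reviewers wanted to preserve.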

@rhettinger rhettinger closed this Sep 24, 2024
@nineteendo nineteendo deleted the add-Queue.iter branch September 24, 2024 07:08
@nineteendo
Contributor Author

Does this decision apply to #120491 as well? In that case both the issue and PR should be closed.
